Hello,
We are using the Axon Framework for some of the services in our ecosystem, coupled with Axon Server. There are other services in our ecosystem which communicate via Apache Kafka - there are ongoing efforts to migrate and integrate these components.
A new feature has come along for which we expect an enormous event throughput (at least 3x to 4x that of the rest of our system combined). These are relatively “dumb” CRUD events that will be used for reporting purposes (i.e. data aggregation), not meaningful domain events.
As a result, we wrote a new service and wanted to use the Kafka ecosystem to process this data, while still using Axon to take advantage of e.g. subscription queries for the aggregated data, which the same service could ostensibly make sense of (it is still its own domain more so than any other service’s).
To do this, we used the axon-kafka-spring-boot-starter to use Kafka as a plain Event Bus, with the following config:
axon:
  serializer:
    general: jackson
    # Configured via bean override.
    # events: jackson
    messages: jackson
  kafka:
    bootstrap-servers: localhost:29092
    client-id: kafka-axon-example
    default-topic: mytopic
    properties:
      security.protocol: PLAINTEXT
    producer:
      retries: 3
      event-processor-mode: subscribing
      # For additional unnamed properties, add them to the `properties` map like so
      properties:
        compression.type: zstd
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        schema.registry.url: http://localhost:8081
        auto.register.schemas: false
        basic.auth:
          credentials.source: USER_INFO
          user.info: ${SR_USER}:${SR_PASS}
    fetcher:
      poll-timeout: 3000
And including the following custom configuration:
@Configuration
@EnableConfigurationProperties({KafkaProperties.class, TokenStoreProperties.class})
public class SerializationConfig {

    private final KafkaProperties properties;

    public SerializationConfig(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @Qualifier("eventSerializer")
    public Serializer eventSerializer(RevisionResolver revisionResolver) {
        var kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(properties.buildProducerProperties(), false);
        return new AxonKafkaAvroSerializer("mytopic", kafkaAvroSerializer);
    }

    @Bean
    public KafkaMessageConverter<?, ?> kafkaMessageConverter(@Qualifier("eventSerializer") Serializer eventSerializer,
                                                             org.axonframework.config.Configuration configuration) {
        return DefaultKafkaMessageConverter
                .builder()
                .serializer(eventSerializer)
                .sequencingPolicy(PropertySequencingPolicy.builder(Receipt.class, UUID.class)
                                                          .propertyExtractor(Receipt::getSessionId)
                                                          .propertyName("sessionId")
                                                          .build())
                .upcasterChain(configuration.upcasterChain() != null
                                       ? configuration.upcasterChain()
                                       : new EventUpcasterChain())
                .build();
    }
}
We are declaring our own EventSerializer bean so that we can use the Confluent KafkaAvroSerializer, which supports a Schema Registry where we can store our Avro-based schemas.
Additionally, we are overriding the KafkaMessageConverter bean to provide a key for the records based on a record property.
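To illustrate why we want the key derived from a record property, here is a minimal plain-Java sketch that does not use Axon or the Kafka client. It assumes, as far as I understand the converter, that the sequence identifier's string form ends up as the record key. The Receipt class, keyFor and partitionFor below are hypothetical stand-ins, and the hash is a simplification (Kafka's default partitioner hashes the serialized key with murmur2), but any deterministic hash shows the same property: receipts with the same sessionId land on the same partition and therefore keep their relative order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

public class KeyingSketch {

    // Hypothetical stand-in for the Receipt payload used in this post.
    static final class Receipt {
        private final UUID sessionId;
        private final String body;

        Receipt(UUID sessionId, String body) {
            this.sessionId = sessionId;
            this.body = body;
        }

        UUID getSessionId() {
            return sessionId;
        }

        String getBody() {
            return body;
        }
    }

    // Derives the record key from a payload property (assumption: the real
    // converter uses the sequence identifier's toString() in a similar way).
    static <T> String keyFor(T payload, Function<T, ?> propertyExtractor) {
        return String.valueOf(propertyExtractor.apply(payload));
    }

    // Simplified partitioner: deterministic hash of the key modulo the partition count.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        UUID sessionA = UUID.randomUUID();
        UUID sessionB = UUID.randomUUID();
        List<Receipt> receipts = List.of(
                new Receipt(sessionA, "first"),
                new Receipt(sessionB, "second"),
                new Receipt(sessionA, "third"));

        // Group receipt bodies by the partition their key maps to.
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (Receipt receipt : receipts) {
            String key = keyFor(receipt, Receipt::getSessionId);
            partitions.computeIfAbsent(partitionFor(key, 3), p -> new ArrayList<>())
                      .add(receipt.getBody());
        }
        // "first" and "third" share a session, so they always share a partition.
        System.out.println(partitions);
    }
}
```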
Afterwards, we simply pass the message along from a controller after some validation:
@RestController
@RequestMapping("/v1")
public class ReceiptController {

    @Autowired
    private EventGateway eventGateway;

    @Autowired
    private ReceiptMapper mapper; // mapStruct based bean mapper

    @PostMapping("/receipts")
    public void createReceipt(@RequestBody @Valid Receipt receipt) {
        var mappedReceipt = mapper.receiptToAvro(receipt);
        eventGateway.publish(mappedReceipt);
    }
}
This approach works in general: incoming messages are validated, transformed into an Avro-based format, and then written to Kafka.
However, I dug into the implementation to make sure I understand what Axon is doing under the hood and came across the org.axonframework.eventhandling.AbstractEventBus#publish method. Relevant lines:
try {
    prepareCommit(intercept(eventsWithContext));
    commit(eventsWithContext);
    afterCommit(eventsWithContext);
    ingested.forEach(MessageMonitor.MonitorCallback::reportSuccess);
} catch (Exception e) {
    ingested.forEach(m -> m.reportFailure(e));
    throw e;
}
which in turn invokes org.axonframework.eventsourcing.eventstore.AbstractEventStore#prepareCommit:
@Override
protected void prepareCommit(List<? extends EventMessage<?>> events) {
    storageEngine.appendEvents(events);
    super.prepareCommit(events);
}
Since I have axon.axonserver.enabled: true set by default, the Event Storage Engine implementation is the AxonIQEventStorageEngine, which causes the message to be sent to the Axon Server in org.axonframework.axonserver.connector.event.axon.AxonServerEventStore.AxonIQEventStorageEngine#appendEvents:
Notably in line 458, this message is not only serialized earlier than I expected (I expected that to happen during the send() method of the KafkaProducer), but also sent to the Axon Server.
If I set axon.axonserver.enabled: false then by default the org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine is used, which doesn’t quite work for my use case out of the box and causes some other cascading failures (haven’t looked at them in detail yet).
In our use case, it’s not strictly necessary to send the event to the Axon Server - in fact it’s detrimental since this would mean a huge throughput spike to it that’s relatively pointless (as we publish via Kafka).
I would have thought that the presence of the axon-kafka-spring-boot-starter module would set up a kind of ‘no-op’ EventStorageEngine and simply forward all events to the KafkaProducer that is set up, but I couldn’t find such an implementation.
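To make the behaviour I am describing concrete, here is a small plain-Java model of the call order as I read it from the snippets above. It does not use Axon at all: PlainBus, StoreBackedBus and StorageEngine are hypothetical names of my own, and the assumption that subscribers (such as the Kafka publisher in subscribing mode) are notified from the base prepareCommit is my reading of the code, not something I have confirmed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PublishOrderSketch {

    // Hypothetical stand-in for an event storage engine.
    interface StorageEngine {
        void appendEvents(List<String> events);
    }

    // Plain bus: publishing only hands the events to the subscribers.
    static class PlainBus {
        private final List<Consumer<List<String>>> subscribers = new ArrayList<>();

        void subscribe(Consumer<List<String>> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(List<String> events) {
            prepareCommit(events);
        }

        protected void prepareCommit(List<String> events) {
            subscribers.forEach(subscriber -> subscriber.accept(events));
        }
    }

    // Store-backed bus: appends to storage first, then behaves like the plain bus.
    // This mirrors the shape of the prepareCommit override quoted above.
    static class StoreBackedBus extends PlainBus {
        private final StorageEngine storageEngine;

        StoreBackedBus(StorageEngine storageEngine) {
            this.storageEngine = storageEngine;
        }

        @Override
        protected void prepareCommit(List<String> events) {
            storageEngine.appendEvents(events);
            super.prepareCommit(events);
        }
    }

    // Subscribes a fake "Kafka" subscriber, publishes one event, returns the call log.
    static List<String> run(PlainBus bus, List<String> log) {
        bus.subscribe(events -> log.add("kafka <- " + events));
        bus.publish(List.of("ReceiptCreated"));
        return log;
    }

    public static void main(String[] args) {
        List<String> withStore = new ArrayList<>();
        run(new StoreBackedBus(events -> withStore.add("store <- " + events)), withStore);
        System.out.println(withStore); // [store <- [ReceiptCreated], kafka <- [ReceiptCreated]]

        List<String> withoutStore = new ArrayList<>();
        run(new PlainBus(), withoutStore);
        System.out.println(withoutStore); // [kafka <- [ReceiptCreated]]
    }
}
```

The second run is the behaviour I was hoping for: the events reach the Kafka subscriber without any append to a store. Axon does ship an org.axonframework.eventhandling.SimpleEventBus that looks like the equivalent of PlainBus here, but I have not verified whether declaring one is enough to bypass the event store in a Spring Boot setup with Axon Server enabled.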
We would still like to be able to generally use the Axon Server for basic monitoring and service discovery with other Axon Services, but I’m beginning to think that our use case precludes using Axon at all in this instance.
Can anyone shed a bit of light as to how the Kafka extension is meant to be used? Is it meant to be used in addition to storing the events in e.g. Axon Server, as opposed to not storing them in an event store and just pushing them to an event bus?
Help would be greatly appreciated.