Problems understanding the Axon Spring Kafka extension design and usage

Hello,

We are using the Axon Framework for some of the services in our ecosystem, coupled with Axon Server. There are other services in our ecosystem which communicate via Apache Kafka - there are ongoing efforts to migrate and integrate these components.

A new feature has come along for which we expect enormous event throughput (at least 3x - 4x that of the rest of our system combined), although these are relatively “dumb” CRUD events that will be used for reporting purposes (i.e. data aggregation) rather than meaningful domain events.

As a result, we wrote a new service that uses the Kafka ecosystem to process this data, while still using Axon so we can take advantage of e.g. subscription queries for the aggregated data, which this same service can ostensibly make sense of (it is still its own domain more so than any other service’s).
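For context, the kind of subscription query we have in mind looks roughly like the sketch below. ReceiptSummary and the "findReceiptsBySession" query name are hypothetical placeholders, not our actual code:

import java.util.List;
import java.util.UUID;

import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.SubscriptionQueryResult;

public class ReceiptQueries {

    // Hypothetical read model produced by the aggregating service.
    public record ReceiptSummary(UUID sessionId, long receiptCount) {}

    private final QueryGateway queryGateway;

    public ReceiptQueries(QueryGateway queryGateway) {
        this.queryGateway = queryGateway;
    }

    // Fetch the current aggregation result and subscribe to future updates.
    public SubscriptionQueryResult<List<ReceiptSummary>, ReceiptSummary> subscribe(UUID sessionId) {
        return queryGateway.subscriptionQuery(
                "findReceiptsBySession",
                sessionId,
                ResponseTypes.multipleInstancesOf(ReceiptSummary.class),
                ResponseTypes.instanceOf(ReceiptSummary.class));
    }
}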

To do this, we pulled in the axon-kafka-spring-boot-starter to use Kafka as a plain Event Bus, with the following config:

axon:
  serializer:
    general: jackson
    # Configured via bean override.
    #    events: jackson
    messages: jackson
  kafka:
    bootstrap-servers: localhost:29092
    client-id: kafka-axon-example
    default-topic: mytopic
    properties:
      security.protocol: PLAINTEXT

    producer:
      retries: 3
      event-processor-mode: subscribing
      # For additional unnamed properties, add them to the `properties` map like so
      properties:
        compression.type: zstd
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        schema.registry.url: http://localhost:8081
        auto.register.schemas: false
        basic.auth.credentials.source: USER_INFO
        basic.auth.user.info: ${SR_USER}:${SR_PASS}
    fetcher:
      poll-timeout: 3000

We also include the following custom configuration:

@Configuration
@EnableConfigurationProperties({KafkaProperties.class, TokenStoreProperties.class})
public class SerializationConfig {

    private final KafkaProperties properties;

    public SerializationConfig(KafkaProperties properties) {
        this.properties = properties;
    }

    // Registers the Confluent Avro serializer (with Schema Registry support)
    // as Axon's event serializer.
    @Bean
    @Qualifier("eventSerializer")
    public Serializer eventSerializer() {
        var kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(properties.buildProducerProperties(), false);
        return new AxonKafkaAvroSerializer("mytopic", kafkaAvroSerializer);
    }

    // Overrides the default converter so that outgoing records are keyed by
    // the sessionId property of the Receipt payload.
    @Bean
    public KafkaMessageConverter<?, ?> kafkaMessageConverter(
            @Qualifier("eventSerializer") Serializer eventSerializer,
            org.axonframework.config.Configuration configuration) {
        return DefaultKafkaMessageConverter
                .builder()
                .serializer(eventSerializer)
                .sequencingPolicy(PropertySequencingPolicy.builder(Receipt.class, UUID.class)
                        .propertyExtractor(Receipt::getSessionId)
                        .propertyName("sessionId")
                        .build())
                .upcasterChain(configuration.upcasterChain() != null
                        ? configuration.upcasterChain()
                        : new EventUpcasterChain())
                .build();
    }

}

We declare our own EventSerializer bean so we can use the Confluent KafkaAvroSerializer, which supports a Schema Registry in which we store our Avro-based schemas.
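AxonKafkaAvroSerializer is a small adapter of ours that bridges Axon's Serializer interface to Confluent's serializer. Below is a simplified, publish-only sketch of what such an adapter can look like, not our exact implementation:

import io.confluent.kafka.serializers.KafkaAvroSerializer;

import org.axonframework.serialization.ChainingConverter;
import org.axonframework.serialization.Converter;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.axonframework.serialization.SimpleSerializedType;

public class AxonKafkaAvroSerializer implements Serializer {

    private final String topic;
    private final KafkaAvroSerializer delegate;

    public AxonKafkaAvroSerializer(String topic, KafkaAvroSerializer delegate) {
        this.topic = topic;
        this.delegate = delegate;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> SerializedObject<T> serialize(Object object, Class<T> expectedRepresentation) {
        // Delegate to Confluent's serializer, which registers/looks up the
        // schema in the Schema Registry and prepends the schema id.
        byte[] bytes = delegate.serialize(topic, object);
        return new SimpleSerializedObject<>((T) bytes, expectedRepresentation,
                typeForClass(object.getClass()));
    }

    @Override
    public <T> boolean canSerializeTo(Class<T> expectedRepresentation) {
        return byte[].class.equals(expectedRepresentation);
    }

    @Override
    public <S, T> T deserialize(SerializedObject<S> serializedObject) {
        // Publish-only in this service; consuming goes through Kafka consumers.
        throw new UnsupportedOperationException("deserialization not supported here");
    }

    @Override
    public Class classForType(SerializedType type) {
        return Object.class; // not needed on the publish path
    }

    @Override
    public SerializedType typeForClass(Class type) {
        return new SimpleSerializedType(type.getName(), null);
    }

    @Override
    public Converter getConverter() {
        return new ChainingConverter();
    }
}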

Additionally, we override the KafkaMessageConverter bean so that the records are keyed on a property of the event payload.

Afterwards, we simply pass the message along from a controller after some validation:

@RestController
@RequestMapping("/v1")
public class ReceiptController {

    private final EventGateway eventGateway;
    private final ReceiptMapper mapper; // MapStruct-based bean mapper

    public ReceiptController(EventGateway eventGateway, ReceiptMapper mapper) {
        this.eventGateway = eventGateway;
        this.mapper = mapper;
    }

    @PostMapping("/receipts")
    public void createReceipt(@RequestBody @Valid Receipt receipt) {
        // Map the validated request body to the Avro-generated type and publish it.
        var mappedReceipt = mapper.receiptToAvro(receipt);
        eventGateway.publish(mappedReceipt);
    }

}

This approach works in general: incoming messages are validated, transformed into an Avro-based format, and then written to Kafka.

However, I dug into the implementation to make sure I understood what Axon is doing under the hood, and came across the org.axonframework.eventhandling.AbstractEventBus#publish method. The relevant lines:

try {
    prepareCommit(intercept(eventsWithContext));
    commit(eventsWithContext);
    afterCommit(eventsWithContext);
    ingested.forEach(MessageMonitor.MonitorCallback::reportSuccess);
} catch (Exception e) {
    ingested.forEach(m -> m.reportFailure(e));
    throw e;
}

which in turn invokes org.axonframework.eventsourcing.eventstore.AbstractEventStore#prepareCommit:

@Override
protected void prepareCommit(List<? extends EventMessage<?>> events) {
    storageEngine.appendEvents(events);
    super.prepareCommit(events);
}

Since I have axon.axonserver.enabled: true set (the default), the Event Storage Engine implementation is the AxonIQEventStorageEngine, which sends the message to Axon Server in org.axonframework.axonserver.connector.event.axon.AxonServerEventStore.AxonIQEventStorageEngine#appendEvents.

Notably, at line 458 of that method the message is not only serialized earlier than I expected (I had assumed serialization would happen inside the KafkaProducer's send() method), it is also sent to Axon Server at that point.

If I set axon.axonserver.enabled: false, then the org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine is used by default, which doesn’t quite work for my use case out of the box and causes some other cascading failures (I haven’t looked at them in detail yet).

In our use case it’s not strictly necessary to send the event to Axon Server; in fact it’s detrimental, since it would mean a huge and largely pointless throughput spike on Axon Server (we publish via Kafka anyway).

I would have thought that the presence of the axon-kafka-spring-boot-starter module would set up a kind of ‘no-op’ EventStorageEngine and simply forward all events to the configured KafkaProducer, but I couldn’t find such an implementation.
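What I expected, in other words, is something along these lines: a configuration that swaps the event store for a plain event bus. The following is an unverified sketch; it assumes the auto-configuration backs off from creating an (Axon Server backed) EventStore when a user-defined EventBus bean is present:

import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.SimpleEventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NoEventStoreConfig {

    // Assumption: providing our own EventBus bean makes Axon skip creating
    // an EventStore, so published events are only dispatched to subscribed
    // components (such as the Kafka publisher) and never stored.
    @Bean
    public EventBus eventBus() {
        return SimpleEventBus.builder().build();
    }
}

Whether something like this plays nicely with the rest of the Axon Server integration (monitoring, service discovery) is exactly what I’m unsure about.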

We would still like to use Axon Server for basic monitoring and service discovery alongside our other Axon services, but I’m beginning to think that our use case precludes using Axon at all in this instance.

Can anyone shed a bit of light as to how the Kafka extension is meant to be used? Is it meant to be used in addition to storing the events in e.g. Axon Server, as opposed to not storing them in an event store and just pushing them to an event bus?

Help would be greatly appreciated.

Hi Filpano,

I do think the extension was originally meant to be used without Axon Server. That doesn’t mean you can’t combine them, though. However, that might sometimes create additional complexity.

From what you described, it seems like the Kafka messages are not related to any Axon messages. If that is the case, you might use plain spring-kafka, and maybe even put the Kafka handling in a separate service. If you’re using Axon Server just for subscription queries, it might be easiest to configure only the components needed for that.
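For example, publishing with plain spring-kafka can be as small as the sketch below. The topic and type names are placeholders, and it assumes a KafkaTemplate<String, byte[]> is configured:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ReceiptPublisher {

    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    public ReceiptPublisher(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Key by sessionId so records for the same session land in one partition,
    // mirroring the PropertySequencingPolicy from your Axon-based setup.
    public void publish(String sessionId, byte[] avroPayload) {
        kafkaTemplate.send("mytopic", sessionId, avroPayload);
    }
}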

If they are related, then an example might help. This project adds Kafka integration in both directions via three components. The kafka-legacy-producer is just a simulation of some external app sending messages to Kafka. The kafka-legacy-consumer consumes those messages and turns them into commands, which go to Axon Server. The kafka-event-emitter publishes some of the events from Axon Server to Kafka.
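As a sketch of what the consuming side of such an integration looks like (names here are illustrative, not the actual project code):

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class LegacyMessageListener {

    // Hypothetical command payload for the translated message.
    public record ProcessLegacyMessageCommand(String message) {}

    private final CommandGateway commandGateway;

    public LegacyMessageListener(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    // Consume legacy Kafka messages and translate them into Axon commands,
    // which are then handled by aggregates backed by Axon Server.
    @KafkaListener(topics = "legacy-topic")
    public void on(String message) {
        commandGateway.send(new ProcessLegacyMessageCommand(message));
    }
}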

I hope the example helps. I think the main problem is that Kafka is not capable of acting as an EventStorageEngine, so you still get the one from Axon Server.