Using Axon and axon-kafka to read non-axon events

Hello,

some time ago I asked about using the Axon Kafka extension: Problems understanding the Axon Spring Kafka extension design and usage

I wasn’t able to get it working quite right for the use case that we had, but ended up using Spring Kafka to publish events to a topic as as separate write service.

What we would like to do now is use Axon and the Axon Kafka extension on the query side (within another separate read service) without Axon Server, since the Axon Framework’s query side offers abstractions for a lot of the query handling.

Before I share more configs and potentially waste time trying to debug something even further, I wanted to ask: is it possible to use the QueryGateway/QueryBus to consume plain (i.e. non-axon) Kafka records?

I managed to set up a small project that does this, but from what I could see the Fetcher simply discarded all of my records before even getting to the dispatch part of the processing lifecycle. The MessageConverterMode enum of org.axonframework.extensions.kafka.KafkaProperties:

    /**
     * Modes for kafka message conversion.
     */
    public enum MessageConverterMode {
        /**
         * Default way using an 'axon' format, with data as binary value and headers for additional information, where
         * most of the headers start with 'axon-'.
         */
        DEFAULT,
        /**
         * Using <a href="https://cloudevents.io/">Cloud Events</a>, depending on the configuration of the serializers
         * is either stored as one JSON with everything, or only the data as value with all other information as
         * headers, with all headers starting with 'ce_'.
         */
        CLOUD_EVENT
    }

, if I have understood it’s intent correctly, seems to imply that the extension is not capable of reading fully generic Kafka records but only those that have the axon-* and ce_* header prefixes, which seems like it would explain why my records are simply discarded.

We produce these records with spring-kafka and they are as such not associated with any Axon messages. Similarly, they are not quite capable of being cloud events as per spec as they are being stored in an Avro format for supporting Schema evolution.

Is Axon unsuitable for this use case as described?

It can work for events, but you must create your own KafkaMessageConverter implementation. There is nothing like generic Kafka records since that is just a bunch of bytes. We need some assumptions on the bytes to turn them into Axon Events.

Theoretically, you could have a query bus using Kafka, which is quite hard. The only distributed query bus the framework is currently offering is based on Axon Server.

Thanks for the swift reply - it’s highly appreciated.

It can work for events, but you must create your own KafkaMessageConverter implementation. There is nothing like generic Kafka records since that is just a bunch of bytes. We need some assumptions on the bytes to turn them into Axon Events.

I may have been a little too handwavy with my “generic record” term. :slight_smile:
You are of course correct that it’s a matter of interpretation on the read side. Since the KafkaMessageConverter contract is relatively lenient, this seems like it should be easily doable (e.g. wrap a Confluent KafkaAvroSe-/Deserializer within a message converter). Thanks for the suggestion.

Theoretically, you could have a query bus using Kafka, which is quite hard. The only distributed query bus the framework is currently offering is based on Axon Server.

I believe that I misspoke on this end. We would not need any of the actual routing capabilities of the query bus - all Query destination resolution would happen locally in the read service itself (albeit in one instance of N) like in a “plain” web service.

The planned implementation path looks as follows:

  1. Produce Kafka record.
  2. Using axon-kafka and a SubscribingEventProcessor (to be able to use Kafka mechanisms for tracking), read and wrap this record as an Event Message, interpretable for Axon.
  3. Forward this Event message to an @EventHandler in the read service for building a projection. This event wouldn’t have any Axon-specific characteristics, but as far as I can tell after seeing the source code for various Axon examples, the event messages can also be “plain” POJOs which I would practically have in this case.
  4. Query these projections using a @QueryHandler, potentially using a subscription query for updates.

Notably, the read and wrap this record as an Event Message was not part of my initial post which I overlooked. However, we wouldn’t require a distributed query bus.

Do you know if the above approach would work for ingesting this data? Am I missing anything to make this work in an Axon context?

Yes, that should all work. Given a converter for the format (which could just have a payload and payload class, but not the aggregate identifier and such), and a local query bus.

If you run multiple instances you would want each instance to reach each event, for the subscriptions. You also want the updates to be idempotent, since all instances would want to update. Also you need to make sure the Kafka configuration is correct for this. For example, you don’t want to share a consumer group, as each instance would only get part of the events that way.

For example, you don’t want to share a consumer group, as each instance would only get part of the events that way.

Thanks for the hint. We expect a very high event throughput (as an idempotent upsert), so we’ll likely be storing the projections persistently in the long run. From what I understand, this should take care of the synchronization problem across instances. But yes, in-memory projections would be a problem with this approach.

And subscription queries as well, since they would only ‘see’ the responses from the events that hit the instance they are connected to.

And subscription queries as well, since they would only ‘see’ the responses from the events that hit the instance they are connected to.

Ah, we hadn’t considered that - thanks for highlighting that important aspect. Then we’ll likely need to rework our current approach a bit .

[…] and a local query bus.

I am currently testing this approach and have met some unexpected behaviour that I can’t quite explain.

If I do not autowire a QueryBus or QueryGateway bean, I am able to process records and have them be forwarded to my event handler (though it currently accepts the “wildcard” Message<?> as parameter for the technical proof).

If I autowire a QueryBus or QueryGateway however, it seems that a TrackingEventProcessor is set up somewhere that prevents me from consuming any records at all.

The configuration I have looks as follows:

AxonAvroKafkaConverter supporting only deserialization (we do not need to produce on the consumer side)

public class AvroKafkaMessageConverter implements KafkaMessageConverter<UUID, byte[]> {

    private final KafkaAvroDeserializer deserializer;
    private final String topic;

    public AvroKafkaMessageConverter(String topic, KafkaAvroDeserializer deserializer) {
        this.topic = Objects.requireNonNull(topic);
        this.deserializer = Objects.requireNonNull(deserializer);
    }

    @Override
    public ProducerRecord<UUID, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<UUID, byte[]> consumerRecord) {
        return Optional.of(
                GenericEventMessage.asEventMessage(
                        deserializer.deserialize(topic, consumerRecord.headers(), consumerRecord.value())));
    }
}

SerdeConfig that exposes the above Converter as a bean

@Configuration
public class SerdeConfig {

    @Bean
    public KafkaMessageConverter<UUID, byte[]> kafkaMessageConverter() { //KafkaProperties axonKafkaProperties) {
        KafkaAvroDeserializer deserializerDelegate = new KafkaAvroDeserializer();
        Map<String, Object> properties = new HashMap<>();

        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
        properties.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, "user:user");
        properties.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

        deserializerDelegate.configure(properties, false);

        return new AvroKafkaMessageConverter("MY_TOPIC", deserializerDelegate);
    }
}

AxonKafkaConfig which sets up the relevant axon-kafka beans

@Configuration
public class AxonKafkaConfig {

    @Autowired
    private EventProcessingConfigurer eventProcessingConfigurer;

    private String groupId = "my-api";

    private String processorName = "myProcessor";

    @Bean
    public KafkaAvroDeserializer kafkaAvroDeserializer() {
        return new KafkaAvroDeserializer();
    }
    @Bean
    public ConsumerFactory<UUID, byte[]> consumerFactory(KafkaProperties consumerConfiguration) {
        return new DefaultConsumerFactory<>(consumerConfiguration.buildConsumerProperties());
    }

    @Bean
    public Fetcher<?, ?, ?> fetcher() {
        return AsyncFetcher.builder()
                .build();
    }

    @Bean
    public SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource(
            ConsumerFactory<UUID, byte[]> consumerFactory,
            Fetcher<UUID, byte[], EventMessage<?>> fetcher,
            KafkaMessageConverter<UUID, byte[]> messageConverter,
            Configurer configurer
    ) {

        SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<UUID, byte[]>builder()
                .topics(List.of("MY_TOPIC"))
                .groupId(groupId)                   // Hard requirement
                .consumerFactory(consumerFactory)   // Hard requirement
                .fetcher(fetcher)                   // Hard requirement
                .messageConverter(messageConverter)
//                .serializer() TODO xstream no security context exception
                .build();
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        // Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

}

AxonEventProcessorConfig which registers the subscribing event processor

@Configuration
public class AxonEventProcessorConfig {

    private String groupId = "my-api";
    private String processorName = "myProcessor";

    @Autowired
    EventProcessingConfigurer eventProcessingConfigurer;

    @Autowired
    SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource;

    @PostConstruct
    public void postConstruct() {
//        eventProcessingConfigurer.usingSubscribingEventProcessors();
        eventProcessingConfigurer.assignProcessingGroup(groupId, processorName);
        eventProcessingConfigurer.registerSubscribingEventProcessor(
                processorName,
                configuration -> subscribableKafkaMessageSource
        );

    }
}

And an event handler as follows:

@Slf4j
@Component
@ProcessingGroup("myProcessor")
public class Handler {
//    @Autowired
//    private QueryGateway queryGateway;
//    @Autowired
//    private QueryBus queryBus;
//    @Autowired
//    private QueryUpdateEmitter emitter;

    @EventHandler
    public void on(Message<?> eventMessage){
        log.info("Got event: {}", eventMessage);
    }
}

If I run this as is, my events are correctly consumed.

However, if I do autowire the query components, I get the following log output:

2023-12-12T14:40:43.698+01:00  INFO 792710 --- [           main] d.v.c.a.my.MyApplication   : Started MyApplication in 5.967 seconds (process running for 6.179)
2023-12-12T14:40:44.066+01:00  INFO 792710 --- [pleProcessor]-0] o.a.e.TrackingEventProcessor             : Worker assigned to segment Segment[0/0] for processing
2023-12-12T14:40:44.079+01:00  INFO 792710 --- [pleProcessor]-0] o.a.e.TrackingEventProcessor             : Using current Thread for last segment worker: TrackingSegmentWorker{processor=SampleProcessor, segment=Segment[0/0]}
2023-12-12T14:40:44.085+01:00  INFO 792710 --- [pleProcessor]-0] o.a.e.TrackingEventProcessor             : Fetched token: null for segment: Segment[0/0]

Most notably, it doesn’t seem like a Kafka Consumer is started at all. Additionally, since I don’t have a TokenStore, I’m rather surprised that it did not give me an exception. Regardless, nothing seems to happen further.

If I uncomment the eventProcessingConfigurer.usingSubscribingEventProcessors(); in AxonEventProcessorConfig#postConstruct, I don’t get the above TrackingEventProcessor logs, but nothing seems to happen. The Kafka Consumer is not created either in this case.

This seems like retrieving any query components has an associated init phase/handler that sets up other configurations that might conflict with the axon-kafka extension to some degree.

Do you potentially have any idea why this behaviour occurs? If it helps, I can also provide startup logs. I’ll be going over them now to try and find the discrepancy…

It seems like the TrackingEventProcessor logs are emitted in the “working” version as well, which I don’t quite understand. I’m going down the list of initialized components now to try and find some discrepancies.

There might be a ‘conflict’ with the postconstruct configuration. Instead, could you change it to something like:


    @Bean
    public ConfigurerModule configurerModule() {
        return new ProcessingConfigurerModule();
    }
    /**
     * An example {@link ConfigurerModule} implementation to attach configuration to Axon's configuration life cycle.
     */
    private static class ProcessingConfigurerModule implements ConfigurerModule {

        @Override
        public void configureModule(Configurer configurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration 
            configurer.eventProcessing(
                              processingConfigurer -> processingConfigurer
                                      .assignProcessingGroup(groupId, processorName)
                                      .registerSubscribingEventProcessor(
                                          processorName,
                                          configuration -> subscribableKafkaMessageSource
                              )
                      );
        }
    }

Thank you, that seemed to be the issue. The registration was probably happening too late in the @PostConstruct.

The reason for the @PostConstruct was that initially, a circular dependency graph was formed due to:

        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);

which requires the Configurer (springAxonConfigurer), which itself requires a list of all ConfigurerModules available, including the one you proposed.

Should anyone else be interested, the following is a working configuration:

@Configuration
public class AxonKafkaConfig {

    private String groupId = "my-api";

    private String processorName = "myProcessor";

    @Bean
    public ConsumerFactory<UUID, byte[]> consumerFactory(KafkaProperties consumerConfiguration) {
        return new DefaultConsumerFactory<>(consumerConfiguration.buildConsumerProperties());
    }

    @Bean
    public Fetcher<?, ?, ?> fetcher() {
        return AsyncFetcher.builder()
                .build();
    }

    @Bean
    public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer() {
        //        configurer.registerModule(kafkaMessageSourceConfigurer);
        return new KafkaMessageSourceConfigurer();
    }

    @Bean
    public ConfigurerModule configurerModule(SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource) {
        return configurer -> configurer.eventProcessing(
                processingConfigurer -> processingConfigurer
                        .assignProcessingGroup(groupId, processorName)
                        .registerSubscribingEventProcessor(
                                processorName,
                                configuration -> subscribableKafkaMessageSource
                        )
        );
    }

    @Bean
    public SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource(ConsumerFactory<UUID, byte[]> consumerFactory,
                                                                                         Fetcher<UUID, byte[], EventMessage<?>> fetcher,
                                                                                         KafkaMessageConverter<UUID, byte[]> messageConverter,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {

        SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<UUID, byte[]>builder()
                .topics(List.of("MY_TOPIC"))
                .groupId(groupId)                   // Hard requirement
                .consumerFactory(consumerFactory)   // Hard requirement
                .fetcher(fetcher)                   // Hard requirement
                .messageConverter(messageConverter)
//                .serializer()
                .build();
        // Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

}

Notably, I am no longer calling configurer.registerModule(kafkaMessageSourceConfigurer); in this setup, though it’s listed in the relevant docs section.

Without delving too much deeper, I imagine this is done under the hood during
kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);. Maybe the docs should be updated?

As a side note deserving of a separate entry, I’ve found that, if I want to subscribe multiple topics containing different records, the provided topics entrypoint is insufficient.

The reason for this is that there is no filtering for a given processing group happening in org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource#subscribe:

    /**
     * Subscribe the given {@code messageProcessor} to this message source. When subscribed, it will receive all
     * messages published to this source.
     * <p>
     * If the given {@code messageProcessor} is already subscribed, nothing happens.
     * <p>
     * Any subscribed Event Processor will be placed in the same Consumer Group, defined through the (mandatory) {@link
     * Builder#groupId(String)} method.
     */
    @Override
    public Registration subscribe(java.util.function.Consumer<List<? extends EventMessage<?>>> eventProcessor) {
        if (this.eventProcessors.add(eventProcessor)) {
            logger.debug("Event Processor [{}] subscribed successfully", eventProcessor);
        } else {
            logger.info("Event Processor [{}] not added. It was already subscribed", eventProcessor);
        }

        if (autoStart) {
            logger.info("Starting event consumption as auto start is enabled");
            start();
        }

        return () -> {
            if (eventProcessors.remove(eventProcessor)) {
                logger.debug("Event Processor [{}] unsubscribed successfully", eventProcessor);
                if (eventProcessors.isEmpty() && autoStart) {
                    logger.info("Closing event consumption as auto start is enabled");
                    close();
                }
                return true;
            } else {
                logger.info("Event Processor [{}] not removed. It was already unsubscribed", eventProcessor);
                return false;
            }
        };
    }

and org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource#addConsumer:

    private void addConsumer(int consumerIndex) {
        Consumer<K, V> consumer = consumerFactory.createConsumer(groupId);
        subscriber.subscribeTopics(consumer);

        Registration closeConsumer = fetcher.poll(
                consumer,
                consumerRecords -> StreamSupport.stream(consumerRecords.spliterator(), false)
                                                .map(messageConverter::readKafkaMessage)
                                                .filter(Optional::isPresent)
                                                .map(Optional::get)
                                                .collect(Collectors.toList()),
                // Emphasis mine: all processors are used as-is
                eventMessages -> eventProcessors.forEach(eventProcessor -> eventProcessor.accept(eventMessages)),
                //               ^^^^^^^^^^^^^^^^^^^^^^^
                restartOnError(consumerIndex)
        );
        fetcherRegistrations.put(consumerIndex, closeConsumer);
    }

If I were to provide two topics, the same record would be handled by different event handlers in the processing group, which is not desired in my case as the handlers expect a deserialized record of a certain type. Perhaps I’m not using it quite as intended?

The workaround for this is to provide N SubscribableKafkaMessageSource, per topic, together with a ConfigurerModule per source. In my case, all of my SubscribableKafkaMessageSource are of type <byte[], byte[]> as there are no unifying key/value types.

I’m not quite sure what the intended usage of topics is, but this setup works for my use case.

I’m not sure I understand the problem. The handlers would only handle a specific class right? So, it would only call the event handler from either topic if it matches the class?