Kafka Integration

Hi, I’m using the Kafka example from GitHub (https://github.com/marinkobabic/axon-kafka-example) and it’s working when connecting to a locally deployed Kafka cluster. As suggested by the docs, I’m trying to use a custom org.springframework.kafka.core.ProducerFactory by exposing a KafkaPublisher bean and overriding withProducerFactory(). My bean is created, but createProducer() of my custom ProducerFactory is never getting called. Any suggestions?

@Bean
KafkaPublisher<byte[], byte[]> kafkaPublisher(ProducerFactory factory) {
    KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte[]>builder()
        .withMessageSource(new SimpleEventBus())
        .withProducerFactory(new org.axonframework.kafka.eventhandling.producer.ProducerFactory<String, byte[]>() {
            @Override
            public Producer<String, byte[]> createProducer() {
                return factory.createProducer();
            }

            @Override
            public void shutDown() {
            }
        })
        .withTopic(topic)
        .build();
    KafkaPublisher<byte[], byte[]> publisher = new KafkaPublisher<>(configuration);
    publisher.start();
    return publisher;
}

I was able to solve my problem by keeping the KafkaPublisher, but I needed to expose an EventBus bean (which I qualified in Sender.java) and wire up the publisher to it. I also needed to expose my own axon ProducerFactory that wrapped my own kafka ProducerFactory. My final configuration is as follows:

@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
class AxonConfig {

    @Value("${axon.kafka.default-topic}")
    private String topic;

    @Bean
    @ConditionalOnMissingBean
    public org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]> producerFactory(ProducerFactory factory) {
        return new org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]>() {
            @Override
            public Producer<byte[], byte[]> createProducer() {
                return factory.createProducer();
            }

            @Override
            public void shutDown() {
            }
        };
    }

    @Bean("eventBus")
    EventBus eventBus() {
        return new SimpleEventBus();
    }

    @Bean
    KafkaPublisher<byte[], byte[]> kafkaPublisher(org.axonframework.kafka.eventhandling.producer.ProducerFactory factory, EventBus eventBus) {
        KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte[]>builder()
            .withMessageSource(eventBus)
            .withProducerFactory(factory)
            .withTopic(topic)
            .build();
        PhotonKafkaPublisher<byte[], byte[]> publisher = new PhotonKafkaPublisher<>(configuration);
        eventBus.subscribe((events) -> publisher.sendEvents(events));
        publisher.start();
        return publisher;
    }

Hi Ryan,

Happy to see you’ve resolved the problem already.

Additionally, thanks for sharing the solution here with everybody, so that others hopefully will overcome this problem faster.

Just out of curiosity, it is correct that you created the Stack Overflow question regarding this too, I assume?

Cheers,
Steven

No problem! Yes, that’s my question.