Axon Framework with Kafka without Axon-Server

Below is how my @Configuration class looks like
and I am getting error as :

Description:

The dependencies of some of the beans in the application context form a cycle:
┌─────┐
| producerFactory defined in class path resource [com/test/testfw/KafkaEventPublicationConfiguration.class]
↑ ↓
| kafkaPublisher defined in class path resource [com/test/testfw/KafkaEventPublicationConfiguration.class]
└─────┘

@Bean
    public ProducerFactory<String, byte[]> producerFactory(Map<String, Object> producerConfiguration) {
        return DefaultProducerFactory.<String, byte[]>builder()
                .closeTimeout(Duration.ZERO)                 // Defaults to "30" seconds
                .producerCacheSize(10000)       // Defaults to "10"; only used for "TRANSACTIONAL" mode
                .configuration(producerConfiguration)       // Hard requirement
                .confirmationMode(ConfirmationMode.WAIT_FOR_ACK)         // Defaults to a Confirmation Mode of "NONE"
                .transactionalIdPrefix("test") // Hard requirement when in "TRANSACTIONAL" mode
                .build();
    }

    @Bean
    //@DependsOn({"producerFactory"})
    public KafkaPublisher<String, byte[]> kafkaPublisher(
                                                         ProducerFactory<String, byte[]> producerFactory,
                                                         KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
        return KafkaPublisher.<String, byte[]>builder()
                .topic("Axon.Events")                               // Defaults to "Axon.Events"
                .producerFactory(producerFactory)           // Hard requirement
                .messageConverter(kafkaMessageConverter)    // Defaults to a "DefaultKafkaMessageConverter"
                .publisherAckTimeout(1000)   // Defaults to "1000" milliseconds; only used for "WAIT_FOR_ACK" mode
                .build();
    }

    @Bean
    public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher) {
        return KafkaEventPublisher.<String, byte[]>builder()
                .kafkaPublisher(kafkaPublisher)             // Hard requirement
                .build();
    }

    @Bean
    public void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
                                                  KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
        String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
        eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
                .assignHandlerTypesMatching(
                        processingGroup,
                        clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
                )
                .registerSubscribingEventProcessor(processingGroup);
        // Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
    }

Welcome to the forum, @Anuj_Agrawal.
I feel that you have broken up your question/problem halfway.
I am basing this assumption on the title and the description, by the way.

Would you mind filling in the rest so that everybody knows exactly what kind of help you’re looking for?
Thanks!

Thanks Steven.
Hey @Steven_van_Beelen , apologies, not sure how I missed the description.
Does this makes sense now?

Sorry, I might not have been clear enough.
My point was that your description does not contain a question.
As in, no sentence ends with a question mark at all.

Now, I can deduce that it’s likely because of the circular dependency you are experiencing.
Note that this isn’t so much an Axon problem, but rather a problem with Spring Boot’s wiring “magic.”
I am intentionally calling it magic, as to solve this, we need to try out some things.

The exception description you’ve shared states that the kafkaPublisher bean and the producerFactory bean form a circular dependency.
However, from the method description, we can only note that the kafkaPublisher needs the producerFactory and not vice versa.

Based on this, I am assuming Spring Boot has difficulty finding the Map<String, Object> producerConfiguration parameter. Thus, why not try to make the producerFactory differently, like:

  • through static parameters, or
  • by exposing an actual class for these parameters, like a custom ProducerFactoryConfiguration, or
  • by moving the bean creation method to a different class, removing the inner dependencies of the KafkaEventPublicationConfiguration altogether.

Let us know if any of these solve the problem at hand, @Anuj_Agrawal.

Hi,

the issue is that Spring is more strict when it comes to circular dependencies since it’s more recent versions. You configuration (which is itself a bean) has a dependency on a bean that is created in that same configuration. Spring need the config to create the bean, and the bean to create the config… boom.

The way to solve this, is change your registerPublisherToEventProcessor method to return an actual bean. If you make that of type ConfigurerModule and implement its only (non-default) method void configureModule(Configurer configurer), then the problem should be fixed.

Note that you don’t need a dependency on the EventProcessingConfiguration in that case. You can access it through the Configurer configurer parameter: configurer.eventProcessing().

Hope that helps.