Below is how my @Configuration class looks like
and I am getting error as :
Description:
The dependencies of some of the beans in the application context form a cycle:
┌─────┐
| producerFactory defined in class path resource [com/test/testfw/KafkaEventPublicationConfiguration.class]
↑ ↓
| kafkaPublisher defined in class path resource [com/test/testfw/KafkaEventPublicationConfiguration.class]
└─────┘
@Bean
public ProducerFactory<String, byte[]> producerFactory(Map<String, Object> producerConfiguration) {
return DefaultProducerFactory.<String, byte[]>builder()
.closeTimeout(Duration.ZERO) // Defaults to "30" seconds
.producerCacheSize(10000) // Defaults to "10"; only used for "TRANSACTIONAL" mode
.configuration(producerConfiguration) // Hard requirement
.confirmationMode(ConfirmationMode.WAIT_FOR_ACK) // Defaults to a Confirmation Mode of "NONE"
.transactionalIdPrefix("test") // Hard requirement when in "TRANSACTIONAL" mode
.build();
}
@Bean
//@DependsOn({"producerFactory"})
public KafkaPublisher<String, byte[]> kafkaPublisher(
ProducerFactory<String, byte[]> producerFactory,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
return KafkaPublisher.<String, byte[]>builder()
.topic("Axon.Events") // Defaults to "Axon.Events"
.producerFactory(producerFactory) // Hard requirement
.messageConverter(kafkaMessageConverter) // Defaults to a "DefaultKafkaMessageConverter"
.publisherAckTimeout(1000) // Defaults to "1000" milliseconds; only used for "WAIT_FOR_ACK" mode
.build();
}
@Bean
public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher) {
return KafkaEventPublisher.<String, byte[]>builder()
.kafkaPublisher(kafkaPublisher) // Hard requirement
.build();
}
@Bean
public void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
.assignHandlerTypesMatching(
processingGroup,
clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
)
.registerSubscribingEventProcessor(processingGroup);
// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
}