Kafka with tracking mongostore

Why do we sometimes fail to connect to Kafka without any exception being thrown?

axon.serializer.general=jackson
axon.kafka.client-id=myproducer
axon.kafka.default-topic=zmytopic
axon.kafka.producer.retries=5
axon.kafka.bootstrap-servers=localhost:9092
axon.kafka.producer.transaction-id-prefix=clxtrx
axon.distributed.enabled=true
axon.kafka.publisher.enabled=true
axon.kafka.producer.event-processor-mode=tracking
mongo.event-store.enabled=true
Specifically, after a message is handled by the relevant aggregate, how can I verify whether the resulting event was actually published to Kafka or not?

Could you please expand on your question? I don’t understand the problem you are facing.

We are using Spring Boot. Sometimes the ConfigurerModule beans are not executed in the right order, and as a result the processor groups are not configured correctly.

@Value("${event.processor.group.list.property}")
private List<String> processorGroupListProperty;

@Value("${axon.kafka.default-topic}")
private List<String> defaultTopic;
@Bean
public ConfigurerModule processorDefaultConfigurerModule() {
    return configurer -> configurer.eventProcessing(EventProcessingConfigurer::usingTrackingEventProcessors);
}

@Bean
public ConfigurerModule kafkaConfigurerModule(
        ConsumerFactory<String, byte[]> consumerFactory,
        Fetcher<String, byte[], KafkaEventMessage> fetcher,
        KafkaMessageConverter<String, byte[]> messageConverter,
        @Qualifier("eventSerializer") Serializer serializer,
        EventProcessingConfigurer eventProcessingConfigurer) {
    return new ConfigurerModule() {
        @Override
        public void configureModule(Configurer configurer) {
            ExecutorService executorService =
                    Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));
            AsyncFetcher<String, byte[], KafkaEventMessage> asyncFetcher =
                    AsyncFetcher.<String, byte[], KafkaEventMessage>builder()
                            .pollTimeout(100)                     // Defaults to 5000 milliseconds
                            .executorService(executorService)
                            .build();
            KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
            configurer.registerModule(kafkaMessageSourceConfigurer);
            StreamableKafkaMessageSource<String, byte[]> kafkaMessageSource =
                    StreamableKafkaMessageSource.<String, byte[]>builder()
                            .topics(defaultTopic)                 // Defaults to a collection containing "Axon.Events"
                            .serializer(serializer)
                            .consumerFactory(consumerFactory)     // Hard requirement
                            .fetcher(asyncFetcher)                // Hard requirement
                            .messageConverter(messageConverter)   // Defaults to a "DefaultKafkaMessageConverter"
                            .bufferFactory(
                                    () -> new SortedKafkaMessageBuffer<>(1000)) // Defaults to a buffer capacity of 1000
                            .build();

            if (processorGroupListProperty != null) {
                for (String group : processorGroupListProperty) {
                    eventProcessingConfigurer.registerTrackingEventProcessor(group, c -> kafkaMessageSource);
                }
            }

        }
    };
}

There can be ordering issues between the two ConfigurerModule beans. It would likely help to combine both configurations into a single bean.
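A sketch of what that combined bean could look like, assuming the same Axon Kafka extension types and the `defaultTopic` and `processorGroupListProperty` fields from the code above (the bean name `combinedKafkaConfigurerModule` is just an illustration). Doing both steps inside one ConfigurerModule fixes their relative order:

```java
// A single ConfigurerModule that first switches all processors to tracking
// mode and then registers the Kafka message source, so the relative order
// of the two configuration steps is guaranteed within one bean.
@Bean
public ConfigurerModule combinedKafkaConfigurerModule(
        ConsumerFactory<String, byte[]> consumerFactory,
        KafkaMessageConverter<String, byte[]> messageConverter,
        @Qualifier("eventSerializer") Serializer serializer,
        EventProcessingConfigurer eventProcessingConfigurer) {
    return configurer -> {
        // Step 1: default every processor group to a tracking event processor.
        configurer.eventProcessing(EventProcessingConfigurer::usingTrackingEventProcessors);

        // Step 2: build the Kafka message source, as in the original bean.
        ExecutorService executorService =
                Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));
        AsyncFetcher<String, byte[], KafkaEventMessage> fetcher =
                AsyncFetcher.<String, byte[], KafkaEventMessage>builder()
                        .pollTimeout(100)
                        .executorService(executorService)
                        .build();
        StreamableKafkaMessageSource<String, byte[]> kafkaMessageSource =
                StreamableKafkaMessageSource.<String, byte[]>builder()
                        .topics(defaultTopic)
                        .serializer(serializer)
                        .consumerFactory(consumerFactory)
                        .fetcher(fetcher)
                        .messageConverter(messageConverter)
                        .build();

        // Step 3: attach the source to each configured processor group.
        if (processorGroupListProperty != null) {
            for (String group : processorGroupListProperty) {
                eventProcessingConfigurer.registerTrackingEventProcessor(group, c -> kafkaMessageSource);
            }
        }
    };
}
```

This is a configuration sketch, not a drop-in replacement; it omits imports and the enclosing @Configuration class for brevity.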

I will do that.
Thank you.