Why does Axon have a delay when consuming messages from a Kafka consumer?
Every consumed message takes about 5 seconds to be processed. This is the configuration we use:
@Bean
public ConfigurerModule processorDefaultConfigurerModule() {
return configurer → configurer.eventProcessing(EventProcessingConfigurer::usingTrackingEventProcessors);
}
/**
 * Registers a {@link ConfigurerModule} that builds a
 * {@link StreamableKafkaMessageSource} backed by an {@link AsyncFetcher} and
 * registers one tracking event processor per entry of
 * {@code processorGroupListProperty}, all reading from that single source.
 *
 * @param consumerFactory           factory handed to the Kafka message source
 * @param fetcher                   injected fetcher; not used here because a dedicated
 *                                  {@link AsyncFetcher} is built below (kept so the bean
 *                                  signature stays unchanged)
 * @param messageConverter          converter handed to the Kafka message source
 * @param serializer                the {@code eventSerializer} bean handed to the Kafka message source
 * @param eventProcessingConfigurer configurer on which the tracking processors are registered
 * @return the module applied to the Axon {@code Configurer}
 */
@Bean
public ConfigurerModule kafkaConfigurerModule(
        ConsumerFactory<String, byte[]> consumerFactory,
        Fetcher<String, byte[], KafkaEventMessage> fetcher,
        KafkaMessageConverter<String, byte[]> messageConverter,
        @Qualifier("eventSerializer") Serializer serializer,
        EventProcessingConfigurer eventProcessingConfigurer) {
    return configurer -> {
        // NOTE(review): nothing visible here shuts this executor down — confirm it is
        // released elsewhere when the application context closes.
        ExecutorService executorService =
                Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));

        // Build the fetcher with explicit type arguments so no unchecked cast is
        // needed when it is passed to the message source.
        Fetcher<String, byte[], KafkaEventMessage> asyncFetcher =
                AsyncFetcher.<String, byte[], KafkaEventMessage>builder()
                        .pollTimeout(100) // Defaults to "5000" milliseconds
                        .executorService(executorService)
                        .build();

        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);

        StreamableKafkaMessageSource<String, byte[]> kafkaMessageSource =
                StreamableKafkaMessageSource.<String, byte[]>builder()
                        .topics(defaultTopic) // Defaults to a collection of "Axon.Events"
                        .serializer(serializer)
                        .consumerFactory(consumerFactory) // Hard requirement
                        .fetcher(asyncFetcher) // Hard requirement
                        .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                        // Defaults to a "SortedKafkaMessageBuffer" with a buffer capacity of "1000"
                        .bufferFactory(() -> new SortedKafkaMessageBuffer<>(1000))
                        .build();

        // No processor groups configured: nothing to register.
        if (processorGroupListProperty == null) {
            return;
        }
        for (String group : processorGroupListProperty) {
            eventProcessingConfigurer.registerTrackingEventProcessor(group, c -> kafkaMessageSource);
        }
    };
}
With this configuration, every consumed message still takes about 5 seconds.