zeynab
(Zeynab)
February 4, 2024, 8:12am
1
Why does our application sometimes fail to connect to Kafka without throwing any exception?
axon.serializer.general=jackson
axon.kafka.client-id=myproducer
axon.kafka.default-topic=zmytopic
axon.kafka.producer.retries=5
axon.kafka.bootstrap-servers=localhost:9092
axon.kafka.producer.transaction-id-prefix=clxtrx
axon.distributed.enabled=true
axon.kafka.publisher.enabled=true
axon.kafka.producer.event-processor-mode=tracking
mongo.event-store.enabled=true
More specifically, we want to trace a message after the relevant aggregate has handled it: how can I check whether it was actually published to Kafka?
Gerard
(Gerard Klijs)
February 23, 2024, 10:39am
2
Could you please expand your question? I don’t understand the problem you are facing.
zeynab
(Zeynab)
February 28, 2024, 12:27pm
3
We are using Spring Boot. Sometimes the ConfigurerModule is not executed correctly, and as a result the processor groups are not processed correctly.
@Value("${event.processor.group.list.property}")
private List<String> processorGroupListProperty;

@Value("${axon.kafka.default-topic}")
private List<String> defaultTopic;

// Makes tracking event processors the default processor type.
@Bean
public ConfigurerModule processorDefaultConfigurerModule() {
    return configurer -> configurer.eventProcessing(EventProcessingConfigurer::usingTrackingEventProcessors);
}

// Builds a streamable Kafka source and registers one tracking processor per group.
@Bean
public ConfigurerModule kafkaConfigurerModule(
        ConsumerFactory<String, byte[]> consumerFactory,
        Fetcher<String, byte[], KafkaEventMessage> fetcher, // injected but not used below
        KafkaMessageConverter<String, byte[]> messageConverter,
        @Qualifier("eventSerializer") Serializer serializer,
        EventProcessingConfigurer eventProcessingConfigurer) {
    return new ConfigurerModule() {
        @Override
        public void configureModule(Configurer configurer) {
            ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));
            // Typed builder, so no unchecked cast is needed when passing the fetcher on.
            AsyncFetcher<String, byte[], KafkaEventMessage> asyncFetcher =
                    AsyncFetcher.<String, byte[], KafkaEventMessage>builder()
                            .pollTimeout(100) // Defaults to "5000" milliseconds
                            .executorService(executorService)
                            .build();

            KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
            configurer.registerModule(kafkaMessageSourceConfigurer);

            StreamableKafkaMessageSource<String, byte[]> kafkaMessageSource =
                    StreamableKafkaMessageSource.<String, byte[]>builder()
                            .topics(defaultTopic) // Defaults to a collection of "Axon.Events"
                            .serializer(serializer)
                            .consumerFactory(consumerFactory) // Hard requirement
                            .fetcher(asyncFetcher) // Hard requirement
                            .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                            // Defaults to a "SortedKafkaMessageBuffer" with a buffer capacity of "1000"
                            .bufferFactory(() -> new SortedKafkaMessageBuffer<>(1000))
                            .build();

            if (processorGroupListProperty != null) {
                for (String group : processorGroupListProperty) {
                    eventProcessingConfigurer.registerTrackingEventProcessor(group, c -> kafkaMessageSource);
                }
            }
        }
    };
}
Gerard
(Gerard Klijs)
February 28, 2024, 12:46pm
4
There can be ordering issues between the two ConfigurerModule beans. Combining both configurations into a single bean will likely help.
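A minimal sketch of what a single combined bean could look like, assuming the Axon Kafka Spring Boot starter provides the ConsumerFactory, Fetcher and KafkaMessageConverter beans that the posted configuration already injects. The class name KafkaProcessingConfig is hypothetical. Unlike the posted code, this version reuses the injected Fetcher instead of building a new AsyncFetcher, and it takes the EventProcessingConfigurer from the Configurer handed to the module rather than injecting it as a bean parameter. Treat it as a starting point to try, not a confirmed fix.

```java
import java.util.List;

import org.axonframework.config.ConfigurerModule;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; the properties and bean parameters mirror the posted configuration.
@Configuration
public class KafkaProcessingConfig {

    @Value("${event.processor.group.list.property}")
    private List<String> processorGroupListProperty;

    @Value("${axon.kafka.default-topic}")
    private List<String> defaultTopic;

    // One ConfigurerModule that sets the processor default AND registers the groups,
    // so there is no dependency on the order in which two separate modules run.
    @Bean
    public ConfigurerModule kafkaConfigurerModule(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], KafkaEventMessage> fetcher,
            KafkaMessageConverter<String, byte[]> messageConverter,
            @Qualifier("eventSerializer") Serializer serializer) {
        return configurer -> {
            StreamableKafkaMessageSource<String, byte[]> kafkaMessageSource =
                    StreamableKafkaMessageSource.<String, byte[]>builder()
                            .topics(defaultTopic)
                            .serializer(serializer)
                            .consumerFactory(consumerFactory)
                            .fetcher(fetcher)
                            .messageConverter(messageConverter)
                            .build();

            // Use the EventProcessingConfigurer that belongs to this Configurer.
            configurer.eventProcessing(processing -> {
                processing.usingTrackingEventProcessors();
                if (processorGroupListProperty != null) {
                    for (String group : processorGroupListProperty) {
                        processing.registerTrackingEventProcessor(group, c -> kafkaMessageSource);
                    }
                }
            });
        };
    }
}
```

With everything in one module, the default processor type and the per-group registrations are applied in a fixed sequence inside the same callback, which removes one possible source of the intermittent behaviour.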