Hi,
I’ve used the configuration as suggested in a previous post:
Configuration configuration = DefaultConfigurer.defaultConfiguration()
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.configureAggregate(AccountAggregate.class)
.eventProcessing(ep -> ep
.registerEventHandler(cf -> new AccountEventHandler())
.registerSaga(CloseAccountSaga.class,
sc -> sc.configureSagaStore(c -> new InMemorySagaStore()))
.byDefaultAssignTo("close-account")
.registerTrackingEventProcessor("close-account", c -> kafkaMessageSource)
)
.buildConfiguration();
configuration.start();
and according to the log, it looks like a new kafka consumer is listening to a topic named “axon”. [1]
So far so good.
I’ve tried to send a message (from another external application) to the “axon” kafka topic but nothing happens to my axon application; I was expecting the triggering of my SagaHandler
@ProcessingGroup("close-account")
public class CloseAccountSaga {
but it looks like the kafka consumer is not fetching any messages.
The kafkaMessageSource has been created as:
Fetcher fetcher = AsyncFetcher.<String, byte[]>builder()
.consumerFactory(consumerFactory)
.topic(topicName)
.pollTimeout(300, TimeUnit.MILLISECONDS)
.build();
KafkaMessageSource messageSource = new KafkaMessageSource(fetcher);
Any idea?
Thanks