axon kafka not fetching events

Hi,
I’ve used the configuration as suggested in a previous post:

Configuration configuration = DefaultConfigurer.defaultConfiguration()
        .configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
        .configureAggregate(AccountAggregate.class)
        .eventProcessing(ep -> ep
                .registerEventHandler(cf -> new AccountEventHandler())
                .registerSaga(CloseAccountSaga.class,
                        sc -> sc.configureSagaStore(c -> new InMemorySagaStore()))
                .byDefaultAssignTo("close-account")
                .registerTrackingEventProcessor("close-account", c -> kafkaMessageSource)
        )
        .buildConfiguration();

configuration.start();

and according to the log, it looks like a new kafka consumer is listening to a topic named “axon”. [1]

So far so good.

I’ve tried to send a message (from another external application) to the “axon” kafka topic but nothing happens to my axon application; I was expecting the triggering of my SagaHandler

@ProcessingGroup("close-account")
public class CloseAccountSaga {

but it looks like the kafka consumer is not fetching any messages.

The kafkaMessageSource has been created as:

Fetcher fetcher = AsyncFetcher.<String, byte[]>builder()
        .consumerFactory(consumerFactory)
        .topic(topicName)
        .pollTimeout(300, TimeUnit.MILLISECONDS)
        .build();
KafkaMessageSource messageSource = new KafkaMessageSource(fetcher);

Any idea?

Thanks

Hi,

if my understanding is correct, the reason why events are not fetched, it’s because the kafka message (I’m sending it using a nodejs application) produced is not decorated with “metadata”
axon-message-id
axon-message-revision
axon-message-timestamp
axon-message-type

Is is true?
How can I create a message from an external application (nodejs) with these metadata?

thanks,
Giovanni