Configure kafka using the ConfigurationAPI (no spring)

Hi,
I'm trying to use the Configuration API (without spring) to configure my axon application. 

I'm able to configure:
 - a saga (CloseAccountSaga)
 - an event handler (AccountEventHandler) 
 - an aggregate (AccountAggregate)

and I would like also to configure an apache kafka message source for the CloseAccountSaga. 
My CloseAccountSage class is annotated with 
@ProcessingGroup("close-account")

However I'm not able to create a configuration that works.
The snippet below is an attempt I did but without success (I'm trying to create a TrackingEventProcessor)

Any idea?

Thanks

EventStore eventStore = EmbeddedEventStore.builder().storageEngine(new InMemoryEventStorageEngine()).build();
AnnotatedSagaRepository<CloseAccountSaga> sagaRepository = AxonUtil.createAnnotatedSagaRepository(CloseAccountSaga.class);
Supplier<CloseAccountSaga> accountSagaSupplier = () -> new CloseAccountSaga(eventStore);
AnnotatedSagaManager<CloseAccountSaga> sagaManager = AxonUtil.createAnnotatedSagaManager(accountSagaSupplier, sagaRepository, CloseAccountSaga.class);

KafkaMessageSource kafkaMessageSource = AxonKafkaConfig.createMessageSource("axon");
EventProcessingModule eventProcessingModule = new EventProcessingModule();
eventProcessingModule.registerSaga(CloseAccountSaga.class, sc -> sc.configureSagaStore(c -> new InMemorySagaStore())
    .configureRepository(c -> sagaRepository)
    .configureSagaManager(c -> sagaManager)
    .configureSagaStore(c -> new InMemorySagaStore()))
    .registerEventProcessor("close-account", (name, conf, eventHandlerInvoker) ->
            TrackingEventProcessor.builder()
                   .name(name)
                   .eventHandlerInvoker(eventHandlerInvoker)
                   .messageSource(kafkaMessageSource).build())
           .assignProcessingGroup(group -> "close-account");

        Configuration configuration = DefaultConfigurer.defaultConfiguration()
                .configureEventStore(configuration1 -> eventStore)
                .registerModule(eventProcessingModule)
                .configureAggregate(AccountAggregate.class)
                .eventProcessing(eventProcessingConfigurer -> eventProcessingConfigurer
                        .registerEventHandler(cf -> new AccountEventHandler()))
                .registerComponent(ListenerInvocationErrorHandler.class,
                        c -> (e, eventMessage, eventMessageHandler) -> { })
                .buildConfiguration();

Hi Giovanni,

while I don’t immediately see why this configuration doesn’t work, there is a lot of unnecessary configuration here.

First of all, you don’t need to specify an EventProcessingModule yourself. Just do this:

Configuration configuration = DefaultConfigurer.defaultConfiguration()
                                               .configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
                                               .configureAggregate(AccountAggregate.class)
                                               .eventProcessing(ep -> ep
                                                       .registerEventHandler(cf -> new AccountEventHandler())
                                                       .registerSaga(CloseAccountSaga.class,
                                                                sc -> sc.configureSagaStore(c -> new InMemorySagaStore()))
                                                       .byDefaultAssignTo("close-account")
                                                       .registerTrackingEventProcessor("close-account", c -> kafkaMessageSource)
                                               )
                                               .buildConfiguration();



Note that you can do .configureEmbeddedEventStore(), which ensures that all is configured properly based on the EventStorageEngine you provide.
The eventProcessing(...) method allows you to tune the event processing based on the "ep" instance provided. No need to explicitly configure this as a module.
In SagaConfiguration, only explicitly configure what you need. Generally, you'll only need to configure a SagaStore. The rest is automatic.

The last two lines of the EventProcessor Configuration are different. If you want to add every handler to the same group, then just use "byDefaultAssignTo". 
Lastly, you don't have to construct a tracking processor yourself. Just tell Axon to create one with a name, configured to read from the source you specify.

Hope this helps.
Cheers,

Allard
PS. Note that the Kafka extension is not GA, yet.

Allard,
thanks a lot for your reply. Very very helpful.
I will try it ASAP.
I hope to see in reference doc more bits for the usage of axon when there is no spring.

Best Regards