Event processor via configuration API

I have a Bean with event handlers that I want to configure to track events in order to allow replay. For that I simply added it the ProcessingGroup annotation :

`
@ProcessingGroup(“readModelProcessor”)
public class MyReadModelEventHandler {
@EventHandler
public void handleMyEvent(MyEvent e) {

}
}
`

Then in a spring application, I complete my configuration with two additional methods :

`
@Configuration
public class Config {

@Bean
public TokenStore tokenStore(MongoTemplate mongoTemplate, Serializer serializer) {
return new MongoTokenStore(mongoTemplate, serializer);
}

@Autowired
public void configureEventProcessors(EventProcessingConfiguration configuration) {
configuration.registerTrackingEventProcessor(“readModelProcessor”);
}
}
`

and all work fine : the token is peristed in mongo database, I can replay events… Simple and efficient.

Now I need to do the same thing in an older application without spring. So I use the configuration API… and it does not work.
That’s what I did. I must miss something, but I do not know what :

MongoTokenStore tokenStore = new MongoTokenStore(mongoTemplate, new XStreamSerializer()); EventProcessingConfiguration eventProcessingConfiguration = new EventProcessingConfiguration(); eventProcessingConfiguration.registerTokenStore("readModelProcessor", c -> tokenStore); eventProcessingConfiguration.registerTrackingEventProcessor("readModelProcessor");

and I added it to my existing configuration :

`
EventHandlingConfiguration eventHandlingConfiguration = new EventHandlingConfiguration()
.registerEventHandler(c -> new AnnotationEventListenerAdapter(new MyReadModelEventHandler()));

axonConfiguration = DefaultConfigurer.defaultConfiguration()
.configureCommandBus(c -> new SimpleCommandBus())
.configureEventStore(c -> eventStore)
.configureAggregate(myAggregateConf)
.registerCommandHandler(c -> new AggregateAnnotationCommandHandler<>(MyAggregate.class, myAggregateRepository))
.registerModule(eventProcessingConfiguration)
.registerModule(eventHandlingConfiguration)
.buildConfiguration();
`

My event handlers continue to be called, but the tracking mode does not work : there is no collection in the mongodb database.

Any idea about missing part ?
Thanks.

Any idea ?

Hi,

I see a few things in the configuration that look strange, and there are a few components missing, so I don’t have a full picture of the components you have wired.

  • You do registerCommandHandler(c -> new AggregateAnnotationCommandHandler(…)) . That is not necessary, as you already configured the aggregate.
  • You do not have to wrap your event handler in an AnnotationEventListenerAdapter. Axon will do that for you if necessary. As a side-effect of wrapping it yourself, the registered instance (an AnnotationEventListenerAdapter) doesn’t have the @ProcessingGroup annotation anymore.

Also, I recommend using configureEmbeddedEventStore(c -> eventStorageEngine) instead of configureEventStore(…). It will ensure the proper shutdown handlers are configured and that any monitoring components that have been defined are also taken into account. Only configure an EventStore in its entirety if you need to customize it.

Hope this helps.
Cheers,

Allard

Hi Allard,

This is fixed. The problem was that the handler was wrapped in an AnnotationEventListenerAdapter when registered, so the ProcessingGroup annotation was not seen by Axon.

Thanks to your message I simplified our configuration by not registering aggregate with registerCommandHandler, by registering storage instead of store and by removing configuration of simple command bus.
I think that we can simplify a last thing. We assign the token store to our processor :

eventProcessingConfiguration.registerTokenStore("readModelProcessor", c -> tokenStore);

Could we declare this store as the default store so we could remove the above line, and without other configuration all our processors will use it ?

Cheers.

Hi,

you can register it using:
configurer.registerComponent(TokenStore.class, c-> tokenStore);
or in Spring, make it available as an @Bean.

It will then be used as a default for all tracking processors.

Cheers,

Allard

I did it with spring.
I just changed it for configuration API and it works fine. Perfect.

Cheers