I have an application that uses Axon 3 and Spring Boot. I want to add event replay functionality. Can you provide some examples? The current documentation does not go into much detail.
I did try referring to the following blogs.
To be able to perform a replay, you’ll first need to ensure your Event Handling Component is receiving events from a ‘TrackingEventProcessor’ rather than a ‘SubscribingEventProcessor’.
You can do that either by (1) switching the default Event Processor to tracking or by (2) making the Processing Group your Event Handling Component belongs to tracking.
Both can be achieved as follows, for example:
Pull out the ‘EventHandlingConfiguration’ in an ‘@Autowired’ void method and call the ‘usingTrackingProcessors()’ function on it. This switches the default from subscribing to tracking.
Pull out the ‘EventHandlingConfiguration’ in an ‘@Autowired’ void method and call the ‘registerTrackingProcessor({processing-group-name})’ function. This will adjust that specific Processing Group to be backed by a TrackingEventProcessor. The ‘processing-group-name’ is either the package name of your Event Handling Component or the value set in the ‘@ProcessingGroup({processing-group-name})’ annotation.
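As a rough sketch of the two options above, assuming Axon 3.x with the Spring Boot starter: the class names, the ‘my-projections’ group name and the ‘AccountOpenedEvent’ class are made up for illustration, and you would normally pick only one of the two options.

```java
import org.axonframework.config.EventHandlingConfiguration;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
public class ReplayProcessorConfig { // hypothetical class name

    @Autowired
    public void configureProcessors(EventHandlingConfiguration config) {
        // Option 1: make tracking the default for every Processing Group.
        config.usingTrackingProcessors();

        // Option 2 (alternative): only back one Processing Group with a
        // TrackingEventProcessor. "my-projections" is a hypothetical name.
        // config.registerTrackingProcessor("my-projections");
    }
}

// Hypothetical Event Handling Component assigned to the group used in option 2.
// Without @ProcessingGroup, the group name defaults to the package name.
@Component
@ProcessingGroup("my-projections")
class AccountProjection {

    @EventHandler
    public void on(AccountOpenedEvent event) {
        // update the query model here
    }
}

// Hypothetical event class, only here so the sketch is complete.
class AccountOpenedEvent {
}
```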
Axon 3.1 (latest version) has a MongoTokenStore. So yes, you can use MongoDb as a TokenStore.
Yes, it is compatible. Any event store that genuinely stores events (so not the in-memory version) can be used for a replay.
You would have to provide your own TokenStore implementation, not necessarily a new TokenEntry though. I however recommend moving to Axon 3.1 and leveraging the provided MongoTokenStore.
Where/why do you want to validate this step? A very simple validation could be to check that a Token is stored in the TokenStore for a Processing Group you’ve defined. That token means a TrackingEventProcessor is handling events by pushing them to its contained Event Handling Components.
By removing the Tracking Token of the Processing Group you want to replay from the Token Store. We’re working on a nicer solution where you can do API calls to replay a given Processing Group.
I am going to try that today. I was able to override the JPA repo with a Mongo repo, and I am getting the events back from the event store in the processor. The problem is that I am getting them every time I restart the application.
Now I am trying to investigate what the replay functionality should do. For now I am assuming that the replay should push the events on their respective bus for replay.
With your advice and the example from Simon Zambrovski I was able to implement the event replay. But I have some questions related to architecture. I started by implementing the tracking event processor on the command side.
Should the tracking event processor be on the command side or the query side?
Are these events read from the bus or from the store?
In my case, it is invoked every time something is pushed to the bus, which is not what I want.
My goal is to invoke event replay via a REST API, and the events must be pulled from the event store.
I think I found the answer to my question. I am able to use the JPA token store successfully, based on the Axon 3.1 documentation, but I am stuck when using MongoTokenStore.
My code base looks like this.
Config File
@Bean
public TokenStore getMongoTokenStore() {
    // Using the Axon Mongo Template and JSON serializer
    return new MongoTokenStore(axonMongoTemplate(), axonJsonSerializer());
}
My Repo
public interface TokenMongoRepository extends MongoRepository<MongoTrackingToken, String> {
}
Unlike with JPA, where my tables are created successfully, with Mongo I do not see any collections being created. Am I missing something?
The Command Side of your CQRS application are typically your aggregates or command handling components which store events in the Event Store. [Tracking/Subscribing] Event Processors typically update your query models, and thus live in the Query Side.
In Axon, the Event Store is a specific implementation of the Event Bus; they’re thus the same thing. However, the Tracking Event Processors are an event-pulling mechanism: they pull events from the store by periodically checking whether they (the Tracking Event Processors) are up to date with handling events.
I’m not exactly sure what you mean by ‘it’ in this scenario. Do you mind elaborating on what ‘it’ is in the third question? If ‘it’ is the Tracking Event Processor, and ‘being invoked’ means that it is handling events, then it is desired behavior that it is invoked every time new events are present in the Event Store. Whether you handle those events in your Event Handling Component is another story entirely, as that depends on the @EventHandler annotated functions you’ve created there. If ‘it’ is the replay, and ‘being invoked’ happens on application start-up, then that is because you have In Memory Tracking Tokens, suggesting you haven’t wired the desired Token Store correctly. Nonetheless, a better description of what’s not working as desired might help me give you a better answer.
Sadly, the Axon Framework isn’t providing that API out of the box yet, so you’ll have to create your own API for that. The explanation Simon gives there should help you out. What you want, ‘events must be pulled from the event store’, is exactly what the Tracking Event Processors will do. A Tracking Event Processor pulls the events from the store and keeps track of how far it got with handling them by keeping a tracking token.
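To make the token mechanism a bit more tangible, here is a small plain-Java model of it. This is not Axon code: ‘SimpleEventStore’, ‘SimpleTokenStore’ and ‘SimpleTrackingProcessor’ are made-up stand-ins, and the real TrackingEventProcessor does considerably more (threads, claims, segments). It only shows why removing the token leads to a replay.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TrackingReplaySketch {

    /** Append-only list standing in for the event store. */
    static class SimpleEventStore {
        private final List<String> events = new ArrayList<>();

        void append(String event) {
            events.add(event);
        }

        /** Returns a copy of every event at or after the given position. */
        List<String> readFrom(int position) {
            return new ArrayList<>(events.subList(position, events.size()));
        }
    }

    /** Remembers, per processor name, the position of the next event to handle. */
    static class SimpleTokenStore {
        private final Map<String, Integer> tokens = new HashMap<>();

        int fetch(String processorName) {
            // No token stored means: start from the very first event.
            return tokens.getOrDefault(processorName, 0);
        }

        void store(String processorName, int position) {
            tokens.put(processorName, position);
        }

        void remove(String processorName) {
            tokens.remove(processorName);
        }
    }

    /** Pulls events from the store and advances its token as it goes. */
    static class SimpleTrackingProcessor {
        private final String name;
        private final SimpleEventStore eventStore;
        private final SimpleTokenStore tokenStore;
        private final Consumer<String> handler;

        SimpleTrackingProcessor(String name, SimpleEventStore eventStore,
                                SimpleTokenStore tokenStore, Consumer<String> handler) {
            this.name = name;
            this.eventStore = eventStore;
            this.tokenStore = tokenStore;
            this.handler = handler;
        }

        /** One polling pass; returns the number of events handled. */
        int poll() {
            int position = tokenStore.fetch(name);
            List<String> pending = eventStore.readFrom(position);
            for (String event : pending) {
                handler.accept(event);
                position++;
                tokenStore.store(name, position); // remember how far we got
            }
            return pending.size();
        }
    }

    public static void main(String[] args) {
        SimpleEventStore store = new SimpleEventStore();
        SimpleTokenStore tokens = new SimpleTokenStore();
        SimpleTrackingProcessor processor = new SimpleTrackingProcessor(
                "projections", store, tokens, e -> System.out.println("handling " + e));

        store.append("AccountOpened");
        store.append("MoneyDeposited");

        System.out.println("first poll handled: " + processor.poll());
        System.out.println("second poll handled: " + processor.poll());

        tokens.remove("projections"); // dropping the token triggers a replay
        System.out.println("poll after token removal handled: " + processor.poll());
    }
}
```

In this model, a fresh ‘SimpleTokenStore’ on every start behaves like the In Memory Tracking Tokens case: every restart handles all events again. A home-grown replay API would presumably do the equivalent of ‘tokens.remove(...)’ for the chosen Processing Group while its processor is stopped.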
For your last question, I can tell you that you’re not required to write your own MongoRepository bean.
The only requirement Axon Framework has is that you provide a MongoTokenStore, with the desired MongoTemplate and the Serializer you’re using throughout your system.
I’m however no Mongo expert, so I can’t tell you if Mongo should create the token table for you automatically or if you should be creating that yourself.
Hope this helps you out, Ajinkya. Please don’t hesitate to ask more questions to clarify things!
Sorry for the confusion, by “it” I meant the Tracking Processor. No worries about the Mongo expertise. The issue that is blocking me is that, unlike for the JPA Token Store, there is not enough information or examples available to implement the Mongo Store.
I am using the MongoTrackingToken and MongoTokenStore, but JPA has some more components, such as TokenEntry, which are missing. So if you could provide me with a sample of how the Mongo repo is supposed to be implemented, that would be great.
What do you mean by “implementing Mongo [Token] Store”? It’s already there in Axon 3.1. All you need to do (when using Spring Boot) is define a bean of type MongoTokenStore.