Some questions on tracking

Hello, all.

I am working on deploying tracking processors in my project. I am using MongoDB as the persistence backend. To get a token store, I have added the code from the pull request https://github.com/AxonFramework/AxonFramework/pull/295 to my project, since tracking cannot be used with Mongo otherwise.

I then configured the token store and activated tracking processors in my Axon configuration class (using Spring Boot).

@Bean
public MongoTokenStore tokenStore(org.axonframework.config.Configuration config) {
    // mongoClient, database and tokenCollection are defined elsewhere (not shown)
    org.axonframework.mongo.eventsourcing.tokenstore.DefaultMongoTemplate mongoTemplate =
            new org.axonframework.mongo.eventsourcing.tokenstore.DefaultMongoTemplate(
                    mongoClient, database, tokenCollection);
    return new MongoTokenStore(mongoTemplate, config.serializer());
}

@Autowired
public void configure(EventHandlingConfiguration config) {
    config.usingTrackingProcessors();
}

Overall, tracking seems to work. When I shut down the application and delete a projection and the matching token from the token store (both in Mongo), the projection is rebuilt on startup. Fine.

Now to the questions:

  1. I read that I would be able to reset a tracker. How would I do this? TrackingEventProcessor only offers start(), pause() and shutDown(), and the TokenStore can only store tokens, not delete them.

More concretely, if I wanted to create an in-RAM projection of some aggregates:

@Slf4j
@Component
@ProcessingGroup(AccountProjectorInRam.PROCESSING_GROUP)
@RequiredArgsConstructor
public class AccountProjectorInRam {

    public static final String PROCESSING_GROUP = "inRamTest";

    @PostConstruct
    public void init() {
        // I want to reset the tracking here!
    }

    @EventHandler
    public void on(EventOne event) throws UnknownCredentialsType {
        log.info("event: {}", event);
    }

    @EventHandler
    public void on(EventTwo event) {
        log.info("event: {}", event);
    }
}

Is @PostConstruct the right place? What do I need to inject, and how do I reset? I suspect I am just looking in the wrong places for this.

  2. Extrapolating a bit, how does the replay behave when identical projections are scaled horizontally across multiple micro-services? By default the tokens are stored under the same name. So when I deploy instance A of the projection, the replay runs and a token is stored. Instance B, started later, would not get any replay when using the default naming. So basically, on startup of a new instance, I would have to generate a unique processing group ID?

  3. About the Mongo TokenStore:

Upon shutting down the application I get the following exception for every processing group. I suspect I am doing something stupid with my Mongo configuration?

java.lang.IllegalStateException: state should be: open
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:82) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:411) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.operation.FindAndUpdateOperation.execute(FindAndUpdateOperation.java:331) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.Mongo.execute(Mongo.java:845) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.Mongo$2.execute(Mongo.java:828) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.MongoCollectionImpl.findOneAndUpdate(MongoCollectionImpl.java:435) ~[mongo-java-driver-3.4.2.jar:na]
at org.axonframework.mongo.eventsourcing.tokenstore.MongoTokenStore.loadOrInsertTokenEntry(MongoTokenStore.java:152) ~[classes/:na]
at org.axonframework.mongo.eventsourcing.tokenstore.MongoTokenStore.fetchToken(MongoTokenStore.java:83) ~[classes/:na]
at org.axonframework.eventhandling.tokenstore.TokenStore.extendClaim(TokenStore.java:71) ~[axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$processBatch$5(TrackingEventProcessor.java:272) [axon-core-3.0.5.jar:3.0.5]
at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:44) ~[axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:272) [axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:217) [axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$start$3(TrackingEventProcessor.java:187) [axon-core-3.0.5.jar:3.0.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]

Thank you very much.

Best regards,
Dominic

Hello Dominic,

let me try to answer your questions, one at a time:

  1. Resetting a token is currently not supported by an API. You can reset a TrackingEventProcessor by stopping it (API call), then removing the token from the token store (no API call available yet), and then starting the processor again.
    If your intent is to store the data in-memory, you will probably also want to store the token in memory. This way, a TrackingEventProcessor will automatically restart when the application starts.

  2. If you’re scaling services, and you’re configuring them to store data in the same database, they will also compete for tracking tokens. Currently, only a single token per TrackingEventProcessor is supported, so only 1 node can be actively consuming events at any time. This will change in 3.1, where multiple tokens are supported.
    If your services each have a separate datastore, then they would probably also have a token store that is stored in that same database. In such a setup, it doesn’t matter how many services you have. Even if some services share a datastore, they will automatically start competing for these tokens.

  3. This is an issue that will be resolved in Axon 3.0.6. Spring will trigger a stop for each of the processors, but it won’t wait for the processors to shut down. It may happen that the Mongo clients have been disconnected while the EventProcessor is still processing its last event. In such a case, these exceptions occur.
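
To illustrate the reset sequence from point 1 (stop the processor, remove its token, start it again), here is a minimal toy model in plain Java. It is only a sketch: ToyTokenStore and ToyProcessor are hypothetical stand-ins invented for this example, not Axon classes, and the remove() call models the step for which no API exists yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a token store; NOT the Axon TokenStore API.
class ToyTokenStore {
    private final Map<String, Long> tokens = new ConcurrentHashMap<>();

    // Returns the last handled position, or -1 when no token exists (replay from the start).
    long fetch(String processorName) {
        return tokens.getOrDefault(processorName, -1L);
    }

    void store(String processorName, long position) {
        tokens.put(processorName, position);
    }

    // Models the manual step from the thread: deleting the token.
    void remove(String processorName) {
        tokens.remove(processorName);
    }
}

// Hypothetical stand-in for a tracking processor that feeds an in-memory projection.
class ToyProcessor {
    private final String name;
    private final ToyTokenStore tokenStore;
    private final List<String> eventLog;
    final List<String> projection = new ArrayList<>();
    private boolean running;

    ToyProcessor(String name, ToyTokenStore tokenStore, List<String> eventLog) {
        this.name = name;
        this.tokenStore = tokenStore;
        this.eventLog = eventLog;
    }

    // Handles every event after the stored token and advances the token as it goes.
    void start() {
        running = true;
        for (long i = tokenStore.fetch(name) + 1; i < eventLog.size(); i++) {
            projection.add(eventLog.get((int) i));
            tokenStore.store(name, i);
        }
    }

    void shutDown() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }

    // Reset: stop, drop the projection state, remove the token, start again.
    void reset() {
        shutDown();
        projection.clear();
        tokenStore.remove(name);
        start();
    }
}
```

In this model a plain restart does not replay anything because the token survives, whereas reset() rebuilds the projection from the first event. With an in-memory token store, the token is lost on every application restart anyway, which is why an in-memory projection then rebuilds itself on startup.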

Hope this helps.
Cheers,

Allard

Hello, Allard.

> Hello Dominic,
>
> let me try to answer your questions, one at a time:
>
>   1. Resetting a token is currently not supported by an API. You can reset a TrackingEventProcessor by stopping it (API call), then removing the token from the token store (no API call available, yet) and then starting the processor again.
>     If your intent is to store the data in-memory, you will probably also want to store the token in memory. This way, a TrackingEventProcessor will automatically restart when the application starts.

OK, this explains my futile search. Thanks for clarifying this. So far I have not been able to figure out how to assign different EventProcessors to individual projections.

>   2. If you’re scaling services, and you’re configuring them to store data in the same database, they will also compete for tracking tokens. Currently, only a single token per TrackingEventProcessor is supported, so only 1 node can be actively consuming events at any time. This will change in 3.1, where multiple tokens are supported.
>     If your services each have a separate datastore, then they would probably also have a token store that is stored in that same database. In such a setup, it doesn’t matter how many services you have. Even if some services share a datastore, they will automatically start competing for these tokens.

OK, then it is exactly as I figured. For now I would tend to replicate the DB instances as well, so that when scaling out, each instance gets its own DB/token store to work with.
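
The competition for a single token can be pictured with a small stand-alone sketch. ClaimingTokenStore is a hypothetical class made up for this illustration, not the Axon or Mongo token store, and it leaves out details such as claim expiry. It only shows why two nodes sharing one store cannot both process the same group, while nodes with separate stores can.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a token store that hands out one claim per processor name.
class ClaimingTokenStore {
    // processor name -> node that currently owns the token
    private final Map<String, String> owners = new ConcurrentHashMap<>();

    // Atomically claims the token for the node if it is free.
    // Returns true when the node holds the claim after the call.
    boolean claim(String processorName, String node) {
        return node.equals(owners.computeIfAbsent(processorName, key -> node));
    }

    // Releases the claim, but only if the given node actually owns it.
    void release(String processorName, String node) {
        owners.remove(processorName, node);
    }
}

class ClaimDemo {
    public static void main(String[] args) {
        // Two nodes, one shared store: only the first claim succeeds.
        ClaimingTokenStore shared = new ClaimingTokenStore();
        System.out.println(shared.claim("accounts", "nodeA"));
        System.out.println(shared.claim("accounts", "nodeB"));

        // Two nodes, each with its own store: both claims succeed.
        System.out.println(new ClaimingTokenStore().claim("accounts", "nodeA"));
        System.out.println(new ClaimingTokenStore().claim("accounts", "nodeB"));
    }
}
```

Giving every instance its own store, as described above, corresponds to the second half of the demo: each node owns the token in its own store and therefore runs its own replay.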

>   3. This is an issue that will be resolved in Axon 3.0.6. Spring will trigger a stop for each of the processors, but it won’t wait for the processors to shut down. It may happen that the Mongo clients have been disconnected while the EventProcessor is still processing its last event. In such case, these exceptions occur.

Also thanks for clarifying.

> Hope this helps.

Yes, thanks a lot.

Best regards,
Dominic

Hi Dominic,

regarding assigning EventProcessors to projections, the default behavior of Axon is to divide event handlers into Processing Groups. Each ProcessingGroup has a name. By default, each EventHandler class is assigned to the ProcessingGroup with the name of the package of the Event Handler. This can be overridden using the @ProcessingGroup annotation. Then, Axon will create a Processor (Subscribing by default) for each ProcessingGroup. The name of the Processor is the name of the ProcessingGroup. If the Processor is a Tracking one, the TrackingToken will be registered with the TokenStore under that name.
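
As a rough illustration of that naming rule, the sketch below resolves a group name from an annotation and falls back to the package name. ToyProcessingGroup and GroupNames are hypothetical names for this example only. This is not how Axon implements the lookup, just the rule as described above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the @ProcessingGroup annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ToyProcessingGroup {
    String value();
}

class GroupNames {
    // The annotation value wins; otherwise the package name of the handler class is used.
    static String resolve(Class<?> handlerType) {
        ToyProcessingGroup annotation = handlerType.getAnnotation(ToyProcessingGroup.class);
        if (annotation != null) {
            return annotation.value();
        }
        String className = handlerType.getName();
        int lastDot = className.lastIndexOf('.');
        // Classes in the default package have no dot in their name.
        return lastDot < 0 ? "" : className.substring(0, lastDot);
    }
}

// Explicitly assigned to the "inRamTest" group.
@ToyProcessingGroup("inRamTest")
class AnnotatedProjector {
}
```

The resolved name is also the name of the processor, and for a tracking processor it is the key under which the token ends up in the token store. Two handler classes in the same package therefore share one processor and one token unless one of them is annotated.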

Hope this clarifies.

Allard

This is clear to me. With regard to the assigning, I was wondering how to assign different TrackingProcessor implementations to different processing groups. For example, I want the MongoTrackingProcessor as a default. That is easy to do by defining the matching bean and calling config.usingTrackingProcessors(). But can I assign a processor using the InMemoryTokenStore to a specific processing group, while keeping the one using Mongo as the default?

Thanks,
Dominic

If you want to create a processor with specific configuration for just one group, you can use the “org.axonframework.config.EventHandlingConfiguration#registerEventProcessor” method. The second parameter (an EventProcessorBuilder) is a function that must create the processor, based on the configuration, a given name (which is always equal to the first parameter, for this method), and list of event handlers.
In your case, the function would create a TrackingEventProcessor that is configured with the InMemoryTokenStore. In the (overall) configuration, you would configure the MongoTokenStore, so that other TrackingProcessors will use that one.
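
The shape of that arrangement (one builder registered for a specific group, a default for everything else) can be sketched without any Axon types. Everything below is hypothetical: ToyEventHandlingConfig only borrows the method name registerEventProcessor for readability, and ProcessorSpec stands in for a real processor. The actual Axon builder also receives the configuration and the list of event handlers, which this model leaves out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical description of a processor: its name and which token store it uses.
class ProcessorSpec {
    final String name;
    final String tokenStore;

    ProcessorSpec(String name, String tokenStore) {
        this.name = name;
        this.tokenStore = tokenStore;
    }
}

// Hypothetical registry: per-group builders with a default fallback.
class ToyEventHandlingConfig {
    private final Map<String, Function<String, ProcessorSpec>> builders = new HashMap<>();
    private final Function<String, ProcessorSpec> defaultBuilder;

    ToyEventHandlingConfig(Function<String, ProcessorSpec> defaultBuilder) {
        this.defaultBuilder = defaultBuilder;
    }

    // Registers a builder that is used only for the given processing group.
    ToyEventHandlingConfig registerEventProcessor(String name, Function<String, ProcessorSpec> builder) {
        builders.put(name, builder);
        return this;
    }

    // Uses the group-specific builder when present, otherwise the default one.
    ProcessorSpec build(String group) {
        return builders.getOrDefault(group, defaultBuilder).apply(group);
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        ToyEventHandlingConfig config =
                new ToyEventHandlingConfig(name -> new ProcessorSpec(name, "mongo"))
                        .registerEventProcessor("inRamTest", name -> new ProcessorSpec(name, "in-memory"));

        System.out.println(config.build("inRamTest").tokenStore);
        System.out.println(config.build("some.other.group").tokenStore);
    }
}
```

Only the "inRamTest" group gets the in-memory token store here; every other group falls through to the default builder, which mirrors keeping the MongoTokenStore in the overall configuration.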

Cheers,

Allard

Thank you, Allard. Will look into this.
Best regards,
Dominic