Can Axon 3.1 Tracking Processors Replay Selected Events?

Hello,

We currently use Axon 2 and sometimes we replay only selected events (e.g. a set of events relating to a single business item) by supplying Criteria to a ReplayingCluster.

We want to upgrade to Axon 3 primarily to benefit from multi-threaded replays by assigning different business items to different Tracking Processors.

We also need to retain our existing ability to replay only a single business item. Can we do this using Axon 3?

Thanks,

Daniel

Hello Daniel,

I’m sorry to say that Axon 3.x’s API currently only allows ‘resetting’ a TrackingEventProcessor (FYI, you can regard a TrackingEventProcessor as similar to a Cluster in Axon 2).

You currently cannot provide criteria to select which events are replayed when calling the reset API.

I do agree it would be a good thing to reintroduce this into the stack though.

What you can do now, however, is add a parameter to the event handlers of your Event Handling Components that tells them whether their Event Processor is replaying or not. This parameter is the ‘ReplayStatus’, an enum with (currently) two possible values: ‘Replay’ and ‘Regular’.

This might allow you to have only certain event handling functions act when you reset your TrackingEventProcessor.
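A minimal, self-contained sketch of the idea: a handler receives a replay-status value next to the event and keeps its projection up to date in both modes, but skips side effects during a replay. The enum below is a hypothetical local stand-in (the constant names `REPLAY`/`REGULAR` are assumed), and `ItemShipped` and `ShippingProjection` are invented for illustration; in Axon the framework, not the caller, would supply the status when invoking the handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ReplayStatus enum described above; the constant
// names mirror the 'Replay' and 'Regular' values but are assumptions here.
enum ReplayStatus { REPLAY, REGULAR }

// Hypothetical event, used only for this sketch.
final class ItemShipped {
    final String itemId;

    ItemShipped(String itemId) {
        this.itemId = itemId;
    }
}

// Event Handling Component that always updates its projection, but only
// performs side effects (here: queuing a notification) outside of a replay.
final class ShippingProjection {
    final List<String> shippedItems = new ArrayList<>();
    final List<String> notificationsSent = new ArrayList<>();

    // In Axon the framework would resolve the status parameter when it invokes
    // the handler; in this sketch the caller passes it explicitly.
    void on(ItemShipped event, ReplayStatus status) {
        shippedItems.add(event.itemId);
        if (status == ReplayStatus.REGULAR) {
            notificationsSent.add("notify:" + event.itemId);
        }
    }
}
```

Calling `on(new ItemShipped("item-1"), ReplayStatus.REGULAR)` and then `on(new ItemShipped("item-2"), ReplayStatus.REPLAY)` records both items in the projection but queues a notification only for `item-1`.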

Would what I proposed just now solve your specific use case, or do you need to introduce more specific criteria on a replay?

If it does not, I’d argue we should add an issue to the AxonFramework issue list.

Cheers,

Steven

Thanks for your reply,

Unfortunately, switching certain handlers on/off wouldn’t work for us, although it is a nice feature.

Does that also mean that Subscribing Event Processors in Axon 3.1 currently cannot replay selected events either?

Hi Daniel,

Your assumption is correct: a SubscribingEventProcessor cannot be replayed.

It cannot because it subscribes to the event bus and receives events as they are published.

The TrackingEventProcessor, on the other hand, ‘tracks the event bus’ of its own accord.

To do that correctly and consistently, for example after a system restart, it needs to store ‘which events it has handled’.

It does this by means of a TrackingToken. Because its progress is stored, we can tell the TrackingEventProcessor to start anew by adjusting the token.
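The mechanism described above can be illustrated with a toy model (not Axon’s implementation): a processor that pulls events from an append-only log and remembers its position in a token, so that a ‘reset’ is nothing more than moving the token back. `PositionToken` and `ToyTrackingProcessor` are hypothetical names, and the token lives in memory here, whereas a real token store would persist it.

```java
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a TrackingToken: the global index of the last event
// the processor has handled (-1 when nothing has been handled yet).
final class PositionToken {
    final int lastHandled;

    PositionToken(int lastHandled) {
        this.lastHandled = lastHandled;
    }
}

// Toy tracking processor: instead of being pushed events by a bus, it pulls
// them from an append-only log and records its progress in a token.
final class ToyTrackingProcessor {
    private final List<String> eventLog;   // the "event store", indexed by global index
    private final Consumer<String> handler;
    private PositionToken token = new PositionToken(-1); // a real system would persist this

    ToyTrackingProcessor(List<String> eventLog, Consumer<String> handler) {
        this.eventLog = eventLog;
        this.handler = handler;
    }

    // Handle every event after the stored position, advancing the token per event.
    void catchUp() {
        for (int i = token.lastHandled + 1; i < eventLog.size(); i++) {
            handler.accept(eventLog.get(i));
            token = new PositionToken(i);
        }
    }

    // "Reset": move the token back so the next catchUp() replays from the start.
    void reset() {
        token = new PositionToken(-1);
    }

    int position() {
        return token.lastHandled;
    }
}
```

A subscribing processor has no such position to move back, which is why it cannot be replayed.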

Hope this helps!

Cheers,

Steven

Hi Steven,

I realized that we’d also need something similar to what Daniel suggested, whereby only selected events are considered when performing replays, even though our use case for replays is slightly different. Let me explain:

We’re currently using Axon 3.2 and have configured a few TrackingEventProcessors to use InMemoryTrackingTokens. This allows us to build an in-memory representation of a small part of our business model, with each tracking processor subscribing to a finite set of events it is interested in. This in-memory model is especially useful on the transactional side, mostly for lookups and validations across aggregates.

The drawback has become evident: each time a node restarts, the event stream is replayed from global index 0, and it takes a very long time to catch up to the head of the stream because every stored event has to be stepped through. Performance would improve greatly if a TrackingEventProcessor could step through only the events it will actually process.

I had a brief look at the code and, working back from the query, it would be possible to add a WHERE clause with a condition that checks ‘payloadtype’ IN (finite-set-of-events). The event types could be passed to StorageEngine.fetchTrackedEvents(…) using a specialization of TrackingToken.
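A sketch of the proposed query, assuming a JDBC-style event table: `FilteredEventQuery` is a hypothetical helper, and the table and column names (`DomainEventEntry`, `globalIndex`, `payloadType`, `payload`) follow Axon’s default naming but are assumptions here. GapAwareTrackingToken gap handling is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: builds a "fetch tracked events" query that only returns
// the payload types a processor handles. Table and column names are assumed;
// gap handling (as in GapAwareTrackingToken) is omitted from this sketch.
final class FilteredEventQuery {
    final String sql;
    final List<Object> parameters;

    private FilteredEventQuery(String sql, List<Object> parameters) {
        this.sql = sql;
        this.parameters = parameters;
    }

    static FilteredEventQuery after(long lastGlobalIndex, List<String> payloadTypes) {
        if (payloadTypes.isEmpty()) {
            throw new IllegalArgumentException("at least one payload type is required");
        }
        // One '?' per type, so type names are bound as parameters, not concatenated.
        String placeholders = String.join(", ", Collections.nCopies(payloadTypes.size(), "?"));
        String sql = "SELECT globalIndex, payloadType, payload FROM DomainEventEntry"
                + " WHERE globalIndex > ? AND payloadType IN (" + placeholders + ")"
                + " ORDER BY globalIndex ASC";
        List<Object> parameters = new ArrayList<>();
        parameters.add(lastGlobalIndex);
        parameters.addAll(payloadTypes);
        return new FilteredEventQuery(sql, Collections.unmodifiableList(parameters));
    }
}
```

The parameters would be bound in order with `PreparedStatement.setObject(i + 1, ...)`, and a batch size could be applied with `Statement.setMaxRows(batchSize)`. Because rows come back ordered by `globalIndex`, the next batch could start after the global index of the last row returned, so the global index can still serve as the position even when only a subset of events is read.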

Is this at all possible or am I fundamentally missing something big/important?

Cheers,
Dylan

Hi Dylan!

Selectively reading events from an EventStore in the way you describe would definitely be an option.
However, this is also a feature we will be introducing in AxonDB, so do take that into account before introducing it yourselves by means of ‘FilteringEventStorageEngines’.

So in short, your suggestion is definitely possible :slight_smile:

Hope this helps you out Dylan!

Cheers,
Steven

Hi Steven,

I suspected this would be a feature of AxonDB :slight_smile: I had a look at the ‘FilteringEventStorageEngine’ suggestion, and from what I can tell from the javadoc alone, it’s not a solution:

Implementation of EventStorageEngine that delegates to another implementation, while filtering events as they are appended. This prevents certain events to be stored in the Event Store

The problem we’re trying to solve is not an append-time one but rather selective reads per tracking processor, unless I have misunderstood your suggestion.
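The distinction can be shown with a toy store (not Axon code; `ToyEventStore` and the payload type names are hypothetical): an append-time filter discards events for every reader, whereas a read-time filter keeps everything stored and lets each tracking processor select only what it handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Toy event store contrasting the two places a payload-type filter can sit.
final class ToyEventStore {
    private final List<String> storedPayloadTypes = new ArrayList<>();
    private final Predicate<String> appendFilter;

    ToyEventStore(Predicate<String> appendFilter) {
        this.appendFilter = appendFilter;
    }

    // Append-time filtering (as the javadoc describes): rejected events are
    // never stored, so no reader can ever see them.
    void append(String payloadType) {
        if (appendFilter.test(payloadType)) {
            storedPayloadTypes.add(payloadType);
        }
    }

    // Read-time filtering (what is being asked for): everything stays stored,
    // and each tracking processor reads only the types it handles.
    List<String> read(Predicate<String> readerFilter) {
        return storedPayloadTypes.stream().filter(readerFilter).collect(Collectors.toList());
    }

    int size() {
        return storedPayloadTypes.size();
    }
}
```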

With some high-level guidance from you, I’d be more than willing to submit a PR. I had a look at some possible integration points to build the solution on:

* (Jdbc)EventStorageEngine.getTrackedEventData(…) - The key would be a custom token that holds a list of event types from which an IN clause is built.
* Custom (GapAware)TrackingToken - as mentioned above, used to hold the list of event types we’re interested in.
* Custom TrackingEventProcessor - passes the event type list to the token when invoking storageEngine.getTrackedEventData(…).
* EventProcessorBuilder - initializes the above-mentioned tracking processor with the list of event types. One way to obtain that list might be a “reverse lookup” from the beans that form part of the processing group, using reflection to interrogate all registered event handlers for the event types they handle.
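A sketch of the reflection-based “reverse lookup” and a type-carrying token. Everything here is hypothetical: `@SketchEventHandler` is a local stand-in for Axon’s `@EventHandler` so the code is self-contained, `TypeFilteringToken` is not an Axon class, and it assumes the payload type stored in the event table is the fully-qualified class name, which depends on the serializer configuration. It takes each handler’s first parameter as its event type, mirroring the usual handler convention.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Local stand-in for Axon's @EventHandler annotation, so the sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchEventHandler {}

// "Reverse lookup": derive the handled event types from the beans of one
// processing group, taking each handler's first parameter as its event type.
final class HandledEventTypes {
    static Set<String> of(Collection<?> processingGroupBeans) {
        Set<String> typeNames = new TreeSet<>();
        for (Object bean : processingGroupBeans) {
            // getDeclaredMethods() also sees non-public handlers, but not inherited ones.
            for (Method method : bean.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(SketchEventHandler.class) && method.getParameterCount() > 0) {
                    typeNames.add(method.getParameterTypes()[0].getName());
                }
            }
        }
        return Collections.unmodifiableSet(typeNames);
    }
}

// Hypothetical token pairing the processor's position with the types it wants.
final class TypeFilteringToken {
    final long lastGlobalIndex;
    final Set<String> payloadTypes;

    TypeFilteringToken(long lastGlobalIndex, Set<String> payloadTypes) {
        this.lastGlobalIndex = lastGlobalIndex;
        this.payloadTypes = payloadTypes;
    }

    // Advancing keeps the type filter, so every subsequent read stays selective.
    TypeFilteringToken advanceTo(long globalIndex) {
        return new TypeFilteringToken(globalIndex, payloadTypes);
    }
}

// Example events and processing-group bean, used only to exercise the sketch.
final class ItemCreated {}
final class ItemRenamed {}
final class ItemArchived {}

final class ItemLookupProjection {
    @SketchEventHandler void on(ItemCreated event) {}
    @SketchEventHandler void on(ItemRenamed event) {}
    void notAHandler(ItemArchived event) {} // not annotated, so not collected
}
```

For `ItemLookupProjection`, the lookup yields the names of `ItemCreated` and `ItemRenamed` only. That set could seed the token a custom processor passes to the storage engine.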

This is based on my very limited understanding of how things fit together, so I might be completely off the mark. I have already run into some constraints and open questions:

* GapAwareTrackingToken is essentially final (private constructor)
* How would batching be implemented, given that only a subset of events is read and the global index might not be applicable?
* I have no idea how parallel processing would be affected.

Anyway, any guidance would be greatly appreciated!

Regards,
Dylan