resetting (replaying) multiple trackingProcessors

I changed from our custom replay (via repo-> deleteToken) to the 3.2 api.

Our appication solely relies on the eventstore and does a full replay of the in-memory model-state on every restart.

I use the following shutdown->reset->start cycle in a spring smartLifecycle (kotlin syntax, “it” refers to the trackingProcessor in loop)

with(eventHandlingConfiguration.trackingEventProcessors()) {
  // note: this has to be done in 3 iterations, because we have eventProcessors that
  // listen to the same event types and they would miss events.
  forEach {
    it.shutDown()
  }
  forEach {
    it.resetTokens()
  }
  forEach {
    it.start()
    logger.debug { "Replayed ${it.name}" }
  }
}

where the trackingProcessors is an extension function that filters for instance of TrackingProcessor and casts accordingly.

I found that shutdown-reset-start in one loop does not work, because our tracking processors listens to shared events and get confused when on processor is already in replay while the others are being reset.

The above code generally works in prod, but during i-tests (SpringRunner/SpringBootTest, I got

2018-04-03 10:28:43.133 WARN 30022 — [rLeaderBoard]-1] o.a.e.TrackingEventProcessor : Error occurred. Starting retry mode.
java.lang.IllegalArgumentException: Incompatible token type provided.

I assume this is related to the asynchronous nature of the shutdown/reset cycle and the single thread test runner with spring Boot …

Question: what is the correct way to replay multiple trackingProcessors on application start and make sure they all are up and running to access the replayed events once?

Or would it make sense two create the model with one trackingProcessor and let the dedicated views query their results vie queryGateway?

Thanks a lot

Jan

Hi Jan,

if you update a in-memory models, then I would recommend to also use an In-Memory Token Store for these processors. This would completely remove the need to start a reset at every restart. Having the tokens in-memory would guarantee that the processor restarts when the model has been cleared as well. I don’t think resets of these processors should be necessary.

Regarding the “Incompatible token provided”, an issue has been filed that a reset while the processor was still replaying, or a reset when its token is null, leads to this error. We will work on a fix in 3.2.1.

Cheers,

Allard

Thanks Allard, using the in-memory tokenStore really helped simplifying the application …

as for the other issue: I guess it could make sense to let the shutdown/reset/start methods return futures so you can programmatically block until everything is working again …

Hi Allard, I got one follow-up question: now, with the in memory token store I get the automatic replay on application start. Is there a chance I can use a hook to detect when the initial replay is done? I have some optional further initialization steps that currently I am doing in another smartLIfecycle and its interfering with the replay …

Hi Jan,

yes, you can check the status of a tracking processor. It contains a flag indicating whether the processor has caught up or not. This flag starts being “false” and is flipped to “true” when the processor reached the head of the event stream. Note that it won’t go to false anymore, even when the processor falls behind. Only stopping the processor and restarting it would reset this flag to false.

Returning CompletableFutures for the lifecycle methods of the processor is actually a good idea. I will look into it.

Hope this helps.
Cheers,

Allard

Sorry to bother you again, but I am having troubles understanding the state flag …

It looks like the only accessible field „state“ is private final AtomicReference state = new AtomicReference<>(State.NOT_STARTED);

which does not show the required behavior. What you seem to refer to is the TrackerStatus#caughtUp flag, but I fail to see how I would access it.

My source of input is the EventhandlingConfiguration (configured via @ProcessingGroup and application.yaml axon.eventhandling.processors.NAME.mode: tracking) which I filter and cast to get a List …
What I now need is some periodic check on the caughtUp flag of all processors in the list and continue once all are true.

ideas?

Hi,

it’s the processingStatus() method, added in 3.2. It provides a map with information for each Segment (thread) being processed. The values of the map are the status reports for each of these threads. Note that it might be possible that a single thread has caught up, but another one hasn’t.

Hope this helps.
Cheers,

Allard