Detecting when a replay has finished in Axon 3.0

Hi,

In some cases, our application follows the process below:

  • disconnect from the internet (meaning no user can interact with the system)
  • rebuild a read model (i.e. delete that read model and then replay all the events related to it)
  • when the replay is done, connect to the internet again (allow users back on the system)

I understand that we can use a tracking event processor to perform the replay, it seems simple enough. I can’t find a way to detect that a replay completed though, what am I missing?

Thanks!
Yoram

Hi Yoram,

tracking processors don’t distinguish between ‘live’ and ‘replay’ anymore. They just process all the events at their own pace. Depending on your definition of a ‘replay’, you could do one of two things:

  • consider the replay done when the timestamps of processed events come within an acceptable threshold of the current time, for example 1 second. In that case, the component could mark itself as ‘live’.
  • before triggering the ‘restart’, check the index of the current token. When processing events, switch to ‘live’ mode when the event with that token comes along. Note that you probably have to cast the TrackingToken to one of the actual implementations, as the token doesn’t expose its global index by default (not all implementations are global index based).
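A minimal sketch of the first option, assuming you track the flag yourself in the projection component (the `ReplayMarker` class and its names are illustrative, not part of Axon):

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper, not Axon API: flips to 'live' once processed
// event timestamps come within a configurable threshold of the wall clock.
class ReplayMarker {
    private final Duration threshold;
    private volatile boolean live = false;

    ReplayMarker(Duration threshold) {
        this.threshold = threshold;
    }

    // Call this from your event handler with each event's timestamp.
    void onEvent(Instant eventTimestamp) {
        if (!live && Duration.between(eventTimestamp, Instant.now()).compareTo(threshold) <= 0) {
            live = true; // caught up: recent enough to be considered live
        }
    }

    boolean isLive() {
        return live;
    }
}
```

Once `isLive()` returns true, the application could reconnect and let users back in.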

Hope this helps.
Cheers,

Allard

Hi Allard,

Thank you. Any chance you would consider adding the following method to the EventStorageEngine interface?

TrackingToken latestTrackingToken();

Since all the EventStorageEngine implementations already cast the token to a specific implementation and contain the knowledge of how to start an event stream from a certain point, they are the perfect spot to implement this functionality.

Obviously this would only make sense if you believe there may be others using Axon 3.0 who share the challenge described in this thread.

If you do see value in adding this functionality then I could, with your permission, create a pull request on GitHub with the suggested changes.

Thanks,
Yoram

Hi Yoram,

yes, it’s a change I would consider for the 3.x branch. In fact, I would like to add 2 methods to the EventStore (and EventStorageEngine):

  • trackingToken(Instant instant) (→ would do a select min(globalIndex) where timestamp >= instant)
  • latestTrackingToken() (→ would do a select max(globalIndex))

The first could be used to start a replay at another moment in time than “now” or “since the beginning”.
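Semantically, the two proposed methods boil down to a min and a max over the global sequence. A plain-Java sketch of just that semantics over an in-memory list (the `StoredEvent` class and all names are illustrative stand-ins for a real query against the event table):

```java
import java.time.Instant;
import java.util.List;
import java.util.OptionalLong;

// Illustrative stand-in for a stored event row: global index + timestamp.
class StoredEvent {
    final long globalIndex;
    final Instant timestamp;

    StoredEvent(long globalIndex, Instant timestamp) {
        this.globalIndex = globalIndex;
        this.timestamp = timestamp;
    }
}

class TokenQueries {
    // trackingToken(Instant): min(globalIndex) where timestamp >= instant
    static OptionalLong trackingToken(List<StoredEvent> events, Instant instant) {
        return events.stream()
                     .filter(e -> !e.timestamp.isBefore(instant))
                     .mapToLong(e -> e.globalIndex)
                     .min();
    }

    // latestTrackingToken(): max(globalIndex)
    static OptionalLong latestTrackingToken(List<StoredEvent> events) {
        return events.stream().mapToLong(e -> e.globalIndex).max();
    }
}
```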

Cheers,

Allard

Hi Allard,

Great, thanks. Please allow me a few days to submit the pull request.

Cheers,
Yoram

Hi Allard & Yoram,

Was there any progress on this topic? I’m hitting the same requirement right now.

regards,

Michiel

No progress, yet.
The issue is not really a technical one (just yet). It’s rather a philosophical one, I’m afraid.

The Tracking Processor is a component reading from a Stream. It may continue from a token, or start from scratch. When do you consider that processor “replaying”, and when is it “live”?

One way to put it is: a processor is replaying until it reaches the head of the stream (which we can recognize by the fact that there aren’t any events ready for processing). Then it flips to “live”. But what if it starts lagging behind? Should it stay “live” or fall back to a “catching up” state?
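One way to make that tri-state question concrete, assuming you pick a lag threshold yourself (all names here are illustrative, not Axon API):

```java
// Illustrative three-state model for a tracking processor's position
// relative to the head of the stream.
class ProcessorState {
    enum State { REPLAYING, LIVE, CATCHING_UP }

    private final long lagThreshold; // max events behind head to still count as live
    private State state = State.REPLAYING;

    ProcessorState(long lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    // Call with the processor's position and the current head of the stream.
    State update(long position, long head) {
        long lag = head - position;
        if (lag <= 0) {
            state = State.LIVE;        // reached the head
        } else if (state != State.REPLAYING && lag > lagThreshold) {
            state = State.CATCHING_UP; // was live before, now lagging
        }
        return state;
    }
}
```

Under this model a processor never goes back to REPLAYING; it only oscillates between LIVE and CATCHING_UP once it has reached the head at least once.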

Any ideas are welcome.
Cheers,

Allard

Hi Michiel and Allard,

I got swamped with other work and this got pushed down my priority list. I still intend to submit a pull request with the additions previously described, so that it becomes easier to replay and to detect when a replay is finished.

In the general case, as Allard described, a replay may never finish since the tracking processor may fall behind. In our system, however, when we replay to rebuild projections we do it “offline”, during a downtime maintenance window. If the replay takes too long, we could deploy the ‘new version’ side by side with the ‘old version’, replay into a new database, and then complete the replay and switch to the ‘new version’ with a very short downtime (which is acceptable under our SLA). One of the reasons we rely on offline replays is the paradigm offered by Axon 2.4.x; we might have done things differently if we had started the project with Axon 3.

We have built a little Axon 2.4.x-compatible replay framework using tracking event processors; it made porting from Axon 2.4.x to Axon 3.0.x easier, since we didn’t need to mix a paradigm change with an intricate upgrade. If anyone’s interested, I can describe what we did in a separate post.

Yoram

It’s nice to see the difference in semantics compared with Axon 2 replays. We replay up to the max sequence number per aggregate, taken at the time the replay starts; it works well for our use case.

Jorg

Maybe instead of thinking of it in terms of "the replay is finished," it'd be better to think of it in terms of, "we're all caught up on the event stream for now," which avoids the concept of being finished. Having a hook that gets called the first time there are no more unprocessed events in the event log (or even *every* time that's the case) seems like it'd do the trick.
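That hook could be as small as a once-only callback, fired the first time the processor finds no unprocessed events waiting (a sketch with illustrative names, not an existing Axon API):

```java
// Illustrative caught-up hook: fire a callback the first time the
// processor finds no unprocessed events left in the event log.
class CaughtUpHook {
    private final Runnable onFirstCaughtUp;
    private boolean fired = false;

    CaughtUpHook(Runnable onFirstCaughtUp) {
        this.onFirstCaughtUp = onFirstCaughtUp;
    }

    // Call after each poll, passing whether unprocessed events remain.
    void poll(boolean eventsRemaining) {
        if (!eventsRemaining && !fired) {
            fired = true;
            onFirstCaughtUp.run();
        }
    }
}
```

Dropping the `fired` guard would give the "every time we reach the head" variant instead.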

In our application (still on Axon 2) we recently implemented background replays to support zero-downtime query model changes. When we need to, say, add a new column to a query-model table that can't easily be populated from other tables, we create a new version of the table with the new column, initializing it from the existing table when that makes sense. We add a new event listener to populate the new table. We start a replay while the application is running. While the replay is happening, the application continues to maintain the old version of the table and to query it in response to user requests.

When the migration finishes, we set a flag internally. That flag tells the event listeners to stop populating the old table, and tells the application code to switch over to the new one for queries. (We also, at that point, update any reporting views that pointed to the old table, so our business people see the new column only after it's fully populated.) We get rid of the code that accesses the old table and then we drop it from the database.

This has worked really well for us, but it hinges on being able to tell when the replays are caught up to real time, since that's when it's safe to switch over to the new table and stop maintaining the old one without losing data.

Absent an API for this in Axon 3, our event listeners can keep track of which events they've seen, but this does seem like something Axon should make easy to do; application developers like me are more likely to get it wrong in some subtle way if every one of us has to implement this logic from scratch in the application code.
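The switchover described above essentially hinges on one atomic flag consulted by both the event listeners and the query side. A sketch of that idea, with hypothetical table names:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative model of the dual-write, flag-based switchover.
class MigrationSwitch {
    private final AtomicBoolean migrated = new AtomicBoolean(false);

    // Called once the background replay has caught up to real time.
    void markMigrated() {
        migrated.set(true);
    }

    // Event listeners: keep maintaining the old table until the flag flips.
    boolean shouldWriteOldTable() {
        return !migrated.get();
    }

    // Query side: switch to the new table once the flag flips.
    String queryTable() {
        return migrated.get() ? "my_view_v2" : "my_view_v1";
    }
}
```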

-Steve

Hi Steven,

I like the concept of a callback when the head of a queue has been reached. At least it gives a good indication of whether the processor has caught up at some point in time or not.

I have also been experimenting with a method on the TrackingToken itself. By exposing a long value from the token, you can get a global indication of how far a processor is. A stream could also expose the tracking token of the HEAD of the stream (which also exposes this long). A larger difference between these values indicates a processor is further behind; smaller values (or even 0) indicate the processor is up to date.
I’m currently looking at the API implications of adding this. A few places are impacted, but so far it looks like something that can be retrofitted. For the JPA/JDBC-based tokens, the long value is the global index of an event. For MongoDB, it would be the timestamp of the event, as it doesn’t have a global index.

What do you think?

Cheers,

Allard

Sorry for not replying earlier (vacation & work got in the way).

Currently we have the following running in (limited) testing:

  • an extension to JdbcEventStorageEngine to get the latest (last?) tracked event
  • an extension to TrackingEventProcessor that uses that information to calculate its progress / whether it has caught up to the head of the stream
  • an API endpoint that reports the progress of all active TrackingEventProcessors

If anyone is interested, I can extract that code and put it on github.

cheers,

Michiel

Another late reply on this thread. On the concept of exposing the distance from the head of the stream on the tracking token: Would that require doing an extra query for each round of event fetching (query the next N events, and then also query the newest event)? Other than progress reporting, I can’t think of a compelling use for knowing how far away from the end of the stream a replay is; for purposes of taking action after a replay, you just need to know if you’ve reached the end yet.

If it can be done at zero or low cost, then that sounds like a nice addition; progress reporting is useful. An “I’ve caught up” callback would be more convenient for my use case, though.

-Steve

Hi Steve,

The way I’ve implemented it does require an extra query. I haven’t had the time yet to look at optimizing that.

Knowing how far a replay has to go is useful for us, if only to predict the amount of time before we can switch to a new version of a projection. A callback would be simple to implement.

regards,

Michiel

Hi Michiel,

I also have the problem that I need to know when an application has “caught up” on startup. The way I found to check it is to get the token, open an event stream, and test whether there are no new events.

This feels quite heavy - could you sketch out how you implemented the extra query?

Thank you very much,

Martin

Hi Martin, Michiel,

Just as an FYI: we’re working on a solution to notify a ‘TrackingEventProcessor’ that a ‘replay’ has finished, or, to be more precise, that it has caught up with the event stream.
The following issue will address this.

Cheers,

Steven

Hi Martin,

That’s roughly what I’m doing. I have something more or less ready to put on github, I’ll spend some time in the next few days to clean that up and share it.

This can of course be greatly simplified when the issue mentioned by Steven is resolved.

cheers,

Michiel

I had some spare time today and pushed some code here: https://github.com/mrook/axon-projection-rebuild-demo

I’ll try to remember to blog about it in the next few days :slight_smile:

cheers,

Michiel

Hi all,

interesting approach to add this feature. Beware, though, that this will no longer work in Axon 3.1, as there is no longer a single “token” per processor. Each thread will have its own. Therefore, the lastToken field has also moved to an inner class.

Instead of extending the TrackingEventProcessor, you could also create an implementation of the StreamableMessageSource that wraps the actual source, such as the Event Store. This source could keep track of the progress of the streams it opened. If each processor is given a different source instance, each source instance would be able to report progress for its processor. I haven’t tried, but it seems this would be less intrusive to Axon’s internals, which are subject to change.
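A rough sketch of that wrapper idea. The interfaces below are simplified stand-ins for Axon’s `StreamableMessageSource`, `TrackingToken`, and stream types, whose real signatures differ; only the delegation-and-record pattern is the point:

```java
// Simplified stand-ins for the Axon types involved.
interface Token { long position(); }

interface EventStream {
    boolean hasNext();
    Token next(); // returns the token of the event just consumed
}

interface MessageSource {
    EventStream openStream(Token startAfter);
}

// Wraps the actual source and remembers the last token handed out, so a
// processor's progress can be reported without touching its internals.
class ProgressTrackingSource implements MessageSource {
    private final MessageSource delegate;
    private volatile Token lastToken;

    ProgressTrackingSource(MessageSource delegate) {
        this.delegate = delegate;
    }

    @Override
    public EventStream openStream(Token startAfter) {
        EventStream stream = delegate.openStream(startAfter);
        return new EventStream() {
            @Override public boolean hasNext() { return stream.hasNext(); }
            @Override public Token next() {
                Token t = stream.next();
                lastToken = t; // record progress as the processor consumes
                return t;
            }
        };
    }

    Token lastToken() {
        return lastToken;
    }
}
```

Comparing `lastToken()` against the head of the stream would then give the lag, without subclassing the processor.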

Cheers,

Allard

Hi Allard,

Thanks for the heads-up! This is very much a proof of concept, so I sort of expected that :wink:

Wrapping the source sounds like a reasonable alternative, thanks. I’ll see if I have some time to change the POC in the near future.

Cheers,

Michiel