How to ensure that the event handling components will receive and process events in the order they have been persisted in the eventstore

I have a requirement where multiple events are getting fired from different sections of the application through eventgateway.publish(event)and getting persisted in the eventstore and all of the event handlers for the above mentioned events are there in the same java package so all of them will be in the same TEP as far as I understand all the event handling component will receive events in the order persisted in the event store so this should ensure that whichever event got persisted first will have their event handler getting them first. Is this understanding correct?
second question is if we have 2 segment and 2 threads per TEP instance is there a guarantee that the event that got persisted first will complete processing first and then only the next event and if there is no such guarantee from axon then can we just have 1 segment and 1 thread per TEP instance and will this ensure that the event that got persisted first will complete event processing first and then only the next event will be processed. Please note that we are using default axon stream processing size of 1.

1 Like

I think I get the gist from your question @Ashwini_Kumar, but I would like to ask you to edit the question to separate it out a little more in manageable chunks. This isn’t only beneficial for me to be certain I am answering your question correctly, but also for others who might be facing the same issue you are facing right now.

Nonetheless, let me give it a try and me parse out the first question in your text:

…as far as I understand all the event handling component will receive events in the order persisted in the event store so this should ensure that whichever event got persisted first will have their event handler getting them first. Is this understanding correct?

This understanding is correct. The insert order of your events is maintained as the order in which they are handled. There is, however, a notion of parallelizing the event handling load which has some impact. Luckily you are moving into this point with your following question:

…if we have 2 segment and 2 threads per TEP instance is there a guarantee that the event that got persisted first will complete processing first and then only the next event…

Having 2 segments in essence means you have split your event stream in two. You however have the power to decide how the events are “sequenced” to each of these portions of the stream. Axon does this through means of the SequencingPolicy, which is invoked every time prior to entering your Event Handling functions and used to check whether the value matches the segment that thread is controlling.

If it does not match, the event will simply be disregarded by that exact segment. If it does match, it will, as you might have guessed, be handled by that segment.

By default the SequencingPolicy is configured as the SequentialPerAggregatePolicy. This implementation will use the aggregate identifier to decide if an event belongs to a given segment yes/no. Doing so, ensure that all events originating from the same Aggregate instance (as defined through the aggregate identifier) will be handled in the order they have been inserted in your event store.

What you point out is that you are using the EventGateway#publish method, however. Note that when using this approach to publishing events, you are not publishing so called “domain events”. It is the domain event that contains an aggregate identifier (since it is published from inside your domain model). As such, the default SequentialPerAggregatePolicy has not aggregate identifier to guarantee this ordering through. This means that by using this means of publishing in conjunction with the configured SequencingPolicy, the event handling order between two (or more) different segments on a TrackingEventProcessor is unclear.

You can, however, provide your own implementation of the SequencingPolicy, which uses a different value to deduce in what segment an event should land up. Whether there is such a value in your domain, is hard to deduce for me from the outside. You could think about a correlation/trace/session identifier in the meta data of the EventMessage which is identical among the events which should be handled in order. Or perhaps you have a domain-specific identifier (like a user id) that can be used to this end. Whichever it is, you will need to provide a custom SequencingPolicy which takes this other value into account to deduce which segment is in control of what event.

Lastly, you can indeed simply use a single segment for the TrackingEventProcessor too. This effectively means all events belong in the same portion of the event stream, as it is comprised of everything. Note that the thread count does not matter in this case, as a TrackingEventProcessor can only process events if it has a claim on a segment. Thus with one segment, only a single thread would ever perform any work.

Hoping to have clarified your options sufficiently, @Ashwini_Kumar.

Thank you steven for a detailed explanation. Using a custom sequencing policy would be difficult as we are expecting all events for that tep to be handled in order.
As you have pointed out I am thinking to have just one segment for this tep so that everything is ordered. My only concern is right now for this tep we already have 2 segments which means two rows in tokenentry for this tep with segment as 0 and 1, now if I delete one of them it can lead to some events getting processed twice or some events getting missed. Is there is a way to manage this and is this the only issue that I can potentially face or there can be any other issues as well.

Hey @Steven_van_Beelen would you be able to help on the above question

If the entire stream needs to be in order, then processing all events with one segment would indeed be your only solution. There are, roughly speaking, three options you have.
Let me share those:

  1. You can change the SequencingPolicy from the default SequentialPerAggregatePolicy to SequentialPolicy. This works, but would still mean you have two segments, of which one doesn’t do a thing.
  2. Stop your application(s), clear out the token and construct a new entry yourself with the right segment id. This would require some insight in how to insert such an entry though, as well as that both your segments should be at the exact same position for this to work. Otherwise, you’d need a token that can point to two positions in the stream.
  3. You should merge the segments of your TrackingEventProcessor into one another. This is what Axon Framework has the split and merge operation for, with the following documentation. Quite recently some people at AxonIQ have written a detailed blog about it too, which you can find here.

Although option 1 works, I feel option 2 to be the cleaner approach. Option two will essentially clear out the unnecessary token entirely, thus minimizing the resources used by said TrackingEventProcessor. To invoke the merge operation, you should thus first get hold of the TrackingEventProcessor you need to merge for.

This can be achieved through the EventProcessingConfiguration#eventProcessor(String, Class<?>) method. The String should contain your TEP’s name, and the Class<?> should be TrackingEventProcessor.class. Doing this, you get an Optional<TrackingEventProcessor>, and thus opening up the TrackingEventProcessor#mergeSegment(int) method. Invoking this method with segmentId 0 should do the trick since you only have two segments that take up segmentId 0 and `.

When invoking the mergeSegment operation, you should be aware that the TEP this operation is performed on is required to hold a claim on both segments which you’re merging. If it doesn’t hold the claim or cannot claim either of them, the operation will fail. Thus, the easiest way to ensure this is to only have a single of these TEPs instances running, also in a distributed environment. Doing so will make sure the only running TEP has both claims and as such it can be the place where you’d invoke the mergeSegment() method.

By the way, all this can be done out of the box with Axon Server, as it would know which TEP to invoke the operation on for you. It similarly allows the splitSegment method, in essence giving you the option to scale the number of parallel processes of your event processor as the need arises. As I believe you are not using Axon Server, however, you will have to go through this process of constructing an (end)point through which to invoke this operation on a running instance.

Hope this clarifies your options @Ashwini_Kumar.

explaining my current architecture below :-
currently we have two instances of this application running in pcf and the concerned TEP has 2 segments so basically two different instances have locked these two segments and I cannot make this application run on just one instance as the pcf team does not allow to do that so I am not sure if the approach mentioned by you will work here or not. I can create a rest endpoint and pass segment as 0 through a query param but the only issue is I dont know which instance will pick this up and do the merging.
I was thinking can’t I do a hard delete from the table directly for segment 1 for this TEP, before doing this I will ensure the apps are stopped are all corresponding events for this TEP are processed. Please let me know if this approach will work or if the above approach mentioned by you will work in my scenario. Hoping to hear from you soon @Steven_van_Beelen

Sorry for coming back later than usual; sadly my schedule was a little too busy to also dive into these specifics. Without further ado, let’s move to your scenario description.

The fact you need to run the instance twice (which I think is a good thing by the way) shouldn’t be an issue for the number of segments you have.
The TrackingEventProcessor can simply run on both, but just one of them would keep the claim on the single segment you’d require.

What I meant by stopping the TEP, is that you can invoke the shutDown() method on one of the TEP instances (sorry for the ambiguity in this area).
Doing so will not stop your entire application (which I understand you’re not allowed to do), but it will stop that single TEP from running for the time being.

When you stop a TEP, it will release the claim on any tokens it has.
It is this process that ensures that the single still running TEP can claim all the segments.
With all the segment claims in hand, you can invoke the merge operation as described earlier.

As stated earlier, Axon Server provides a merge and split button on the dashboard. It just as well provides a start and stops button to deal with all your TEP instances at once. So, that would simplify the process you need to follow at the moment.

I hope the above clarifies my description a little further, hopefully giving you sufficient to make this work
At any point, you were wondering something else too:

I was thinking can’t I do a hard delete from the table directly for segment 1 for this TEP, before doing this I will ensure the apps are stopped are all corresponding events for this TEP are processed.

Sure, this would also be a workable solution.
This would require you to stop both application though (which I assumed from your description wasn’t allowed?). Added, you would then clear the token from the database, but you’d also have to do some configuration on your end.

Configuration wise, you will need to set the initialSegmentCount for this TEP instance to 1.
Furthermore, you are inclined to put the initialTrackingToken to the head of the stream. Setting a token to the head is best achieved with the StreamableMessageSource#createHeadToken operation.
The easiest way to achieve this is by providing a TrackingEventProcessorConfiguration for this TEP instance which set’s both the fields as desired. Note that the StreamableMessageSource is given to you in the Function you’d set on the TrackingEventProcessorConfiguration#andInitialTrackingToken method.

The one downside to this approach is that you need to make sure all events are handled for this TEP.
In a live application, this means you will have to stop all command processing, and other ways of event distribution inside your system. If you don’t do this, you will by definition have a moving target of what the head of the event stream is. More simply put, the chances of you missing an event if you change the token to the head of the stream are not negligible if the application is still producing events.

Hoping this helps you further @Ashwini_Kumar!

To answer this question(Sure, this would also be a workable solution.
This would require you to stop both application though (which I assumed from your description wasn’t)) I can stop both instances of my application during deployment window and when I do stop both instances then can i do a hard delete for segment 1 row for this tep after ensuring that all events for this tep has been processed and when I do this do I still need to set the initial tracking token to the head of the stream and Initial segment to 1? As per my understanding after deleting the row for segment 1 in tokenentry table when I restart my app the initial segment count should be 1 only and if no new events are generated then initial tracking token should be pointed to the head of the stream, let me know if my understanding is correct.
Hoping to hear from you soon @Steven_van_Beelen

Sadly enough, it is not that simple.
Let’s assume we have an event stream of 1000 events in total.
Furthermore, you have a Tracking Event Processor with two segments, each claimed by a distinct thread (doesn’t matter whether this is on one of two application instances).

So, there are two threads processing work. Entirely on their own, without any notion of the other one.
On top of this, the segments portray a portion of the stream. Simply removing one and adjusting the configured value doesn’t change the fact that the given segment reflects just a portion of the stream.

Hence, the only way would be to clear out both tokens in this case.
Now, this however brings another problem around the corner.

Let us further assume that segment one is at position 500 of the stream and segment two at position 750 in the stream. If you would take your suggested approach, you would thus need to instantiate a new token which points to both position 500 and 750. If you would naively select 750 as this is furthest on the stream, you would simply miss about 125 events (half of 250, as there are two segments). On top of this, 125 is a massive assumption on how the distribution is done with your SequencingPolicy, if which there are simply no guarantees.

If you would instead select position 500, you are insured all events are handled. However, this means (125) events will be handled twice, with differing outcomes depending on how your event handler is implemented. Again, this is likely not a road you’d want to take.

So to make this approach succeed, you will have to be certain that both applications have reached the exact same point in the event stream. To make sure they can though, you will have to shut down *any processes which publish events and keep your TEP running.

Any how, the approach you’re hoping for has definite “problem scenarios” in play, whereas using Axon’s Merge functionality will ensure no events are missed. It does so by combining two tokens into a so-called MergedTrackingToken. It’s this token implementation that knows how to delegate a given TEP thread to either retrieve from the lower, or the higher position in the stream.

I can create a rest controller to stop one of my TEP instances but the only challenge that I have here is as my apps are running on two instances when I hit this rest endpoint I am not sure which TEP instances(segment 0 or segment 1 will be stopped) as it will depend completely on which instance picks it up and try to process it. Ideally TEP instance with segment 1 should be stopped and like I said my app is running on two instances in PCF and both instances are identical so I cannot have guarantee that TEP instance with segment 1 will be stopped and after running this if TEP with segment 0 is stopped then it can lead to event processors not getting triggered at all.
In short can you help as to how can I ensure in my scenario that TEP with segment 1 is stopped and not with segment 0. In my cloud environment I cannot reduce the instances from 2 to 1.

I’d wager this REST endpoint would be an operator-specific tool to use.
In essence, it provides an operator of your application extra handles when it comes to controlling your applications. The TrackingEventProcessor instances simply belong in that order.

Now, I understand you have a load balancer in this process. Although the load balancer makes sense from a user perspective, I feel that the operator should be allowed to hit the exact endpoints of each of those applications. He/she should have higher credentials to access the app, so knowing the exact URL sounds fair to me too.

Otherwise, you could construct another application that takes the role of “knowing” your Axon applications, so that it can delegate the start, stop, split and merge operations correctly. This is actually what Axon Server’s role is; a system-in-the-middle that allows you to deal with your applications.

Anyhow, I’ve taken the practice of a type of ManagementEndpoint or OperatorEndpoint, which provides operator-specific operations like split/merge/start/stop of the TEP. Furthermore, in my experience, the operator was allowed to hit the exact locations instead of being totally in the dark where they are. Hope this further clarifies the suggested approach @Ashwini_Kumar!

I found a common identifier across my 3 different events and created a custom sequencing policy and defined these events in that but what I am seeing is still for same common identifier the events are going to different segments rather than going in the same segment, Would you be able to shed some light on this @Steven_van_Beelen

Would you be able to share the SequencingPolicy implementation you’ve constructed?
In essence, using the SequentialPolicy (as found here) would get the job done too.

I thought I’d already shared that, but I am beginning to doubt that given your request.
Hoping this helps you further @Ashwini_Kumar!