Long running tasks in TEP EventHandlers

Hello Experts!

We have a use case where one of our TEPs, in response to a single event, might have significantly more work to do than the other TEPs in the system, depending on the number of clients.

Because the claim timeout was too short, another node stole ownership of a segment before the TEP completed processing, causing the processor to fail when committing its work. We soon realized that all TEP instances were doing exactly the same work, and none of them was able to update the tracking token at the end of processing. The default token claim timeout of 10 seconds wasn't sufficient, so we extended it to 30 seconds for now, knowing that this will soon not be enough either.
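To make this failure mode concrete, here is a deliberately simplified model of a segment claim in plain Java. It is not Axon's TokenStore: `SegmentClaim` and `ClaimTimeoutDemo` are hypothetical names, and the model assumes that a node may take over a claim once its owner has not updated it for longer than the claim timeout, and that committing only succeeds while a node still owns the claim.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified, hypothetical model of a segment claim: one owner at a time, valid until it times out. */
final class SegmentClaim {
    private final Duration claimTimeout;
    private String owner;        // node currently holding the claim, or null
    private Instant lastUpdated; // when the owner last claimed or committed

    SegmentClaim(Duration claimTimeout) {
        this.claimTimeout = claimTimeout;
    }

    /** Succeeds if the segment is unclaimed, already held by this node, or the current claim has expired. */
    synchronized boolean tryClaim(String node, Instant now) {
        boolean expired = owner != null && now.isAfter(lastUpdated.plus(claimTimeout));
        if (owner == null || owner.equals(node) || expired) {
            owner = node;
            lastUpdated = now;
            return true;
        }
        return false;
    }

    /** Storing the new token only succeeds while the node still owns the claim. */
    synchronized boolean tryCommit(String node, Instant now) {
        if (!node.equals(owner)) {
            return false;
        }
        lastUpdated = now;
        return true;
    }
}

class ClaimTimeoutDemo {
    /** Node A handles one slow event while node B tries to claim the segment every second. */
    static boolean slowHandlerCommits(Duration claimTimeout, Duration processingTime) {
        Instant start = Instant.EPOCH;
        SegmentClaim claim = new SegmentClaim(claimTimeout);
        claim.tryClaim("node-A", start);
        for (long second = 1; second < processingTime.getSeconds(); second++) {
            claim.tryClaim("node-B", start.plusSeconds(second));
        }
        return claim.tryCommit("node-A", start.plus(processingTime));
    }

    public static void main(String[] args) {
        System.out.println(slowHandlerCommits(Duration.ofSeconds(10), Duration.ofSeconds(25))); // false
        System.out.println(slowHandlerCommits(Duration.ofSeconds(30), Duration.ofSeconds(25))); // true
    }
}
```

With a 10 second timeout and 25 seconds of processing, node-B takes over at second 11 and node-A's commit fails; with a 30 second timeout the same work commits, but only until the processing time grows past 30 seconds.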

The claim timeout is set on the TokenStore, globally for all TEPs in the service. The drawback is that, in case of failure, even for the fastest TEPs another node will only take over the work after the timeout sized for the slowest TEP has expired.

We are looking for a way to extend token claim during processing of an event, so the claimTimeout can be kept relatively short and we can ensure the ownership of the segment won’t change until the processor completes the processing.

Does this sound like a valid approach? How can we achieve this in Axon? Perhaps you have other recommendations on how to deal with this kind of use case.

Thanks,
Tomasz

The first thing that makes me curious is how come event handling takes over 30 seconds?
Is this due to a high configured batch size? Or is the event handling operation simply taking so long?

With that said, let me react to some of your sentences:

The claim timeout is set on a TokenStore, globally for all TEPs in the service.

By default, this is indeed the case. The EventProcessingConfigurer, however, allows you to configure a TokenStore per StreamingEventProcessor instance. You would use EventProcessingConfigurer#registerTokenStore(String, Function<Configuration, TokenStore>) for this.
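A minimal configuration sketch of such a registration, assuming a Spring Boot application that uses the JPA-based token store and a processor named `long-running-processor` (a hypothetical name). The 60 second timeout is an arbitrary example value, and depending on your Axon version the `JpaTokenStore` and `EntityManagerProvider` packages may differ (javax vs. jakarta persistence).

```java
import java.time.Duration;

import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LongRunningProcessorTokenStoreConfig {

    // Hypothetical name; it must match the processor hosting the slow handler.
    private static final String SLOW_PROCESSOR = "long-running-processor";

    @Autowired
    public void registerSlowProcessorTokenStore(EventProcessingConfigurer configurer,
                                                EntityManagerProvider entityManagerProvider) {
        // Register a TokenStore for this single processor only.
        configurer.registerTokenStore(SLOW_PROCESSOR, config ->
                JpaTokenStore.builder()
                             .entityManagerProvider(entityManagerProvider)
                             .serializer(config.serializer())
                             // Only this processor's token store uses the long claim timeout.
                             .claimTimeout(Duration.ofSeconds(60))
                             .build());
    }
}
```

Every other processor keeps using the application-wide TokenStore and its claim timeout, so only this processor pays for the slower takeover after a node failure.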

We are looking for a way to extend token claim during processing of an event, so the claimTimeout can be kept relatively short and we can ensure the ownership of the segment won’t change until the processor completes the processing.

I do not think this is straightforward right now. To extend a claim, you don't use the TrackingToken itself but the segment identifier the token belongs to, as you can see in the TokenStore#extendClaim(String processorName, int segment) method. Furthermore, although the segment identifier is stored alongside the token, it is not exposed to your event handler. So while you can inject the TrackingToken into your event handler (to determine the event's position, for example), getting the segment identifier out of it is rather tricky.
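If you did find a way to obtain the processor name and segment, the idea of extending the claim during processing would boil down to a heartbeat running next to the long task. The plain-Java sketch below is hypothetical throughout: `ClaimExtender` is a stand-in with the same shape as `TokenStore#extendClaim(String, int)`, and `ClaimHeartbeat` and `HeartbeatDemo` are illustrative names. In a real setup, the real `extendClaim` throws an `UnableToClaimTokenException` when this node no longer owns the segment, and it may need to run inside a transaction depending on the TokenStore implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in with the same shape as TokenStore#extendClaim(String, int). */
interface ClaimExtender {
    void extendClaim(String processorName, int segment);
}

/** Extends a claim at a fixed rate until closed; records the first failure. */
final class ClaimHeartbeat implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "claim-heartbeat");
                t.setDaemon(true); // never keep the JVM alive just for the heartbeat
                return t;
            });
    private volatile RuntimeException failure;

    ClaimHeartbeat(ClaimExtender extender, String processorName, int segment, long intervalMillis) {
        // Extend right away, then at an interval well below the claim timeout.
        scheduler.scheduleAtFixedRate(() -> {
            try {
                extender.extendClaim(processorName, segment);
            } catch (RuntimeException e) {
                // The claim could not be extended (e.g. another node owns it): remember and stop.
                failure = e;
                scheduler.shutdown();
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** True once an extension attempt failed; the long task should then abort instead of committing. */
    boolean claimLost() {
        return failure != null;
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // stop extending once the long task is done
    }
}

class HeartbeatDemo {
    /** Simulates a slow handler and returns how many times the claim was extended meanwhile. */
    static int extensionsDuringSlowTask(long taskMillis, long intervalMillis) {
        AtomicInteger extensions = new AtomicInteger();
        ClaimExtender extender = (processor, segment) -> extensions.incrementAndGet();
        try (ClaimHeartbeat heartbeat =
                     new ClaimHeartbeat(extender, "long-running-processor", 0, intervalMillis)) {
            sleep(taskMillis); // stands in for the long-running event handling
        }
        return extensions.get();
    }

    /** Simulates an extender that starts failing after two calls, as if the claim were stolen. */
    static boolean detectsLostClaim() {
        AtomicInteger calls = new AtomicInteger();
        ClaimExtender extender = (processor, segment) -> {
            if (calls.incrementAndGet() > 2) {
                throw new IllegalStateException("segment claimed by another node");
            }
        };
        try (ClaimHeartbeat heartbeat =
                     new ClaimHeartbeat(extender, "long-running-processor", 0, 20)) {
            sleep(200);
            return heartbeat.claimLost();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The missing piece remains the one described above: the handler would need the processor name and segment identifier, which it does not currently receive.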

Having said that, I’d rather recommend the following.
First, I would try to isolate the long-running event handling task in its own Event Handling Component. You can then configure a dedicated StreamingEventProcessor (both the Tracking and the Pooled Streaming Event Processor should be fine here) for this specific component. Further, you can configure a specific TokenStore with the extended claim timeout for this single StreamingEventProcessor.
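A configuration sketch of that isolation, assuming Spring Boot and a hypothetical `ClientBroadcastRequested` event and `ClientBroadcastHandler` component. The `@ProcessingGroup` annotation places the slow handler in its own group; by default, Axon creates one event processor per processing group, named after it, so a processor-specific TokenStore can be registered under that same name through `registerTokenStore`.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

// Hypothetical event that triggers the per-client work.
class ClientBroadcastRequested {
    final String broadcastId;

    ClientBroadcastRequested(String broadcastId) {
        this.broadcastId = broadcastId;
    }
}

// Only the slow handler lives in this processing group, so its processor
// and token claims are separate from the fast handlers.
@Component
@ProcessingGroup("long-running-processor")
class ClientBroadcastHandler {

    @EventHandler
    public void on(ClientBroadcastRequested event) {
        // long-running, per-client work goes here
    }
}

@Configuration
class LongRunningProcessorSetup {

    @Autowired
    public void configureProcessor(EventProcessingConfigurer configurer) {
        // A Tracking Event Processor would work as well:
        // configurer.registerTrackingEventProcessor("long-running-processor");
        configurer.registerPooledStreamingEventProcessor("long-running-processor");
    }
}
```

Keeping the fast handlers in other processing groups means they retain a short claim timeout and a quick takeover after a node failure.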

As a side note, I would like to share that we are considering moving the extended claim timeout from the TokenStore to the Streaming Processor. However, as far as I can tell right now, that’s a breaking change. As such, it will not be introduced in Axon 4.x, but in Axon 5.