Finding out when all events have been processed and taking action afterwards

First and foremost, I want to express my gratitude for creating this exceptional framework. I have just begun exploring it and I’m already finding it enjoyable.

For my current greenfield project, I want to trigger certain operations once all outstanding events have been processed, rather than after a single event. During my research I came across the EventTrackerStatusChangeListener, and I am looking for guidance on how to use it. So far I have configured everything through Spring Boot properties, where I declare processing groups and their parameters, such as initialSegmentCount. Is there a way to register an EventTrackerStatusChangeListener implementation through configuration properties? Alternatively, if it can only be done through the DSL (domain-specific language), will that override the processing group configuration from the application properties? Thank you for your help.

TrackingEventProcessorConfiguration.forParallelProcessing()
.andEventTrackerStatusChangeListener()

I tried setting the listener programmatically as follows:

    return configurer ->
        configurer
            .eventProcessing()
            .registerTrackingEventProcessorConfiguration(
                c ->
                    TrackingEventProcessorConfiguration.forParallelProcessing(threadCount)
                        .andEventTrackerStatusChangeListener(
                            new EventTrackerStatusChangeListenerImpl()));

If I keep the processor configuration in YAML, the configuration above gets overwritten and is not applied. Only when I comment out the processor config in YAML does the EventTrackerStatusChangeListener get applied. Is there a way to use both the DSL and the YAML event processor configuration at the same time? YAML-based config is very handy and less verbose.

Secondly, how can I find out from EventTrackerStatus which processor the status change belongs to?

public void onEventTrackerStatusChange(Map<Integer, EventTrackerStatus> updatedTrackerStatus) {

EventTrackerStatus has all the information except the name of the processor.

Let me know if I am missing something here. Thank you for your help.

Firstly, let me quote something:

I want to thank you for your kind words here, @Deepak_Chaudhary.
I can tell you they mean a lot to us.

Secondly, concerning the EventTrackerStatusChangeListener, I do want to put small warning signs around it.

The EventTrackerStatusChangeListener was constructed (by me, actually) to solve a problem with running a single instance of an application in mind.
As soon as there are several instances running the same TrackingEventProcessor, you should know there is zero communication between them concerning the status changes.
This may cause undesirable and unclear scenarios within your application.
As such, I would recommend using the status change listener with caution and, preferably, not for business processes that are too important.

Instead of having a sort of “loop until point X in the stream is reached,” we prefer to recommend you make your application react when the time is right.
So, instead of expecting a specific event to be handled, you can publish the updates to your query models to those interested in them.

You can achieve this by, for example, using Axon Framework's subscription queries.
This tool allows you to subscribe to a specific query model (group), and receive the changes as they occur.

However, whether my suggestion works depends on why you want to wait until a certain event has been handled.
So, would you mind sharing the use case you are facing, @Deepak_Chaudhary?
Perhaps there’s a straightforward solution to it.

Oh, and let me quote another section from your question to provide clarity:

Sadly, this is not an option at the moment.
My apologies for the inconvenience here.

Nonetheless, I hope the above helps you further, Deepak!

Hi Steven,

I also have this requirement. Imagine the following: my projection receives the event stream and performs some cost-intensive calculations on the results. If the application has been down for some time, the tracking event processors will make sure to catch up with the event stream. The EventTrackerStatus offers a perfect way to detect this, since its caughtUp attribute tells me when a processor has caught up.

In the end, I want to start the intensive recalculation after the stream has been processed, not while catching up. My first optimization was to avoid recalculations during replays, but the same requirement holds for catching up.

I also considered tracking the “last” event to start the recalculation, but could not find any meaningful way to do so…
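To make the idea concrete, here is a minimal sketch of such a "caught-up gate". It is not Axon code: `TrackerStatus` is a hypothetical stand-in that exposes only the two flags used here (the real EventTrackerStatus has `isCaughtUp()` and `isReplaying()`), and `CaughtUpGate`, `expectedSegments` and `onAllCaughtUp` are names invented for illustration. It assumes a single application instance owns all segments, and that the callback may deliver only the segments that changed, so the gate keeps its own view per segment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Axon's EventTrackerStatus, reduced to the flags used below.
interface TrackerStatus {
    boolean isCaughtUp();
    boolean isReplaying();
}

// Runs a callback once every known segment is caught up and not replaying.
// In a real application this class would implement Axon's
// EventTrackerStatusChangeListener instead of standing alone.
final class CaughtUpGate {
    private final Map<Integer, TrackerStatus> latestBySegment = new HashMap<>();
    private final int expectedSegments;   // assumption: e.g. the configured initialSegmentCount
    private final Runnable onAllCaughtUp; // assumption: starts the expensive recalculation
    private boolean fired;

    CaughtUpGate(int expectedSegments, Runnable onAllCaughtUp) {
        this.expectedSegments = expectedSegments;
        this.onAllCaughtUp = onAllCaughtUp;
    }

    public synchronized void onEventTrackerStatusChange(
            Map<Integer, ? extends TrackerStatus> updatedTrackerStatus) {
        // Merge the update into our own per-segment view.
        latestBySegment.putAll(updatedTrackerStatus);

        boolean allCaughtUp = latestBySegment.size() >= expectedSegments
                && latestBySegment.values().stream()
                        .allMatch(s -> s.isCaughtUp() && !s.isReplaying());

        if (!allCaughtUp) {
            fired = false; // re-arm so a later catch-up triggers the callback again
            return;
        }
        if (!fired) {
            fired = true;
            onAllCaughtUp.run();
        }
    }
}
```

The gate fires once per transition into the "everything caught up" state and re-arms when any segment falls behind again. With several instances sharing the segments, no single instance would see all of them, so this sketch would not fire reliably there.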

Cheers,

Simon

Thanks for your two cents, @Simon_Zambrovski.

Note that my reply to @Deepak_Chaudhary is meant as the “9 out of 10” solution.
There are, as always, outliers that are more easily solved with a built-in hook in Axon Framework.

However, as it stands, we cannot prioritize the effort on building such a feature or making the EventTrackerStatusChangeListener resilient in distributed environments.
This is not me telling you “we’ll not build this,” but rather me telling you “we know, we want to fix this, but we need to prioritize other topics first.”

With that said, I can assure all of you I would still prefer to have such a hook in place.
Hence, a future release of Axon Framework may just contain such a solution.

I hope this clarifies our stance on the subject further!

By the way, instead of applying a global configuration as in your example, you can register a processor-specific configuration with EventProcessingConfigurer.registerTrackingEventProcessorConfiguration(processorName, trackingEventProcessorConfiguration).

So in your example:

    return configurer ->
        configurer
            .eventProcessing()
            .registerTrackingEventProcessorConfiguration(
                "myProcessor",
                c ->
                    TrackingEventProcessorConfiguration.forParallelProcessing(threadCount)
                        .andEventTrackerStatusChangeListener(
                            new EventTrackerStatusChangeListenerImpl("myProcessor")));

Of course, you have to change your EventTrackerStatusChangeListenerImpl implementation so that it accepts the name of the processor as a constructor parameter.
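A rough sketch of what such a name-aware listener could look like is shown here. To keep it compilable without the framework on the classpath, the type parameter `S` stands in for Axon's EventTrackerStatus, and the class does not implement the real listener interface. `NamedStatusChangeListener` and its `delegate` are hypothetical names, not part of Axon.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

// Sketch only: S is a placeholder for Axon's EventTrackerStatus. In a real
// application the class would implement EventTrackerStatusChangeListener
// with S fixed to EventTrackerStatus.
final class NamedStatusChangeListener<S> {
    private final String processorName;
    private final BiConsumer<String, Map<Integer, S>> delegate;

    NamedStatusChangeListener(String processorName,
                              BiConsumer<String, Map<Integer, S>> delegate) {
        this.processorName = Objects.requireNonNull(processorName, "processorName");
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    // Same name and shape as the callback quoted earlier in this thread;
    // it forwards the update together with the processor name it was built with.
    public void onEventTrackerStatusChange(Map<Integer, S> updatedTrackerStatus) {
        delegate.accept(processorName, updatedTrackerStatus);
    }
}
```

Because each processor gets its own listener instance, the name is known at construction time and does not have to come from the status object itself.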

Maybe this helps,

Cheers

Simon