Determine if tracking event processor is caught up for multiple instances

In our application we use processingStatus() on TrackingEventProcessor to get the index of each tracking token and compare it with the head token, so that we can check whether any tracking processors are running behind.
This works fine if only one instance is running. However, when running two instances, only the instance that has claimed the token will return a processingStatus.

In our setup, we have two instances of our application (configured with a loadbalancer). Our check goes via the loadbalancer to one of the 2 instances. If the token is claimed by instance 1, but the check runs on instance 2, we cannot get any information about the actual state of the processor, and thus cannot determine if the system is healthy.

For the moment, we filter out the processors without a processing status, which causes false negatives: our check (running on instance 2) thinks everything is caught up, while the token was actually claimed by instance 1 and was far behind.
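To make the problem concrete, here is a minimal sketch of a check that reports "unknown" instead of silently passing when the local instance has no status for a processor. It does not solve the cross-instance question; it only stops the missing status from being read as "caught up". The class, the enum, and the map of segment positions are hypothetical stand-ins for illustration, not Axon types.

```java
import java.util.Collections;
import java.util.Map;

public class ProcessorHealthSketch {

    enum Health { CAUGHT_UP, BEHIND, UNKNOWN }

    /**
     * segmentPositions is a hypothetical map of segment id to token position
     * as seen by this instance. It is empty when this instance has not
     * claimed any segment of the processor.
     */
    static Health evaluate(Map<Integer, Long> segmentPositions, long headPosition, long tolerance) {
        if (segmentPositions.isEmpty()) {
            // No local status: we cannot tell, so do not report "caught up".
            return Health.UNKNOWN;
        }
        boolean allWithinTolerance = segmentPositions.values().stream()
                .allMatch(position -> headPosition - position <= tolerance);
        return allWithinTolerance ? Health.CAUGHT_UP : Health.BEHIND;
    }

    public static void main(String[] args) {
        // Instance without a claim on the token
        System.out.println(evaluate(Collections.emptyMap(), 1_000L, 10L));
        // Instance holding the token, 5 events behind
        System.out.println(evaluate(Collections.singletonMap(0, 995L), 1_000L, 10L));
        // Instance holding the token, 500 events behind
        System.out.println(evaluate(Collections.singletonMap(0, 500L), 1_000L, 10L));
    }
}
```

With a result like this, a check that lands on instance 2 could at least report that it has no information, rather than reporting a healthy system.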

Is there a way to check whether a tracking processor is caught up, regardless of which instance has claimed the token?

We use AxonFramework 4.0.3.
Some of the tokens are configured on Axon Server version 4.1.4, other tokens are configured on an embedded JPA event store. All show the same behavior when requesting the processingStatus.

Our check:

private List<String> getDelayedProcessors(EventBus eventBus)
{
    if (!(eventBus instanceof EventStore))
        throw new IllegalArgumentException("EventBus is not EventStore!");

    T headToken = (T) ((EventStore) eventBus).createHeadToken();
    return headToken == null ?
            Collections.emptyList() :
            configuration.eventProcessors()
                    .values()
                    .stream()
                    .filter(processor -> processor instanceof TrackingEventProcessor)
                    .filter(processor -> tracksEvents((TrackingEventProcessor) processor, headToken.getClass()))
                    .filter(processor -> isNotWithinToleranceOfGlobalSequence((TrackingEventProcessor) processor, headToken, SystemProperty.TOKEN_PROCESSORS_HEALTH_TOLERATED_DELAY.getIntValue()))
                    .map(EventProcessor::getName)
                    .collect(Collectors.toList());

}

private <E extends TrackingToken> boolean tracksEvents(TrackingEventProcessor processor, Class<E> tClass)
{
    return processor.processingStatus()
            .values()
            .stream()
            .anyMatch(p -> tClass.isInstance(p.getTrackingToken()));
}

private boolean isNotWithinToleranceOfGlobalSequence(TrackingEventProcessor p, T headToken, int amountOfEventsToTolerate)
{
    // A stopped processor, or any segment outside the tolerance, counts as delayed
    return !p.isRunning()
            || !p.processingStatus()
                    .values()
                    .stream()
                    .map(EventTrackerStatus::getTrackingToken)
                    .allMatch(token -> coversWithTolerance(token, headToken, amountOfEventsToTolerate));
}

Hi Laura,

In version 4.2, we have added a position() method on the TrackingToken. This method gives an indication of the position of a token within the stream. You can use the value from this method (a long) to compare it to the values provided by other tokens. For example, you can check the streams.headToken().position(), and compare it to the position of the tracking processors. For the JPA/JDBC and AxonServer tokens, the difference describes the number of events to be processed to get to the head of the stream.
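As a rough sketch of that comparison: the snippet below computes the number of events a processor token is behind the head token and checks it against a tolerance. PositionedToken is a hypothetical stand-in for TrackingToken so that the example runs without Axon on the classpath. Modelling position() as an OptionalLong is an assumption of this sketch, so that a token without a meaningful position can be represented; check the signature in the version you are using.

```java
import java.util.OptionalLong;

public class TokenLagSketch {

    /** Hypothetical stand-in for a tracking token; only position() is modelled. */
    interface PositionedToken {
        OptionalLong position();
    }

    /** Events between the processor token and the head, or empty if a position is unknown. */
    static OptionalLong lag(PositionedToken head, PositionedToken processorToken) {
        if (head == null || processorToken == null) {
            return OptionalLong.empty();
        }
        OptionalLong headPosition = head.position();
        OptionalLong tokenPosition = processorToken.position();
        if (!headPosition.isPresent() || !tokenPosition.isPresent()) {
            return OptionalLong.empty();
        }
        // Clamp at zero in case the processor token was read after the head token.
        return OptionalLong.of(Math.max(0L, headPosition.getAsLong() - tokenPosition.getAsLong()));
    }

    /** True only when the lag is known and does not exceed the tolerance. */
    static boolean isWithinTolerance(PositionedToken head, PositionedToken processorToken, long tolerance) {
        OptionalLong lag = lag(head, processorToken);
        return lag.isPresent() && lag.getAsLong() <= tolerance;
    }

    public static void main(String[] args) {
        PositionedToken head = () -> OptionalLong.of(1_000L);
        PositionedToken nearlyCaughtUp = () -> OptionalLong.of(990L);
        PositionedToken farBehind = () -> OptionalLong.of(100L);

        System.out.println(isWithinTolerance(head, nearlyCaughtUp, 50L));
        System.out.println(isWithinTolerance(head, farBehind, 50L));
    }
}
```

Treating an unknown position as "not within tolerance" is a design choice of the sketch; a health check might prefer to report it separately.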

This feature has already been merged into the 4.2 branch. We’re currently finalizing the release, but can’t set a definitive date on it yet. Days / weeks, rather than weeks / months in any case.

For now, the only way to really check is exactly how you did it above.
Hope this helps.

Cheers,

Hi Allard,

I just updated from Axon 4.1.x to Axon 4.2.2 in production and we notice our health check status doesn’t work properly anymore.
It seems that some tracking processors are not always catching up to the “head” immediately.
Apparently this has no impact on the application, so I guess this behaviour is there for a good reason?

Do tokens only update in case the new events contain any useful event types for them?
Any idea what happens here?

Kind regards,

OK found the cause.

If an eventListener cannot handle a certain event type, Axon will put this type on a blacklist.

The effect of this blacklist is that the “eventStream.hasNextAvailable” will not return true if the following events are all on the blacklist. Therefore it will only extend the claim of the token but not update the token itself.

If the token is not updated, our health check will complain, even though nothing is wrong: a FALSE POSITIVE.

This is very important for us, especially in the case where for some reason the event listener throws an exception and goes into “retry” mode.

Any idea how we can get any health state of our token processors?

Hi,

the reporting of the status of a processor has been improved in recent versions. You should be able to get more details on the state of the processor itself, as well as the threads it’s using.

If the information you require isn’t there, please let us know what you’re missing. If you want, you can also disable blacklisting using axon.axonserver.disableEventBlacklisting=true.

Cheers,