In our application we use the processingStatus() on TrackingEventProcessor to get the index of a tracking token to compare this with the head token to be able to check if there are any token processors running behind.
This works fine if only one instance is running. However, when running two instances, only the instance that has claimed the token, will return a processingStatus.
In our setup, we have two instances of our application (configured with a loadbalancer). Our check goes via the loadbalancer to one of the 2 instances. If the token is claimed by instance 1, but the check runs on instance 2, we cannot get any information about the actual state of the processor, and thus cannot determine if the system is healthy.
For the moment, we filter out the processors without processing status, which causes false negative checks. Our check thinks everything is caught up (checking instance 2), but actually the token was running on instance 1 and was a lot behind.
Is there a way to check if a tracking processor is caught up, independent whether an instance has claimed the token?
We use AxonFramework 4.0.3.
Some of the tokens are configured on Axon Server version 4.1.4, other tokens are configured on a embedded jpa event store. All show the same behavior when requesting the processingStatus.
Our check:
private List<String> getDelayedProcessors(EventBus eventBus)
{
if (!(eventBus instanceof EventStore))
throw new IllegalArgumentException("EventBus is not EventStore!");
T headToken = (T) ((EventStore) eventBus).createHeadToken();
return headToken == null ?
Collections.emptyList() :
configuration.eventProcessors()
.values()
.stream()
.filter(processor -> processor instanceof TrackingEventProcessor)
.filter(processor -> tracksEvents((TrackingEventProcessor) processor, headToken.getClass()))
.filter(processor -> isNotWithinToleranceOfGlobalSequence((TrackingEventProcessor) processor, headToken, SystemProperty.TOKEN_PROCESSORS_HEALTH_TOLERATED_DELAY.getIntValue()))
.map(EventProcessor::getName)
.collect(Collectors.toList());
}
private <E extends TrackingToken> boolean tracksEvents(TrackingEventProcessor processor, Class<E> tClass)
{
return processor.processingStatus()
.values()
.stream()
.anyMatch(p -> tClass.isInstance(p.getTrackingToken()));
}
private boolean isNotWithinToleranceOfGlobalSequence(TrackingEventProcessor p, T headToken, int amountOfEventsToTolerate)
{
return !p.isRunning() || !p.processingStatus().values().stream().map(EventTrackerStatus::getTrackingToken).allMatch(token -> coversWithTolerance(token, headToken, amountOfEventsToTolerate));
}