Know when Axon has finished processing during integration test

Our team is developing Cucumber (integration) tests using the Axon framework. We would like to make the tests as predictable as possible, so we would like to check whether Axon has finished processing before we proceed to the next test step.

Is there a way to check whether there are no more pending commands and events?

Hey Ruben,

we’ve solved the same challenge for integration testing by implementing a TestEventRecorder (a regular TrackingEventProcessor) that is capable of recording all published events and makes sure that all other TrackingEventProcessors (namely the ones in the production code) have processed the events for a certain test case published up to a certain point in time.

To be sure that all TrackingEventProcessors have finished processing the desired events, we

  1. set the TestEventRecorder to “record” state at the beginning of an integration test case and add a testCorrelationId to identify which events belong to the current test case,
  2. execute the action that triggers producing the desired events in the integration test
  3. let the TestEventRecorder record all events having this testCorrelationId until “stop” is called for a certain correlation id
  4. on “stop” we catchUp all other event processors by sending a RecordingCatchupEvent directly to the EventBus and determine the TrackingToken related to this event when it is processed in the TestEventRecorder - then we can use the axonConfig to get all other tracking event processors and simply block the stop method to wait until all production event processors have processsed up to this TrackingToken (see example code below)
  5. as a bonus, we can also return all events that have been recorded for a certain correlationId and use the to assert the expected events in the integration test (for our integration tests we send in a command and assert that the desired events we expect to be produced are actually published)

Since publishing and processing of events is happening at different times, we additionally implemented a TestEventCorrelationIterceptor (which is a regular axon MessageDispatchInterceptor) that adds a testCorrelationId to the event metadata when publishing and event. Thus we make sure that the TestEventRecorder separates the recorded events per correlationId, even if the processing of in the TestEventRecorder happens after we already signaled the recorder to stop recording from within the integration test.

Example code for catching up other processors:

`
private void catchUpTrackingProcessors(TrackingEventProcessor recordingProcessor) {
final TrackingToken recordingToken =
recordingProcessor
.processingStatus()
.values()
.stream()
.map(EventTrackerStatus::getTrackingToken)
.filter(Objects::nonNull)
.reduce((tokenA, tokenB) -> tokenA.covers(tokenB) ? tokenA : tokenB)
.orElse(null);

if (recordingToken == null) {
log.debug(“trackingToken for event recorder not found”);
throw new RuntimeException(“failed to catch up”);
}

log.debug(“processors need to catch up to {}”, recordingToken);

// wait for all other processors to catch up
final Collection otherProcessors = getOtherTrackingProcessors();
log.debug(“iterating {} processors to catch up”, otherProcessors.size());
for (TrackingEventProcessor processor : otherProcessors) {
if (!processor.isRunning()) {
log.debug(“skipping processor {} since it is already shutting down”, processor.toString());
continue;
}

final Set currentSegments = processor.processingStatus().keySet();
for (Integer segmentId : currentSegments) {
EventTrackerStatus currentStatus = processor.processingStatus().get(segmentId);
if (currentStatus == null) {
log.debug(“segment {} of processor {} vanished meanwhile.”, segmentId, processor.toString());
continue;
}

TrackingToken currentTrackingToken = currentStatus.getTrackingToken();
log.debug(“segment {} of processor {} {} to token {}. recording token is {}”,
segmentId, processor.toString(), currentStatus.isCaughtUp() ? “is caught up” : “is not caught” +
" up"
, currentTrackingToken, recordingToken);
if (currentTrackingToken != null && currentTrackingToken.equals(recordingToken) && currentStatus.isCaughtUp()) {
// continue with next segment
continue;
}

// run an exponential backoff of max. 5 secs to wait for finishing
final ExponentialBackOff exponentialBackOff = new ExponentialBackOff(INITIAL_BACKOFF_MS, 1.5);
exponentialBackOff.setMaxElapsedTime(MAX_BACKOFF_MS);
final BackOffExecution backOff = exponentialBackOff.start();
do {
final long currentInterval = backOff.nextBackOff();
if (currentInterval == BackOffExecution.STOP) {
log.debug(“backoff expired waiting for segment {} of processor {} to catch up to {}.”,
segmentId, processor.toString(), recordingToken);
throw new RuntimeException(“failed to catch up”);
}
log.debug(“segment {} of processor {} needs to catch up to {} from {}. Waiting for {}ms …”,
segmentId, processor.toString(), recordingToken, currentTrackingToken, currentInterval);

synchronized (processor) {
try {
processor.wait(currentInterval);
// do live refresh of current Status and current Tracking token
currentStatus = processor.processingStatus().get(segmentId);
currentTrackingToken =
Optional.ofNullable(currentStatus)
.map(EventTrackerStatus::getTrackingToken)
.orElse(null);
} catch (InterruptedException e) {
// re-set the interrupted state on the current thread, since we don’t expect to be
// waked up
Thread.currentThread().interrupt();
}
}
} while (currentTrackingToken == null ||
(!currentTrackingToken.equals(recordingToken)) && !currentStatus.isCaughtUp());
log.debug(“segment {} of processor {} is caught up to the catchup token {}.”,
segmentId, processor.toString(), recordingToken);
// continue with next segment
}
}
}
`

Example Code for the TestEventCorrelationInterceptor:

`
/**

  • simple message interceptor that serializes the current {@link #currentTestCorrelationId} to the
  • {@link Message#withMetaData(Map)} prior to message dispatching
    */
    public class TestEventCorrelationInterceptor implements MessageDispatchInterceptor<Message<?>> {

private static final Logger log = LoggerFactory.getLogger(TestEventCorrelationInterceptor.class);
public static final String EVENT_RECORDER_METADATA_KEY = “ai.ikosa.portal.commander.util” +
“.TestEventsRecorderInterceptor”;
public static final String UNSET_CORRELATION_ID = “unset”;

public String currentTestCorrelationId = UNSET_CORRELATION_ID;
public boolean unsetOnNextCatchup = false;

public void setCurrentTestCorrelationId(String currentTestCorrelationId) {
this.currentTestCorrelationId = currentTestCorrelationId;
unsetOnNextCatchup = false;
}

public void unsetOnNextCatchup() {
unsetOnNextCatchup = true;
}

public TestEventCorrelationInterceptor() {
super();
}

/**

  • Handles a list of messages and serializes the {@link #currentTestCorrelationId} to the
  • Messages metadata before dispatching them, to correlate recorded messages together
  • see {@link MessageDispatchInterceptor#handle(List)}

I’ve also filed an issue in the AxonFramework you can follow: https://github.com/AxonFramework/AxonFramework/issues/1426

Thanks for the explanation, this looks like a good solution. We built our own variant which uses Awaitility:

`

package ...;

import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingToken;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.List;

import static java.time.Duration.ofMillis;
import static java.util.stream.Collectors.toList;
import static org.awaitility.Awaitility.await;
import static org.awaitility.pollinterval.IterativePollInterval.iterative;

/**
 * Utility for waiting until all Axon tracking event processors caught up with the same tracking token.
 * <p>
 *   The goal is that every integration test step that mutates the system state should also wait
 *   for all processors to catch up with the latest axon tracking token.

I also need to think about whether this is actually correct. The code checks whether all event processors are at the same point. However, this could might not be correct if there are 0 or 1 event processors, or if that ‘same point’ is not actually the final point.

Hi Ruben,

in addition to the code above, you could check the position of the head token of the Event Store (use createHeadToken().position()) and validate that your processors are in the same position. However, this will only work if your processors only track the event store. If you track another source or multiple sources, the positions will differ.

The way I address this is by embracing the async behavior of a system, instead of trying to add synchronization points.
You expect a value to be present, but since it is an asynchronous system, it is not present immediately. Your test should define a deadline for this to evaluate to true.
In the tests of Axon itself, we use this code:

public static void assertWithin(int time, TimeUnit unit, Runnable assertion) {
long deadline = System.currentTimeMillis() + unit.toMillis(time);
while (true) {
try {
assertion.run();
break;
} catch (AssertionError e) {
Thread.yield();
if (System.currentTimeMillis() >= deadline) {
throw e;
}
}
}
}

We use yield() here because the values are expected to become true very quickly. If you want to take it slower, you can use Thread.sleep() instead.
In a past project, we have used this same approach for Selenium tests. This is generally faster than just waiting for the deadline and checking the code.

To make sure commands are all executed, you can verify the state of the Completable that is returned.

Hope this helps.
Cheers.

Hi Allard,

the pattern you are suggesting is actually what we do outside the TestEventRecorder - in our Tests we set a deadline until which we expect a certain list of events to be processed and returned from the recorder. The additional synchronization logic is more or less convenience to make sure that all of the TEPs we have running on our write side actually have processed up to a certain “save-point” as expected and we can assert other state changes in our command models or external calls done by these TEPs without the need for extra deadlines.

Hey Ruben,

there is actually a part of the code I left out, since we recently did some refactoring there. Before catching up the other TEPs, we send through a custom event, which will be the last event hitting all TEPs - as soon as this arrived in the TestEventRecorder and is processed, we know that the TestEventRecorder has actually processed all events published prior to this catchup event. With this logic we can be sure that when calling stopRecording / catchUp all events that have been published prior to this stopRecoring / catchUp statement in the integration test will be processed by the TestEventRecorder before it returns. This means when you use sendAndWait(…) or process the Completeable returned by send(…) for your commands as suggested by Allard, you can be sure that all events are actually published - and calling stopRecording / catchUp after this means that the TestEventRecorder will process all of these events.

`

// TODO implement improvement use EventStorageEngine and eventStore and only track tokens without actually
// storing and processing events here
public void catchUp(String correlationId) {
log.info(“catching up event processors for correlationId {}”, correlationId);
// pipe a recording catchup event and wait for it
final RecordingCatchupEvent recordingCatchupEvent =
new RecordingCatchupEvent(UUID.randomUUID(), Instant.now());
this.eventBus.publish(
GenericEventMessage.asEventMessage(recordingCatchupEvent)
);

log.debug(“RecordingCatchupEvent published”);
synchronized (recordingCatchupEvents) {
// wait until recording catchup event is processed here
ExponentialBackOff exponentialBackOff = new ExponentialBackOff(INITIAL_BACKOFF_MS, 1.5);
exponentialBackOff.setMaxElapsedTime(MAX_BACKOFF_MS);
final BackOffExecution backoff = exponentialBackOff.start();
while (!recordingCatchupEvents.getOrDefault(correlationId, List.of()).contains(recordingCatchupEvent)) {
final long currentWait = backoff.nextBackOff();
log.debug(“processor {} needs to catch up to process catchup event. Waiting for {}ms …”,
PROCESSOR, currentWait);
if (currentWait == BackOffExecution.STOP) {
log.debug(“backoff expired waiting for processor {} to process catchup event.”,
PROCESSOR);
throw new RuntimeException(“failed to catch up”);
}
try {
recordingCatchupEvents.wait(currentWait);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// remove recording catchup event from publishedEvents
recordingCatchupEvents.get(correlationId).remove(recordingCatchupEvent);
}

// get latest TrackingToken for recording processor
catchUpTrackingProcessors(getRecordingProcessor());
}

`

Hope that makes it a bit clearer,

Best Regards