Hey Ruben,
we’ve solved the same challenge for integration testing by implementing a TestEventRecorder (a regular TrackingEventProcessor) that is capable of recording all published events and makes sure that all other TrackingEventProcessors (namely the ones in the production code) have processed the events for a certain test case published up to a certain point in time.
To be sure that all TrackingEventProcessors have finished processing the desired events, we
- set the TestEventRecorder to “record” state at the beginning of an integration test case and add a testCorrelationId to identify which events belong to the current test case,
- execute the action that triggers producing the desired events in the integration test
- let the TestEventRecorder record all events having this testCorrelationId until “stop” is called for a certain correlation id
- on “stop” we catchUp all other event processors by sending a RecordingCatchupEvent directly to the EventBus and determine the TrackingToken related to this event when it is processed in the TestEventRecorder - then we can use the axonConfig to get all other tracking event processors and simply block the stop method to wait until all production event processors have processsed up to this TrackingToken (see example code below)
- as a bonus, we can also return all events that have been recorded for a certain correlationId and use the to assert the expected events in the integration test (for our integration tests we send in a command and assert that the desired events we expect to be produced are actually published)
Since publishing and processing of events is happening at different times, we additionally implemented a TestEventCorrelationIterceptor (which is a regular axon MessageDispatchInterceptor) that adds a testCorrelationId to the event metadata when publishing and event. Thus we make sure that the TestEventRecorder separates the recorded events per correlationId, even if the processing of in the TestEventRecorder happens after we already signaled the recorder to stop recording from within the integration test.
Example code for catching up other processors:
`
private void catchUpTrackingProcessors(TrackingEventProcessor recordingProcessor) {
final TrackingToken recordingToken =
recordingProcessor
.processingStatus()
.values()
.stream()
.map(EventTrackerStatus::getTrackingToken)
.filter(Objects::nonNull)
.reduce((tokenA, tokenB) -> tokenA.covers(tokenB) ? tokenA : tokenB)
.orElse(null);
if (recordingToken == null) {
log.debug(“trackingToken for event recorder not found”);
throw new RuntimeException(“failed to catch up”);
}
log.debug(“processors need to catch up to {}”, recordingToken);
// wait for all other processors to catch up
final Collection otherProcessors = getOtherTrackingProcessors();
log.debug(“iterating {} processors to catch up”, otherProcessors.size());
for (TrackingEventProcessor processor : otherProcessors) {
if (!processor.isRunning()) {
log.debug(“skipping processor {} since it is already shutting down”, processor.toString());
continue;
}
final Set currentSegments = processor.processingStatus().keySet();
for (Integer segmentId : currentSegments) {
EventTrackerStatus currentStatus = processor.processingStatus().get(segmentId);
if (currentStatus == null) {
log.debug(“segment {} of processor {} vanished meanwhile.”, segmentId, processor.toString());
continue;
}
TrackingToken currentTrackingToken = currentStatus.getTrackingToken();
log.debug(“segment {} of processor {} {} to token {}. recording token is {}”,
segmentId, processor.toString(), currentStatus.isCaughtUp() ? “is caught up” : “is not caught” +
" up"
, currentTrackingToken, recordingToken);
if (currentTrackingToken != null && currentTrackingToken.equals(recordingToken) && currentStatus.isCaughtUp()) {
// continue with next segment
continue;
}
// run an exponential backoff of max. 5 secs to wait for finishing
final ExponentialBackOff exponentialBackOff = new ExponentialBackOff(INITIAL_BACKOFF_MS, 1.5);
exponentialBackOff.setMaxElapsedTime(MAX_BACKOFF_MS);
final BackOffExecution backOff = exponentialBackOff.start();
do {
final long currentInterval = backOff.nextBackOff();
if (currentInterval == BackOffExecution.STOP) {
log.debug(“backoff expired waiting for segment {} of processor {} to catch up to {}.”,
segmentId, processor.toString(), recordingToken);
throw new RuntimeException(“failed to catch up”);
}
log.debug(“segment {} of processor {} needs to catch up to {} from {}. Waiting for {}ms …”,
segmentId, processor.toString(), recordingToken, currentTrackingToken, currentInterval);
synchronized (processor) {
try {
processor.wait(currentInterval);
// do live refresh of current Status and current Tracking token
currentStatus = processor.processingStatus().get(segmentId);
currentTrackingToken =
Optional.ofNullable(currentStatus)
.map(EventTrackerStatus::getTrackingToken)
.orElse(null);
} catch (InterruptedException e) {
// re-set the interrupted state on the current thread, since we don’t expect to be
// waked up
Thread.currentThread().interrupt();
}
}
} while (currentTrackingToken == null ||
(!currentTrackingToken.equals(recordingToken)) && !currentStatus.isCaughtUp());
log.debug(“segment {} of processor {} is caught up to the catchup token {}.”,
segmentId, processor.toString(), recordingToken);
// continue with next segment
}
}
}
`
Example Code for the TestEventCorrelationInterceptor:
`
/**
- simple message interceptor that serializes the current {@link #currentTestCorrelationId} to the
- {@link Message#withMetaData(Map)} prior to message dispatching
*/
public class TestEventCorrelationInterceptor implements MessageDispatchInterceptor<Message<?>> {
private static final Logger log = LoggerFactory.getLogger(TestEventCorrelationInterceptor.class);
public static final String EVENT_RECORDER_METADATA_KEY = “ai.ikosa.portal.commander.util” +
“.TestEventsRecorderInterceptor”;
public static final String UNSET_CORRELATION_ID = “unset”;
public String currentTestCorrelationId = UNSET_CORRELATION_ID;
public boolean unsetOnNextCatchup = false;
public void setCurrentTestCorrelationId(String currentTestCorrelationId) {
this.currentTestCorrelationId = currentTestCorrelationId;
unsetOnNextCatchup = false;
}
public void unsetOnNextCatchup() {
unsetOnNextCatchup = true;
}
public TestEventCorrelationInterceptor() {
super();
}
/**
- Handles a list of messages and serializes the {@link #currentTestCorrelationId} to the
- Messages metadata before dispatching them, to correlate recorded messages together
- see {@link MessageDispatchInterceptor#handle(List)}