Restarting a saga tracking event processor?

I’m experimenting with writing integration tests and am trying to come up with the best way to “reset” the state (both the command and query sides of the DB) between tests.

The steps I’m following:

  1. Shutting down all tracking event processors.
  2. Resetting the DB (resetting the schema to the initial start state i.e. clearing all events and projections).
  3. Restarting the tracking event processors.

Unfortunately on step 3 my code fails:

    private void restartTrackingEventProcessors(EventProcessingConfiguration eventProcessingConfiguration) {
        Map<String, EventProcessor> eventProcessors = eventProcessingConfiguration.eventProcessors();
        eventProcessors.forEach((processorName, processor) -> {
            // If it is tracking event processor then reset the token and start it up.
            Optional<TrackingEventProcessor> trackingEventProcessor = eventProcessingConfiguration.eventProcessor(processorName, TrackingEventProcessor.class);
            if(trackingEventProcessor.isPresent()) {
                var tep = trackingEventProcessor.get();
                log.info("Restarting: {} {}", processorName, processor);
                tep.resetTokens(); // Will throw if the TEP has not already been shutdown.
                tep.start();
                log.info("Restarted tracking event processor: {}", processorName);
            }
        });
    }

The reason is that it hits a Saga tracking event processor (the others work fine), which throws:

    java.lang.IllegalStateException: The handlers assigned to this Processor do not support a reset

What is the best way to reset the saga TEP so that the state of the system is “fresh” for the next test?

Fair question, @vab2048. We actually have an outstanding issue to discuss the subject.

Note that resetting a Saga’s processor is not supported at the moment because a Saga, by definition, introduces side effects.
And reintroducing side effects in a production environment is, in most cases, a recipe for disaster.
That is why the issue exists.

To the question at hand, though.
I think the most straightforward step you can take is the old-fashioned approach to resetting a StreamingEventProcessor, which is removing its token.
From the perspective of Axon Framework, it wouldn’t be a replay, strictly speaking.
But for the sake of integration testing, that should be completely fine.

I assume that should be sufficient in this case.
Let us know whether that works, @vab2048!
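As a rough sketch of how the restart loop could avoid the exception: only call `resetTokens()` on processors whose handlers allow it, and let the others (such as the Saga processor) simply start again from a removed token. The `Processor` interface and `FakeProcessor` class below are hypothetical stand-ins so the example runs without Axon on the classpath; the `supportsReset()` check is assumed to mirror the method of the same name on `TrackingEventProcessor`, so verify it against the Axon version in use.

```java
import java.util.ArrayList;
import java.util.List;

class GuardedRestartSketch {

    /** Hypothetical stand-in for the few processor operations used in this sketch. */
    interface Processor {
        String name();
        boolean supportsReset();
        void resetTokens();
        void start();
    }

    /**
     * Starts every processor, resetting tokens only where the handlers support it.
     * Returns the names of the processors for which the reset was skipped.
     */
    static List<String> restartAll(List<? extends Processor> processors) {
        List<String> skippedReset = new ArrayList<>();
        for (Processor p : processors) {
            if (p.supportsReset()) {
                p.resetTokens();
            } else {
                // E.g. a Saga processor: rely on the token having been deleted instead.
                skippedReset.add(p.name());
            }
            p.start();
        }
        return skippedReset;
    }

    /** In-memory fake that throws on an unsupported reset, like the error in the question. */
    static final class FakeProcessor implements Processor {
        private final String name;
        private final boolean supportsReset;
        boolean resetCalled;
        boolean started;

        FakeProcessor(String name, boolean supportsReset) {
            this.name = name;
            this.supportsReset = supportsReset;
        }

        public String name() { return name; }

        public boolean supportsReset() { return supportsReset; }

        public void resetTokens() {
            if (!supportsReset) {
                throw new IllegalStateException(
                        "The handlers assigned to this Processor do not support a reset");
            }
            resetCalled = true;
        }

        public void start() { started = true; }
    }

    public static void main(String[] args) {
        List<FakeProcessor> processors = List.of(
                new FakeProcessor("query.account", true),
                new FakeProcessor("sagaProcessor", false));
        System.out.println("Reset skipped for: " + restartAll(processors));
    }
}
```

In an integration test where the token table is wiped anyway, the guard is optional: dropping the reset call entirely has the same effect.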


Thanks @Steven_van_Beelen!

Removing the line:

                tep.resetTokens(); // Will throw if the TEP has not already been shutdown.

got everything to work. It worked because I was already deleting the tokens in step 2 (by resetting the DB).

I end up with the TEPs fetching a null token on startup:

    2022-06-13 10:28:55.353  INFO 27024 --- [uery.payment]-1] o.a.e.TrackingEventProcessor             : Fetched token: null for segment: Segment[0/0]
    2022-06-13 10:28:55.353  INFO 27024 --- [agaProcessor]-1] o.a.e.TrackingEventProcessor             : Fetched token: null for segment: Segment[0/0]
    2022-06-13 10:28:55.360  INFO 27024 --- [uery.account]-1] o.a.e.TrackingEventProcessor             : Fetched token: null for segment: Segment[0/0]

The TEPs then just recreate their tokens between tests :)
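For anyone following along, here is a toy model of the sequence that ended up working: shut down, wipe the store, start again without any reset call. Nothing in it is Axon API; `ToyProcessor` and the `Map`-based token store are hypothetical stand-ins that only illustrate why a processor sees a null token after the wipe and then creates a fresh one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TokenWipeSketch {

    /** Toy processor: loads its token on start and creates one if none is stored. */
    static final class ToyProcessor {
        final String name;
        boolean running;
        Long fetchedOnStart;

        ToyProcessor(String name) {
            this.name = name;
        }

        void start(Map<String, Long> tokenStore) {
            // After a wipe there is no entry, so this is null ("Fetched token: null").
            fetchedOnStart = tokenStore.get(name);
            // A fresh token is created, pointing at the start of the (now empty) stream.
            tokenStore.putIfAbsent(name, 0L);
            running = true;
        }

        void shutDown() {
            running = false;
        }
    }

    /** The three steps from the question, minus the resetTokens() call. */
    static void resetBetweenTests(Map<String, Long> tokenStore, List<ToyProcessor> processors) {
        processors.forEach(ToyProcessor::shutDown);      // step 1: stop all processors
        tokenStore.clear();                              // step 2: stands in for the DB reset
        processors.forEach(p -> p.start(tokenStore));    // step 3: plain start
    }

    public static void main(String[] args) {
        Map<String, Long> tokenStore = new HashMap<>();
        tokenStore.put("sagaProcessor", 42L);            // leftover from a previous test
        ToyProcessor saga = new ToyProcessor("sagaProcessor");

        resetBetweenTests(tokenStore, List.of(saga));
        System.out.println("Fetched on start: " + saga.fetchedOnStart);
        System.out.println("Token after start: " + tokenStore.get("sagaProcessor"));
    }
}
```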
