Saga Not Completing Due to Missed Event Consumption

Background
During a recent load test, it was observed that, on rare occasions, certain events are not consumed by the saga as expected. This issue occurs during the execution of an operation that involves creating multiple aggregates and performing actions on them. Due to time constraints, the implementation was done in a slightly unconventional manner.

The operation begins with a command that results in an event, which in turn triggers the saga. Once the saga has started, all related commands are sent independently, and finally a completion event is sent to end the saga. The entire process is handled within a single Unit of Work. If an error occurs during the process, a rollback mechanism triggers a compensation command. If no error occurs but the operation fails to complete within the defined timeframe, the saga expires after reaching its deadline.

Observed Issue
Rarely, the SagaCompletedEvent sent during this operation is not consumed by the saga, so the saga does not complete as expected. A brute-force change, delaying the sending of the SagaCompletedEvent, seemed to mitigate the issue, but the exact root cause remains unclear. My hypothesis is that the SagaCompletedEvent is sent too quickly, before the saga is ready to handle it, so the saga stays open until its deadline passes and then expires.

For example, during a load test where 100 requests were sent to trigger this saga, in 2 cases the saga did not complete because the SagaCompletedEvent was not handled, even though it was sent according to the logs.

Below, I attach the pseudocode, logs, and other details that may help in diagnosing and resolving the issue.

@CommandHandler
public String handle(StartSagaCommand command) {
    var id = command.id();
    var compensationCommand = new CompensationCommand(id);

    var unitOfWork = CurrentUnitOfWork.get();
    commandGateway.sendAndWait(new SagaStartedCommand(id));
    unitOfWork.onRollback(
        message -> commandGateway.sendAndWait(compensationCommand));
    unitOfWork.onPrepareCommit(message -> eventGateway.publish(new SagaCompletedEvent(id)));

    executeCommands(command); // sends all related commands independently

    return command.id();
}

@CommandHandler
protected void handle(SagaStartedCommand command) {
    eventGateway.publish(new SagaStartedEvent(command.id()));
}

@Saga(sagaStore = "sagaStore")
@ProcessingGroup("sagaProcessor")
public class ExampleSaga {

    public static final String DEADLINE_NAME = "DeadlineName";
    private static final String ID_ASSOCIATION_KEY = "id";

    @Autowired
    private transient CommandGateway commandGateway;
    @Autowired
    private transient DeadlineManager deadlineManager;

    private String id;
    private String deadlineId;

    @StartSaga
    @SagaEventHandler(associationProperty = ID_ASSOCIATION_KEY)
    public void on(SagaStartedEvent event) {
        this.id = event.id();
        deadlineId = deadlineManager.schedule(Duration.ofMinutes(10), DEADLINE_NAME, null);
    }

    @EndSaga
    @SagaEventHandler(associationProperty = ID_ASSOCIATION_KEY)
    public void on(SagaCompletedEvent event) {
        deadlineManager.cancelSchedule(DEADLINE_NAME, deadlineId);
    }

    @EndSaga
    @SagaEventHandler(associationProperty = ID_ASSOCIATION_KEY)
    public void on(CompensationEvent event) {
        executeCompensations();
        deadlineManager.cancelSchedule(DEADLINE_NAME, deadlineId);
    }

    @DeadlineHandler(deadlineName = DEADLINE_NAME)
    public void onDeadline() {
        executeCompensations();
        SagaLifecycle.end();
    }

    private void executeCompensations() {
        commandGateway.sendAndWait(new CompensationCommand(id)); 
    }
}

2025-01-21T10:42:52.122Z DEBUG 1 --- [example-service] [nector(Axon)-97] c.e.c.EventDispatchInterceptor    : Publishing event: [GenericEventMessage{payload={SagaStartedEvent[id=074a67e5-344a-441f-bc7f-8d58d110858e]}, ...}].

2025-01-21T10:42:52.195Z DEBUG 1 --- [example-service] [nector(Axon)-98] c.e.c.EventDispatchInterceptor    : Publishing event: [GenericEventMessage{payload={SagaCompletedEvent[id=074a67e5-344a-441f-bc7f-8d58d110858e]}, ...}].

2025-01-21T10:42:53.091Z INFO [example-service] [Processor]-0] c.e.saga.ExampleSaga:
ExampleSaga(Id=074a67e5-344a-441f-bc7f-8d58d110858e): Scheduled discontinue deadline=deadline-8d155966-02de-4ddf-a86f-a516a5e827b0.

2025-01-21T11:42:53.094Z WARN [example-service] [Scheduler_Worker-5] c.e.saga.ExampleSaga:
ExampleSaga(Id=074a67e5-344a-441f-bc7f-8d58d110858e): Deadline 'DeadlineName' reached. Executing compensating actions.

2025-01-21T11:42:53.094Z INFO [example-service] [Scheduler_Worker-5] c.e.saga.ExampleSaga:
ExampleSaga(Id=074a67e5-344a-441f-bc7f-8d58d110858e): Executing compensating actions.

eventhandling:
  processors:
    sagaProcessor:
      mode: tracking
      source: streamableKafkaMessageSource

Hey @D3ska, let me see how I can help you here.

A couple of things strike me from your question, which I will reiterate here for clarity.

On the entire process being handled within a single Unit of Work: note that this is only the case if you DO NOT use any new threads anywhere. So, any use of a distributed command bus, AsynchronousCommandBus, or DisruptorCommandBus would make this point incorrect. Furthermore, if this process is a back and forth between a command dispatching component and the saga, this is also not correct. A saga is backed by an Event Processor, which in your case is a streaming one (which I base on the streamableKafkaMessageSource property). Hence, there are threads involved.

How did you validate that the event was really not handled? What I tend to do to figure out where the event “is dropped” is add blunt logging statements. For message handling in Axon Framework, you can use the LoggingInterceptor for that purpose. I would recommend setting it on the StreamingEventProcessor that provides the events to your saga.

If the LoggingInterceptor is in place and you do not see the SagaCompletedEvent come by when you would expect it, then the problem lies with the message source. If you do spot the interceptor being invoked, but the saga is never entered, then it is either (1) a bug in Axon Framework or (2) an event that is not associated with the saga in question.
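
For reference, a minimal sketch of registering the LoggingInterceptor on the processor behind the saga could look like the following. It assumes a Spring Boot setup and that the Event Processor is named sagaProcessor, matching the @ProcessingGroup annotation in the question; the class name SagaProcessorLoggingConfig is hypothetical.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.messaging.interceptors.LoggingInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the registration call matters here.
@Configuration
public class SagaProcessorLoggingConfig {

    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        // Assumption: the processor name equals the processing group name "sagaProcessor".
        // The interceptor logs each event the processor hands to its handlers.
        configurer.registerHandlerInterceptor(
                "sagaProcessor",
                configuration -> new LoggingInterceptor<>());
    }
}
```

With this in place, the absence or presence of a log line for the SagaCompletedEvent narrows the problem down to either the message source or the saga association.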

I am personally leaning towards an issue with the message source at this stage, which I purely base on the use of Kafka. This mainly has to do with the fact that I have the most experience with RDBMSs and Axon Server as the StreamableMessageSource.

Are all 100 requests pointing to the same Saga instance? So, is the association value for the SagaStartedEvent and SagaCompletedEvent (the field referenced with the ID_ASSOCIATION_KEY) identical for all requests?
Or are these 100 runs, each with a different identifier?

As for your hypothesis that the SagaCompletedEvent is sent too quickly: if a SagaCompletedEvent is sent with a lower position than events that have already been handled, then this might be the case. However, Axon Framework should keep track of gaps like this for you by means of the GapAwareTrackingToken.

However, I am missing the specifics of the stream and token configuration for your application. Assuming this is still a problem, would you be able to share those?
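
To illustrate what keeping track of gaps means, here is a simplified, hypothetical model in plain Java. It is not Axon Framework's GapAwareTrackingToken, and all names in it are made up for illustration. Which token implementation is actually in use depends on the message source, so treat this only as a sketch of the idea: a position that arrives late is still recognized as unhandled because it was recorded as a gap.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified, hypothetical model of gap-aware position tracking.
// It is NOT Axon Framework's GapAwareTrackingToken; all names are illustrative.
class GapAwareTrackerSketch {

    // Highest position handled so far; -1 means nothing has been handled yet.
    private long index = -1;
    // Positions below 'index' that have not been handled yet.
    private final SortedSet<Long> gaps = new TreeSet<>();

    // Marks a position as handled. Returns true if it was not handled before.
    boolean advanceTo(long position) {
        if (position > index) {
            // Every skipped position becomes a gap that may still be filled later.
            for (long p = index + 1; p < position; p++) {
                gaps.add(p);
            }
            index = position;
            return true;
        }
        // A late arrival only counts as new if it fills a known gap.
        return gaps.remove(position);
    }

    // True if the given position has already been handled.
    boolean covers(long position) {
        return position <= index && !gaps.contains(position);
    }

    // The positions that were skipped and are still outstanding.
    SortedSet<Long> gaps() {
        return gaps;
    }
}
```

In this model, handling positions 0 and 2 leaves position 1 as a gap, so an event that later shows up at position 1 is still treated as new rather than skipped.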


Sorry, bit of a long-winded answer here. I hope all of it makes sense, @D3ska. When replying, feel free to refer to my sections again!