Handling replay in Saga when sending not idempotent commands

Hi folks,

I’m using AxonFramework with AxonServer and all my aggregates are event-sourced and I’m using an in-memory saga store.

Saga performs fine if executed without interruptions, but I realized that I have a bad implementation, if a the application is restarted during Saga run.
Especially, my problem is that conditionally on on some state delivered by the events, Saga sends out commands to the aggregates which are not idempotent.

As a result the Saga resend commands during the replay of the events after the application restart - leading to an error.

I see generally several implementation ideas how to fix it:

  • Make command idempotent and change the implementation in the aggregates’s command handler. This is sometime possible, but not always feasible.
  • Make Saga track more state: for example, Saga could remember what commands have been sent. Especially, if saga has an in-memory store, it must react on events that result from commands being sent.
  • Make Saga use persistent store… Hm somehow not nice if I rely on ES on aggregate side… Is there any support by the AxonServer for that?
  • Make Saga detect it is in replay: I wonder if there is an equivalent to AggregateLifecycle.isLive() for Saga? If I just could detect, that my Saga is in replay, I would skip command sending…

What do you think? What is the best way to go?

Kind regards,

Simon

I would suggest using permanent storage to persist event processor tokens. Saga is a special type Event Processor. This way you can control this situation much easier.

By default, processors will start from the beginning of the stream. That means your Sagas will be invoked for all historic events, too. If you want to change that, set the initialTrackingToken for your processor to the “head token”.

Hi,

I tried that see here:

https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!topic/axonframework/24WSDdU3grk

public void configureEventHandlerProcessor(Configurer configurer) {
    Duration duration = Duration.ofMinutes(replayTimeInMinutesEventHandler);
    configurer.eventProcessing().registerTrackingEventProcessor("EventHandlerProcessor",
        org.axonframework.config.Configuration::eventStore,
        configuration -> TrackingEventProcessorConfiguration
            .forParallelProcessing(initialThreadCountEventHandler)
            .andInitialSegmentsCount(initialSegmentCountEventHandler)
            .andBatchSize(batchSizeEventHandler)
            .andInitialTrackingToken(s -> s.createTokenSince(duration))
    );
}

But for Sagas this fails with an
“A component is requesting an Event Store, however, there is none configured.”

Any solution for this?

Hi,

any update here?

There was a timing issue in the Spring Boot Autoconfiguration for Sagas. This has been fixed in 4.3.1.

Kind regards,

Hi Allard,

perfect. I’m upgrading to 4.3.1 right now. Therefore this fits perfectly :).

Best, Michael

Hi Allard,

After updating to 4.3.1 it works fine for the normal startup.

BUT for integration tests it does not work. There I still get
caused by: org.axonframework.common.AxonConfigurationException: A component is requesting an Event Store, however, there is none configured

Best, Michael

Hi Allard,

One more finding:

the setting below already causes the error. So the auto configration for sagas in tests do also now work like expected:

axon:
  eventhandling:
    processors:
      CheckoutSagaProcessor:
        threadCount: 8
        initialSegmentCount: 8


Hi Allard,

Any news on this topic?

Best Regards, Michael

Hi all,

Michael, from the given description I am currently not entirely sure what’s broken and what’s not.

I did however test this stuff locally.

What I noticed is that I get an AxonConfigurationException if I both have property file configuration for a Saga and Java configuration (by using the EventProcessingConfigurer).

When using either one, nothing goes wrong on my machine.

So, could you confirm you have both present in your application?

If so, would you be able to provide a bug report on the issue tracker for us to investigate this further?

Then we can proceed to solve the problem at hand.

When it comes to replaying Sagas through the TrackingEventProcessor (TEP), that’s currently not an option at all.
The Saga is a specific type of EventHandlerInvoker, which for it’s EventHandlerInvoker#supportsReset returns false on all occasions.
It’s this support reset method which is validated when you issue a TrackingEventProcessor#resetTokens operation.

This behaviour is currently hard coded as our thought process was that a Saga introduces side effects, and side effects are something you would (likely) never want to replay.
Imagine your Saga would send emails to your customers.

Replaying said Saga would mean they’d be getting the same emails again, which could lead to further confusion.
However, we do acknowledge there might be off scenarios where it does make sense to issue a replay of a Saga.
To that end, we’ve created an issue on the tracker, #1458 to be exact.

If we trace back to Simon’s original four suggestions to cope with “rehandling events”, option four (being “Make Saga detect it is in replay”) could only be achieved if we have implemented #1458.

The other three options are really up to the user of the framework as far as I can tell.
Only option four leaves room for improvement in Axon Framework as far as I can tell.

That’s my two cents to the situation.
Let me know what you think!

Cheers,
Steven

Hi Steven,

Thanks for the feedback. I agree for the Saga. In our scenario we collect the data in the Saga for a later order. So it makes sense that a replay also fills the Saga. On the other hand it would also be great to have an option to e.g. have an annotation for events which you are not allow to replay. e.g. sending an order does not make sense. While catching up events you missed because you e.g. migrate to a new db…

About the AxonConfigurationException - it only happends in test! Means when running integration tests. it is working fine in normal setup.

Cheers,
Michael

Hi Steven,

I checked the #1458 - I think this is not exactly what I mean. We e.g. have to migrate from system a to system b. We still have the same axon and the same eventstore.
But the db of our tracking processors are gone. So we set the replay limit also in the Saga to a Date X so we can recover old status.

I was more thinking for a Annotation I can e.g. place at specific event handlers which are simply not replayed. The point is - The saga does not know that its db was gone. So it is actually no replay for it but normal catching up on events. For now we have limited the replay to 2 hours and made sure the commands we sent do not do too much harm…
The other thing we could do is simply disable the command sending by a flag which we disable during replay and enable it again once done. But then we loose maybe events that come in in the meantime…

I still have no good idea how to handle this without either maybe skipping events or executing things twice.

Cheers,
Michael

Hi Michael,

If the Saga is unaware it is performing a restart, then that also holds for the Tracking Event Processors.
As such, Axon cannot deduce whether it starts for the first time or if the user issued a replay operation.
Due to this predicament, there is no way to use annotation to rehandle a given event, yes/no.

If you want to make sure a Saga, when it is moved from “system a” to “system b” is not replaying from the start, you would be required to initialize it’s tracking token.
This can be achieved by specifying the “initial tracking token” through the TrackingEventProccessorConfiguration you can define on a TrackingEventProcessor.
The config gives you a Function with the StreamableMessageSource. It’s the StreamableMessageSource which provides methods to create a head token, tail token or token at a given point in time.

Trusting this helps you further Michael!

Cheers,
Steven

PS. It is important to note that this mailing list will be discontinued as specified in this thread.
Instead of this mailing list we have moved to use a forum style of communication, at https://discuss-next.axoniq.io/.

Hope to see you there!