Asynchronous Event Handling for Sagas

I'm using Axon 1.2 and would like to handle events for different Saga
instances asynchronously in parallel.

In the reference manual the asynchronous event handling is described
as follows:

"When an executor is provided, the SagaManager will automatically use
it to find associated Saga instances and dispatch the events each of
these instances. The SagaManager will guarantee that for each Saga
instance, all events are processed in the order they arrive. For
optimization purposes, this guarantee does not count in between
Sagas."

My understanding is that events for different Saga instances should be
handled in parallel if I provide an executor. But in fact they are
handled sequentially one after the other.

In the code I see that AbstractSagaManager creates an
AsynchronousSagaExecutor, which sets a SequentialPolicy. Shouldn't
it set something like SequentialPerSagaPolicy instead? But everything
is private and hard-coded, so I see no way to override this behaviour.

Am I missing something?

Thanks,
Martin

Hi Martin,

this issue is currently at the top of my agenda. The first version of the asynchronous saga manager used the SequentialPerSagaPolicy. However, there is a problem with that implementation:
Consider three incoming events, A, B and C. Let’s assume the handler for A creates an association that will cause the saga to associate with event C.
The manager will assign A and B to the worker pool of the Saga, but not C. That’s because the handler for A may not have run yet when C is assigned, so the association that C depends on does not exist at that point. This can result in very nasty bugs.
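
The difference can be reproduced without any threads. The following is only a sketch in plain Java, not Axon code: `Event`, `Saga`, and the keys `order-1` and `shipment-7` are made up for illustration. It compares matching and handling each event in turn with matching all events before any handler has run, which is the worst case an asynchronous manager can hit.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AssociationRaceSketch {

    /** Hypothetical event: a name plus the association key it carries. */
    static final class Event {
        final String name;
        final String key;
        Event(String name, String key) { this.name = name; this.key = key; }
    }

    /** Hypothetical saga: handling A registers the key that C carries. */
    static final class Saga {
        final Set<String> keys = new HashSet<>();
        final List<String> handled = new ArrayList<>();
        Saga() { keys.add("order-1"); }
        void handle(Event e) {
            handled.add(e.name);
            if ("A".equals(e.name)) {
                keys.add("shipment-7"); // the association that C depends on
            }
        }
    }

    static List<Event> events() {
        List<Event> list = new ArrayList<>();
        list.add(new Event("A", "order-1"));
        list.add(new Event("B", "order-1"));
        list.add(new Event("C", "shipment-7"));
        return list;
    }

    /** Synchronous: each event is matched and handled before the next one is looked at. */
    public static List<String> synchronous() {
        Saga saga = new Saga();
        for (Event e : events()) {
            if (saga.keys.contains(e.key)) {
                saga.handle(e);
            }
        }
        return saga.handled;
    }

    /** Asynchronous worst case: all events are matched before any handler has run. */
    public static List<String> routedBeforeHandled() {
        Saga saga = new Saga();
        List<Event> queue = new ArrayList<>();
        for (Event e : events()) {
            if (saga.keys.contains(e.key)) {
                queue.add(e); // C is skipped here: A has not been handled yet
            }
        }
        for (Event e : queue) {
            saga.handle(e);
        }
        return saga.handled;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + synchronous());         // sync:  [A, B, C]
        System.out.println("async: " + routedBeforeHandled()); // async: [A, B]
    }
}
```

In the second variant C is silently dropped, so the saga behaves differently depending on timing.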

To provide a quick solution for that behavior, I have set the Async manager to only use a single thread for the moment. I am working on a design that can use multiple threads, without risking different behavior between sync and async approaches. Help is always welcome :wink:

Hope this clarifies the sequential behavior.
Cheers,

Allard

Hi Allard,

thanks for your fast response!

Ok, I see the problem, but in my use case this shouldn't happen
(hopefully).

I will have a look at the old implementation.

Cheers,
Martin

Hi Allard,

I hacked the AsynchronousSagaExecutor to use a SequentialPerSagaPolicy
but then I had new problems (only under load):

1) In the asynchronous saga event handling, the transaction introduced
by the SpringTransactionalInterceptor was not committed, because that
transaction was not new (it was nested in the transaction started by
the AsynchronousExecutionWrapper).

I could solve this by setting propagation to PROPAGATION_REQUIRES_NEW
in SpringTransactionalInterceptor (only possible by hack, not
configurable).
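
The effect of the propagation setting can be shown with a toy model. This is a sketch only: `ToyTransactionManager` and its methods are invented for illustration and are not Spring's or Axon's API. The assumption it encodes is that an inner block which joins an existing transaction does not commit by itself, while an inner block with REQUIRES_NEW commits as soon as it finishes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PropagationSketch {

    public enum Propagation { REQUIRED, REQUIRES_NEW }

    /** Toy transaction manager (hypothetical): writes become visible only at commit. */
    static final class ToyTransactionManager {
        final List<String> committed = new ArrayList<>();
        private final Deque<List<String>> open = new ArrayDeque<>();

        void execute(Propagation propagation, Runnable work) {
            // A new transaction starts if none is open, or if REQUIRES_NEW is requested.
            boolean startsNew = open.isEmpty() || propagation == Propagation.REQUIRES_NEW;
            if (startsNew) {
                open.push(new ArrayList<>());
            }
            work.run();
            if (startsNew) {
                committed.addAll(open.pop()); // commit only what this transaction wrote
            }
        }

        void write(String value) {
            open.peek().add(value); // buffered in the innermost open transaction
        }
    }

    /** Returns what is already committed right after the inner block has returned. */
    public static List<String> committedAfterInner(Propagation inner) {
        ToyTransactionManager tm = new ToyTransactionManager();
        List<String> snapshot = new ArrayList<>();
        tm.execute(Propagation.REQUIRED, () -> {            // outer transaction (the wrapper)
            tm.execute(inner, () -> tm.write("saga-state")); // inner block (the interceptor)
            snapshot.addAll(tm.committed);
        });
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(committedAfterInner(Propagation.REQUIRED));     // []
        System.out.println(committedAfterInner(Propagation.REQUIRES_NEW)); // [saga-state]
    }
}
```

With REQUIRED the inner write stays pending until the outer transaction ends, which matches the behaviour described in 1).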

2) The initial command publishes an event that triggers the creation
of a saga. When the unit of work for that command is committed, the
events are published first. Since my saga now works asynchronously, it
schedules an asynchronous task for handling that event in the saga.
Only after that is the first transaction committed.

Occasionally the asynchronous task runs before the transaction is
committed, so the aggregate created by the initial command cannot be
loaded (AggregateNotFoundException) in the event handling thread.

I found no solution for this concurrency problem yet.
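
The ordering in 2) can be written down as a small plain-Java sketch, together with one possible direction: holding the saga task back until the commit has finished. Everything here is hypothetical (the `run` method, the log strings, the `afterCommit` list); it does not use Axon's API and has not been tried against Axon 1.2.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCommitSketch {

    /**
     * Replays the commit sequence and returns the order in which things happen.
     * With deferUntilCommit == false the saga task runs as soon as it is scheduled,
     * which models the worst case of an executor that picks it up immediately.
     */
    public static List<String> run(boolean deferUntilCommit) {
        List<String> log = new ArrayList<>();
        List<Runnable> afterCommit = new ArrayList<>();
        Runnable sagaTask = () -> log.add("saga task loads aggregate");

        // Step 1: committing the unit of work publishes the events first.
        log.add("event published");
        if (deferUntilCommit) {
            afterCommit.add(sagaTask); // hold the task back
        } else {
            sagaTask.run();            // task runs before the commit
        }

        // Step 2: only now is the transaction committed.
        log.add("transaction committed");

        // Step 3: release the tasks that were held back.
        for (Runnable task : afterCommit) {
            task.run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        // [event published, saga task loads aggregate, transaction committed]
        System.out.println(run(true));
        // [event published, transaction committed, saga task loads aggregate]
    }
}
```

In the first ordering the aggregate is not yet visible to the event handling thread. The second ordering avoids that, but it needs a hook that fires after the commit.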

Any ideas?

Cheers,
Martin

Hi Martin,

I’ll rephrase, just to make sure I understand correctly. You have a command that creates a new aggregate and emits an event. That event is picked up by the Saga, which sends a command to alter that same aggregate. That second command sometimes fails with an “AggregateNotFoundException”, because the transaction of the first command has not always been committed by then.

First question that pops to mind is: why use a saga to tell an aggregate something it should already know? If you send a command to an aggregate based on an event from that same aggregate, you’re probably (but not certainly) using a saga where you don’t have to.

Which repository implementation do you use? And which version of Axon? I just checked the code of the LockingRepository, and it places a lock on an aggregate when it is added to the repository. That means that any subsequent commands must wait until that lock is released, which happens after a UnitOfWork (and its attached transaction) is committed. Did you place your own transactional boundary somewhere?

Cheers,

Allard