Saga/Aggregate: multiple threads and locking

In the situation where we have multiple threads which could trigger command/event handler(s) on an aggregate/saga - how does the aggregate/saga manage concurrency in the default case?

Where exactly are the locks taken for the aggregate?
For the saga I am assuming the lock is claimed by the thread handling the TEP? Could you please elaborate on how that works too.

I was sure I had come across something in the documentation about this but I can’t seem to find it now.

The situation I am in is multiple events being sent on the event gateway concurrently. These will all be routed to event handler(s) of the same saga instance. These event handlers will all mutate the same fields of the saga, so I was wondering how Axon would deal with the concurrent access to those fields.

Hey vab2048, that is a very logical question to ask.

When it comes to locking either an Aggregate or a Saga, it’s the repository implementations which add the locking behaviour.

So more specifically, the Aggregate is loaded through a Repository, meaning there’s an abstract LockingRepository implementation which is the basis for any full Repository implementation provided by the framework. Similarly, Sagas are loaded through a SagaRepository, which has an abstract LockingSagaRepository which is implemented by the (current) sole full implementation, called the AnnotatedSagaRepository.

Concurrency from the Aggregate’s perspective, within a single JVM, is dealt with by having the hard requirement to be able to claim a lock on a given aggregate identifier. Hence, it would be the local CommandBus which will place a lock on an Aggregate prior to being able to enter the command handler. The lock will be released as soon as command handling has finalized (through the UnitOfWork). This typically covers the process of command handling, event publication within the model (for event sourcing) and the actual publication of those events (and of the model, in case of a state-stored aggregate).
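
To make the idea of a lock per aggregate identifier concrete, here is a minimal plain-Java sketch. It is not Axon's LockingRepository: PerIdentifierLocks, handleWithLock and LockDemo are hypothetical names made up for illustration, and the lock here is released when the handler returns, whereas the description above has it released when the UnitOfWork completes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical stand-in for the per-identifier lock a locking repository takes out.
class PerIdentifierLocks {

    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Runs the handler while holding the lock that belongs to the given aggregate identifier.
    <R> R handleWithLock(String aggregateId, Supplier<R> handler) {
        ReentrantLock lock = locks.computeIfAbsent(aggregateId, id -> new ReentrantLock());
        lock.lock(); // other threads targeting the same identifier block here
        try {
            return handler.get();
        } finally {
            lock.unlock();
        }
    }
}

class LockDemo {

    // Several threads mutate the same plain (non-atomic) field, but only under the lock.
    static int incrementConcurrently(int threads, int incrementsPerThread) {
        PerIdentifierLocks locks = new PerIdentifierLocks();
        int[] counter = {0}; // plays the role of aggregate state
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    locks.handleWithLock("aggregate-1", () -> ++counter[0]);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

Because every increment happens while the lock for "aggregate-1" is held, no update is lost, so LockDemo.incrementConcurrently(4, 1000) returns 4000.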

The LockingRepository does not provide a distributed locking solution. Thus you would want to consistently route commands to the same JVM instance at all times. If this for some reason does not happen, then the event publication will typically fail due to the uniqueness constraint on the [aggregateId, seqNo] combination on the EventStore, which would result in a ConcurrencyException being thrown.
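
The uniqueness constraint mentioned above can be illustrated with a small in-memory sketch. EventLogSketch and append are hypothetical names, not part of Axon; a real EventStore enforces the constraint in its storage layer and, as described above, a violation surfaces as a ConcurrencyException rather than a boolean.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the [aggregateId, seqNo] uniqueness constraint.
class EventLogSketch {

    private final ConcurrentHashMap<String, Set<Long>> usedSequenceNumbers = new ConcurrentHashMap<>();

    // Returns true if the slot was free, false if another writer already used it.
    boolean append(String aggregateId, long sequenceNumber) {
        return usedSequenceNumbers
                .computeIfAbsent(aggregateId, id -> ConcurrentHashMap.newKeySet())
                .add(sequenceNumber);
    }
}
```

If two JVMs both load the same aggregate at sequence number 4 and both try to append event 5, only the first append succeeds; the second one is the case that fails in the scenario above.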

From the Saga perspective, it would indeed be the process which provides the event to the SagaManager. In most scenarios this would mean the thread of a TrackingEventProcessor (TEP), as you already mentioned, so let us focus on that. Similar to the Aggregate, the thread handling the event will try to retrieve the Saga to be able to invoke the @SagaEventHandler. In doing so, a lock is placed on the Saga, ensuring no other threads which have events for the given Saga will be able to load said Saga. Granted, that holds within the same JVM. There is however an additional guard here, which is the TrackingToken a TEP is required to have a claim on to process any event at all. Without such a token claim, no events would be streamed to the thread and hence no Sagas could ever be loaded. Furthermore, even though the event publication might send out several events, they will still be handled in order by a TEP. Thus concurrent handling typically wouldn’t occur. Lastly, there is a notion of deciding whether a given event should be handled by the TEP at all. It is here that the SagaManager will validate whether the segment (i.e. a part of a TrackingToken) it is dealing with actually allows it to handle the given Saga.
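
The segment check can be sketched as follows. This is an assumption-laden illustration, not Axon's Segment class: SegmentSketch, split and ownersOf are hypothetical names, and the hash-and-mask rule is only one plausible way to assign each saga identifier to exactly one segment.

```java
// Hypothetical sketch: each saga identifier is owned by exactly one segment.
class SegmentSketch {

    private final int segmentId;
    private final int mask;

    SegmentSketch(int segmentId, int mask) {
        this.segmentId = segmentId;
        this.mask = mask;
    }

    // A thread processing this segment only handles sagas whose masked hash equals its id.
    boolean matches(String sagaId) {
        return (sagaId.hashCode() & mask) == segmentId;
    }

    // Creates segmentCount segments; the count must be a power of two for the mask to work.
    static SegmentSketch[] split(int segmentCount) {
        if (segmentCount < 1 || Integer.bitCount(segmentCount) != 1) {
            throw new IllegalArgumentException("segmentCount must be a power of two");
        }
        SegmentSketch[] segments = new SegmentSketch[segmentCount];
        for (int id = 0; id < segmentCount; id++) {
            segments[id] = new SegmentSketch(id, segmentCount - 1);
        }
        return segments;
    }

    // Counts how many segments claim the given saga identifier.
    static int ownersOf(String sagaId, SegmentSketch[] segments) {
        int owners = 0;
        for (SegmentSketch segment : segments) {
            if (segment.matches(sagaId)) {
                owners++;
            }
        }
        return owners;
    }
}
```

Whatever the identifier hashes to, ownersOf returns 1 for a full set of segments, which is the property that keeps two processor threads from handling the same Saga.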

All the above serve as the means to ensure concurrent access on Aggregates or Sagas shouldn’t happen. If you still encounter such a scenario from the Saga’s perspective, chances are high the TrackingToken store is not shared between distinct instances of your application. That would be the first place to look.
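
Why an unshared token store breaks this guarantee can be shown with a small sketch. TokenStoreSketch, claim and release are hypothetical names; Axon's actual TokenStore has a different API and also deals with claim timeouts, which are left out here.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of token claims: one owner per processor segment at a time.
class TokenStoreSketch {

    private final ConcurrentHashMap<String, String> ownerBySegment = new ConcurrentHashMap<>();

    // Returns true if the owner holds the claim afterwards, false if someone else has it.
    boolean claim(String processorName, int segment, String owner) {
        String current = ownerBySegment.putIfAbsent(processorName + "/" + segment, owner);
        return current == null || current.equals(owner);
    }

    // Releases the claim, but only if it is held by the given owner.
    void release(String processorName, int segment, String owner) {
        ownerBySegment.remove(processorName + "/" + segment, owner);
    }
}
```

With one shared store, a second node asking for the same segment is refused until the first releases it. If each node has its own store instance, both claims succeed and both nodes would process the same events, which is the misconfiguration described above.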

Trust this helps you out vab2048!

Thanks very much Steven.

For my own future reference (I know I will end up asking myself this question again in the future), the classes you referred to are:

  • Aggregate:
    • axon-modelling:org.axonframework.modelling.command.LockingRepository (abstract class)
    • axon-eventsourcing:org.axonframework.eventsourcing.EventSourcingRepository (extends LockingRepository)
    • you can see where the lock is obtained by searching for lockFactory.obtainLock in LockingRepository.
  • Saga:
    • axon-modelling:org.axonframework.modelling.saga.repository.LockingSagaRepository (abstract class)
    • axon-modelling:org.axonframework.modelling.saga.repository.AnnotatedSagaRepository (extends LockingSagaRepository)
    • you can see where the lock is obtained by searching for invocations to the lockSagaAccess method in the LockingSagaRepository (whenever load or createInstance is called).

And so as I have understood it:

  • Concurrent access to state within an aggregate/saga will never occur within the same JVM.
  • This is because only one unit of work (within the same JVM process) can possess the lock for an aggregate/saga at a time.
  • Other unit of work instances in the JVM for the same aggregate/saga may exist but they will block until they are able to grab the lock.

Is this an accurate summary?

(I’m assuming Axon Framework defaults here, i.e. pessimistic locking)