Race condition between saga and command using DistributedCommandBus

I recently started trying the JGroupsConnector to spread my application across multiple hosts. It mostly works out of the box which is great, but I did run into one problem that others might also run into and I figure I should describe how I fixed it. This is on Axon 2, but I suspect the same situation could happen on Axon 3 with subscribing event processors. (Maybe not with tracking processors, though.)

The problem manifests itself when you have saga code like

```java
private void startSomeWork(CreateMyAggregateCommand command) {
    associateWith("aggregateId", command.getAggregateId());
    commandGateway.dispatch(command);
}

@SagaEventHandler(associationProperty = "aggregateId")
public void on(MyAggregateCreatedEvent event) {
    // do the next thing in the saga's workflow
}
```

This works fine most of the time, but occasionally, when the timing is just right, the following happens:

  1. Saga on host A dispatches the creation command
  2. Creation command gets routed to host B
  3. Aggregate on host B publishes the MyAggregateCreatedEvent
  4. Saga is written to database on host A

The problem is that at the time the event is published on host B, the fact that the saga is associated with the aggregate ID hasn’t hit the database yet. So the event bus on host B thinks nothing is interested in the event, and the saga event handler is never invoked.

Right now I’m fixing it like this:

```java
private void startSomeWork(CreateMyAggregateCommand command) {
    associateWith("aggregateId", command.getAggregateId());

    // Hold the command back until the saga and its associations are committed.
    CurrentUnitOfWork.get().registerListener(new UnitOfWorkListenerAdapter() {
        @Override
        public void afterCommit(UnitOfWork unitOfWork) {
            commandGateway.dispatch(command);
        }
    });
}
```

By deferring command dispatch until after the saga (with its associations) has been committed to the database, host B always sees the association and invokes the saga event handler without issues.
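To make the ordering argument concrete, here is a minimal plain-Java sketch of the two dispatch orders. None of these types are Axon classes: `SagaStore`, `UnitOfWorkModel`, and `handleOnRemoteHost` are hypothetical stand-ins, and the "immediate" case deliberately models the unlucky timing where host B finishes before host A commits.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of the race; hypothetical names, not Axon API. */
final class DeferredDispatchSketch {

    /** Stands in for the shared saga database that every host reads. */
    static final class SagaStore {
        private final Set<String> committed = new HashSet<>();

        void commit(Set<String> pending) { committed.addAll(pending); }

        boolean hasAssociation(String aggregateId) { return committed.contains(aggregateId); }
    }

    /** Stands in for a unit of work: buffers associations and after-commit callbacks. */
    static final class UnitOfWorkModel {
        private final Set<String> pendingAssociations = new HashSet<>();
        private final List<Runnable> afterCommit = new ArrayList<>();
        private final SagaStore store;

        UnitOfWorkModel(SagaStore store) { this.store = store; }

        void associateWith(String aggregateId) { pendingAssociations.add(aggregateId); }

        void onAfterCommit(Runnable callback) { afterCommit.add(callback); }

        void commit() {
            store.commit(pendingAssociations);   // associations become visible first
            afterCommit.forEach(Runnable::run);  // then the deferred work runs
        }
    }

    /** "Host B": publishes the event and looks for an associated saga. */
    static boolean handleOnRemoteHost(SagaStore store, String aggregateId) {
        return store.hasAssociation(aggregateId);
    }

    /** Returns true if the saga would have received the creation event. */
    static boolean sagaReceivesEvent(boolean deferDispatch) {
        SagaStore store = new SagaStore();
        UnitOfWorkModel uow = new UnitOfWorkModel(store);
        boolean[] delivered = { false };

        uow.associateWith("agg-1");
        Runnable dispatch = () -> delivered[0] = handleOnRemoteHost(store, "agg-1");
        if (deferDispatch) {
            uow.onAfterCommit(dispatch);  // the fix: wait for the commit
        } else {
            dispatch.run();               // worst case: host B is done before the commit
        }
        uow.commit();
        return delivered[0];
    }

    public static void main(String[] args) {
        System.out.println("immediate dispatch, event delivered: " + sagaReceivesEvent(false));
        System.out.println("deferred dispatch, event delivered: " + sagaReceivesEvent(true));
    }
}
```

In this model the immediate case loses the event and the deferred case delivers it. In a real cluster the immediate case only fails when the timing lines up, which is why the problem shows up intermittently.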

It’s not necessary to wrap all commands like this, of course, just ones that occur in a unit of work where a saga’s associations change in a way that affects its ability to receive whatever events result from the command.

Is this a sane approach, or is it a sign I’m doing something wrong?

-Steve

Hi Steven,

Yes, this is actually a sane approach. In fact, I’m seriously considering building this mechanism into all the components that trigger async activity. That way, an async action can never be performed before the ‘cause’ of that activity has been committed.

The only issue is that senders may use ‘sendAndWait’ semantics, which would no longer work if the dispatch is deferred.
For now, I think the command gateway is the only place where this behavior can be implemented safely. The gateway would ‘park’ the dispatch until afterCommit, and short-circuit (dispatch immediately) when the sender is waiting for the result.

Cheers,

Allard

PS. Thanks a lot for sharing your issues and solutions! I’m sure they’re very helpful to many in the community.

I ended up deciding to make this an across-the-board thing because it’s too error-prone to expect people to remember to do this in exactly the right places. So my command gateway is now actually a proxy:

```java
@Bean
public CommandGateway commandGateway(CommandBus commandBus) {
    CommandGateway gateway =
            new GatewayProxyFactory(commandBus).createGateway(CommandGateway.class);

    return (CommandGateway) Proxy.newProxyInstance(
            getClass().getClassLoader(),
            new Class<?>[] { CommandGateway.class },
            (proxy, method, args) -> {
                // Commands that return a value or have declared exceptions are synchronous, so
                // dispatch them immediately. Also dispatch immediately if there is no unit of work
                // active, since there's therefore nothing waiting to be committed to the database.
                if (method.getReturnType() != void.class
                        || method.getExceptionTypes().length > 0
                        || !CurrentUnitOfWork.isStarted()) {
                    try {
                        return method.invoke(gateway, args);
                    } catch (InvocationTargetException e) {
                        // Rethrow the gateway's own exception instead of the reflection wrapper.
                        throw e.getCause();
                    }
                }

                // Fire-and-forget command: park it until the unit of work has committed.
                CurrentUnitOfWork.get().registerListener(new UnitOfWorkListenerAdapter() {
                    @Override
                    @SneakyThrows
                    public void afterCommit(UnitOfWork unitOfWork) {
                        method.invoke(gateway, args);
                    }
                });

                return null;
            });
}
```
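For anyone who wants to try the decision rule without Axon, Spring, or Lombok on the classpath, here is a stripped-down sketch of the same proxy idea. Everything in it is a hypothetical stand-in: `DemoGateway` is not Axon's `CommandGateway`, and the static `activeUnitOfWork` list only imitates the "is a unit of work active, and what runs after commit" part of `CurrentUnitOfWork`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a deferring gateway proxy; hypothetical names, not Axon API. */
final class DeferringProxySketch {

    /** Hypothetical gateway interface. */
    interface DemoGateway {
        void send(String command);           // fire-and-forget: safe to defer
        String sendAndWait(String command);  // caller needs the result: must run now
    }

    /** Records what actually reached the "command bus", in order. */
    static final List<String> log = new ArrayList<>();

    /** Stand-in for the active unit of work; null means none is active. */
    static List<Runnable> activeUnitOfWork;

    static final class RecordingGateway implements DemoGateway {
        @Override public void send(String command) { log.add("send:" + command); }

        @Override public String sendAndWait(String command) {
            log.add("sendAndWait:" + command);
            return "result:" + command;
        }
    }

    static DemoGateway wrap(DemoGateway target) {
        return (DemoGateway) Proxy.newProxyInstance(
                DemoGateway.class.getClassLoader(),
                new Class<?>[] { DemoGateway.class },
                (proxy, method, args) -> {
                    boolean synchronous = method.getReturnType() != void.class
                            || method.getExceptionTypes().length > 0;
                    if (synchronous || activeUnitOfWork == null) {
                        try {
                            return method.invoke(target, args);
                        } catch (InvocationTargetException e) {
                            throw e.getCause();  // surface the target's own exception
                        }
                    }
                    // Park the call; it runs when the unit of work "commits".
                    activeUnitOfWork.add(() -> {
                        try {
                            method.invoke(target, args);
                        } catch (ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                    });
                    return null;
                });
    }

    /** Runs one simulated unit of work and returns the observed order of events. */
    static List<String> demo() {
        log.clear();
        DemoGateway gateway = wrap(new RecordingGateway());

        activeUnitOfWork = new ArrayList<>();
        gateway.send("create");                 // parked until commit
        log.add(gateway.sendAndWait("query"));  // executed immediately
        log.add("commit");

        List<Runnable> parked = activeUnitOfWork;
        activeUnitOfWork = null;                // the unit of work is over
        parked.forEach(Runnable::run);
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the `sendAndWait` call reaches the target before the commit marker, while the `send` call only arrives after it, even though `send` was invoked first.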

Hi Allard,

I am facing a similar race condition between two commands.

Flow:

  1. I have an interceptor that calls a saga, which sends a command through the gateway to the aggregate root. This works fine when the request does not involve another command: the saga’s command alone is executed as expected.

  2. Issues arise when I send a request to my controller, which calls a service that sends its own command over the gateway. The request is still intercepted by the interceptor first, but the commands now land on the aggregate in the wrong order: first the service command, then the saga command, which defeats the business purpose. As far as I can see, the saga dispatches its command first and the service layer second, yet the aggregate root receives them in the opposite order.

I am using Axon 2.4.6 with the DefaultCommandGateway and DistributedCommandBus.

Regards,
Harinder Singh

Hi,

My first impression is that it feels awkward to invoke a saga from an interceptor in order to send a command. Sagas are ‘designed’ to react to events. I’m not sure what you’re trying to achieve (functionally), but there might be a cleaner way.

If you’re using a DistributedCommandBus, commands are (potentially) sent to other nodes, where they can be executed in parallel. ‘Race conditions’ here are normal.

Cheers,

Allard