Events not being handled in the order they were dispatched

Hi all, it’s been a long time since I’ve posted anything…

We have an app that has been using Axon 2.4 for quite some time with no big problems, but we have now deployed another instance of the same application whose big difference is the amount of commands/events sent through Axon, which is quite a bit higher in this second instance. And, as it happens, we are seeing lots of failures due to events not being handled in the order they were dispatched.

Let me describe the situation: we have 1 “service” aggregate, 1 “service” saga and 1 “task” aggregate.

The “task” aggregate (Tasks) is actually a (partial) implementation of the WS-HumanTask Specification, so it has a very formal workflow/states/transitions.

The “service” aggregate (Service) is our business service. The “service” saga (Saga), among other things, is responsible for the coordination between the Service and the Tasks.

So, a typical flow will be:

(start command) --> Service --> (started event) --> Saga --> START --> Tasks (state: READY --> IN_PROGRESS)
(cancel command) --> Service --> (cancelled event) --> Saga --> RELEASE --> Tasks (state: IN_PROGRESS --> READY)

or

(start command) --> Service --> (started event) --> Saga --> START --> Tasks (state: READY --> IN_PROGRESS)
(commit command) --> Service --> (closed event) --> Saga --> COMPLETE --> Tasks (state: IN_PROGRESS --> COMPLETED)

Commands to the Tasks fail if the state of the Task is not the one expected in the WS-HT workflow, so, for instance:

(start command) --> Service --> (started event) --> Saga --> START --> Tasks (state: READY --> IN_PROGRESS)

(start command) --> Service --> (started event) --> Saga --> START --> Tasks (state: FAILED)

or

(start command) --> Service --> (started event) --> Saga --> START --> Tasks (state: READY --> IN_PROGRESS)

(cancel command) --> Service --> (cancelled event) --> Saga --> RELEASE --> Tasks (state: IN_PROGRESS --> READY)

(commit command) --> Service --> (closed event) --> Saga --> COMPLETE --> Tasks (state: FAILED)
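
To illustrate the kind of guard involved, here is a minimal sketch (hypothetical command/event names, not our actual code, with the usual Axon 2 scaffolding reduced to the essentials):

import org.axonframework.commandhandling.annotation.CommandHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventsourcing.annotation.AggregateIdentifier;

public class Task extends AbstractAnnotatedAggregateRoot<String> {

    public enum TaskState { READY, IN_PROGRESS, COMPLETED }

    @AggregateIdentifier
    private String taskId;

    private TaskState state;

    Task() {
        // required by Axon for event-sourced reconstruction
    }

    // RELEASE is only legal from IN_PROGRESS; any other state fails the command.
    @CommandHandler
    public void handle(ReleaseTaskCommand command) {
        if (state != TaskState.IN_PROGRESS) {
            throw new IllegalStateException("RELEASE requires IN_PROGRESS, but task is " + state);
        }
        apply(new TaskReleasedEvent(taskId));
    }
}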

Now, what’s happening is that, for some reason, if the UA sends a (start command) + (cancel command) in very quick succession, sometimes the RELEASE command gets to the Tasks before the START command, so effectively we have

(start command) --> Service --> (started event) --> Saga (do some work here before sending the HT command)

(cancel command) --> Service --> (cancelled event) --> Saga --> RELEASE --> Tasks (state: FAILED - it’s in READY instead of IN_PROGRESS)

Saga (finishes the work) --> START --> Tasks (state: FAILED - actually does nothing because the task was already FAILED)

We did change the CommandBus from a SimpleCommandBus to an AsynchronousCommandBus, because we also have a batch job that is quite heavy on the Service aggregate and was consuming all the DB connections very quickly.

So, even though this change is more than probably the culprit, I don’t have a clue how to avoid this problem without going back to the SimpleCommandBus, which will cause the DB connection problems again. So it seems I’m between a rock and a hard place.

Any help/advice is greatly appreciated.

Cheers.

Hi Antonio,

when using the AsynchronousCommandBus, you’re at the mercy of your operating system’s thread scheduler. So if you submit two commands sequentially, they will be picked up in the submitted order, but there is no guarantee that they won’t overtake each other before acquiring the lock.

When sending a command, you could use the returned CompletableFuture to trigger sending another command that must wait for the former to complete (see CompletionStage#thenRun() and CompletionStage#thenAccept()).
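
For example, something like this could adapt Axon 2’s CommandCallback into a CompletableFuture and chain on it (a sketch only; StartCommand and CancelCommand are hypothetical):

import java.util.concurrent.CompletableFuture;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.GenericCommandMessage;

public final class ChainedDispatch {

    private final CommandBus commandBus;

    public ChainedDispatch(CommandBus commandBus) {
        this.commandBus = commandBus;
    }

    // Dispatches a command and completes the future once handling finishes.
    public CompletableFuture<Object> send(Object command) {
        final CompletableFuture<Object> result = new CompletableFuture<Object>();
        commandBus.dispatch(GenericCommandMessage.asCommandMessage(command),
                            new CommandCallback<Object>() {
            @Override
            public void onSuccess(Object returnValue) {
                result.complete(returnValue);
            }

            @Override
            public void onFailure(Throwable cause) {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }
}

// Usage, given: ChainedDispatch chained = new ChainedDispatch(commandBus);
// The cancel is only dispatched after the start has fully completed,
// so the two commands can never overtake each other.
chained.send(new StartCommand(taskId))
       .thenRun(() -> chained.send(new CancelCommand(taskId)));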

Also, the DisruptorCommandBus does a better job at guaranteeing execution order, but it requires your Aggregates to be Event Sourced.

Hope this helps.
Cheers,

Allard

Hi Allard, how’s things?

When you say “DisruptorCommandBus does a better job at guaranteeing execution order”, does that mean “DisruptorCommandBus guarantees execution order”, or is it just a question of having a better probability of that happening?

I think we can’t use the CompletableFuture because the start/cancel or start/commit commands are triggered by the users.

Our aggregates are indeed event sourced, but that is the source of part of our problems: sometimes it takes ages for an aggregate to load before accepting a command, and when dealing with time-sensitive issues, like KPIs, that is a big problem. Actually, I’m fighting a hell of a battle to keep ES (and Axon) in our design, against pretty much everyone else…

I have been looking at the DisruptorCommandBus, but I’m afraid that at this point, if we change such a core component of our architecture, we risk disrupting the entire application (no pun intended). And there are also things I don’t understand,

“This Command Bus also acts as a Repository for the aggregates processed by the Disruptor.”

What does this mean exactly? My idea of a Repository is some permanent storage, like a DB or file system; how can a “channel” that is supposed to deliver messages from one place to another be, at the same time, a permanent storage of any kind?

“Finally, you also need a CommandTargetResolver. This is a mechanism that tells the disruptor which aggregate is the target of a specific Command.”

Isn’t this against the whole concept of having Commands/Events as separate, independent components that allow for decoupling between “clients” (the ones that issue the commands) and “services” (the ones that handle those commands) in a distributed environment?

And last, to be frank, “you can provide a DisruptorConfiguration instance” that has no fewer than 15(!) configuration options? In my experience, something that may need that amount of tweaking is, well, “suspicious”, to put it mildly.

But then again, I never tried to implement it, so maybe my doubts will go away after I do. Of course, the problem is that I don’t have time to do it just as a test; I need to know that it is a valid approach.

Sorry for the long post and thanks for your continued help.

Cheers.

Oh, BTW, I forgot to ask: I’m using Axon 2.4, would any of this change/improve in version 3? Is an upgrade a relatively smooth and easy thing to do? Or does it have fundamental architectural changes?

Thanks.

Hi again.

I’m trying to change our AsynchronousCommandBus to a DisruptorCommandBus, but I got stuck at the very beginning:

DisruptorCommandBus commandBus = new DisruptorCommandBus(eventStore, eventBus(), disruptorConfiguration);

In our app we have one eventStore per “service” (one day we are planning to separate the app into a micro-service architecture), so does this mean I can’t have a DisruptorCommandBus in this situation?

I also see in the docs

This Command Bus also acts as a Repository for the aggregates processed by the Disruptor.

What does this mean exactly? We already have several EventSourcingRepository instances, one per AggregateType (one per service). Do we have to ditch those? What about all the events we have there?

I think I’m lost here…

Cheers.

Hi Antonio,

what that sentence really means is that you should use the DisruptorCommandBus to create the Repository instances for your aggregates. So instead of defining a bean that simply returns a new instance of EventSourcingRepository, you would create a bean that depends on the DisruptorCommandBus and returns disruptorCommandBus.createRepository(AggregateType).
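
In Spring Java config, that could look roughly like this (a sketch, assuming a hypothetical event-sourced Service aggregate):

import org.axonframework.commandhandling.disruptor.DisruptorCommandBus;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.repository.Repository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RepositoryConfig {

    // The repository is created by the command bus rather than
    // instantiated as a standalone EventSourcingRepository.
    @Bean
    public Repository<Service> serviceRepository(DisruptorCommandBus commandBus) {
        return commandBus.createRepository(new GenericAggregateFactory<Service>(Service.class));
    }
}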

If you use different event stores for different Aggregates, it might be a challenge to use the DisruptorCommandBus. Right now, it is not really designed for such a use case. Using different CommandBus instances (which is effectively also what you would do when separating the application into services) would be one solution.

Note that the DisruptorCommandBus has so many configuration items due to the fact that it can achieve high performance in different scenarios. The default settings work for most situations, but in specific cases you can achieve better performance by tuning some settings. For example, if you know that all commands are supplied via only a single thread (e.g. when you receive them through a server socket), then you can use a “single” ProducerType. If you expect a continuous flow of Commands, then using a BusySpinWaitStrategy will give you the best performance (but it also puts a heavy claim on your CPU). Etc. etc.
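
Concretely, tuning for that single-thread, continuous-flow scenario might look like this (a sketch, assuming Axon 2.4’s fluent DisruptorConfiguration setters and an eventStore/eventBus already in scope; the defaults are fine if you leave these out):

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

import org.axonframework.commandhandling.disruptor.DisruptorCommandBus;
import org.axonframework.commandhandling.disruptor.DisruptorConfiguration;

DisruptorConfiguration configuration = new DisruptorConfiguration()
        .setProducerType(ProducerType.SINGLE)         // all commands come from one thread
        .setWaitStrategy(new BusySpinWaitStrategy()); // lowest latency, but busy-spins a CPU core

DisruptorCommandBus commandBus = new DisruptorCommandBus(eventStore, eventBus, configuration);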

Looking at how the DisruptorCommandBus currently uses the EventStore, I think it is a simple change in the framework to allow different repositories to use different EventStore instances. That looks like an improvement we could build in for the next release, 3.1 (or maybe even 3.0.3).

As you are currently on 2.4, a workaround might be to use a delegating event store implementation that forwards method invocations to the implementations you currently have, based on the “type” passed into the appendEvents() and readEvents() methods.
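
That delegating store could be as simple as this sketch (assuming Axon 2.x’s EventStore interface; how you map aggregate type names to backing stores is up to you):

import java.util.Map;

import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventstore.EventStore;

public final class DelegatingEventStore implements EventStore {

    // Maps the aggregate "type" identifier to the store that owns it.
    private final Map<String, EventStore> delegates;

    public DelegatingEventStore(Map<String, EventStore> delegates) {
        this.delegates = delegates;
    }

    @Override
    public void appendEvents(String type, DomainEventStream events) {
        delegateFor(type).appendEvents(type, events);
    }

    @Override
    public DomainEventStream readEvents(String type, Object identifier) {
        return delegateFor(type).readEvents(type, identifier);
    }

    private EventStore delegateFor(String type) {
        EventStore delegate = delegates.get(type);
        if (delegate == null) {
            throw new IllegalArgumentException("No event store registered for type: " + type);
        }
        return delegate;
    }
}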

Hope this helps.
Cheers,

Allard