EventSourcedAggregate and sequence number

Hi

Over the past weeks, we ran some load tests on my Axon-based service. The test was very simple: a series of commands sent to the same aggregate, with the service deployed as a Docker image (1 instance, distributed command bus active) on a Docker swarm.

We detected a strange (and random) error: a duplicated sequence number when writing an event to the Event Store. The write was then rejected by the Event Store itself, which detected the duplicate (aggregateID, sequenceNumber) pair.

I thought this was rather strange, given that we were using only 1 instance of the service and we’re pretty sure that all commands are routed, by the distributed command bus, to that instance.

Looking at the code of the EventSourcedAggregate class, I noticed that the code that generates the sequence number, when an aggregate is loaded from the Event Store, is the following:

@Override
public Long version() {
    return lastEventSequenceNumber < 0 ? null : lastEventSequenceNumber;
}

/**
 * Returns the sequence number to be used for the next event applied by this Aggregate instance. The first
 * event of an aggregate receives sequence number 0.

Hi Franco,

First off, no worries about getting that fix in! Your request on the mail thread was the trigger for me to implement it. Thanks for pointing out the issue in the first place.

Second, I’d think this isn’t an AtomicLong, because it’s suggested to have a single instance handle all the commands for a given Aggregate, definitely for Event Sourced Aggregates (which thus means that only one instance would be incrementing a sequence number).

Let’s assume that your aggregates are event sourced and you’re running a distributed setup.

If two nodes were to handle commands for the same aggregate, they would both pull the events for that aggregate and recreate its state. It’s more efficient to do this once instead of twice. Additionally, this scenario could give you concurrency issues, like the one you’re experiencing right now, when inserting an event with a given sequence number.

This exception is desired behavior in an Event Sourced Aggregate.
Let’s take the example that we have an AtomicLong for the sequence number, and a two-node Axon application running.
Suppose both nodes concurrently event source the same Aggregate and call a command handling function which applies an event. With the AtomicLong, there wouldn’t be an issue with the sequence numbers being identical between both events. However, in this scenario one of the nodes worked on stale state of that aggregate, as it hasn’t seen the event the other node published at that point in time. Hence the easier solution is to raise a concurrency error.
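
To make that reasoning concrete, here is a minimal sketch of the check, assuming nothing about Axon’s internals. `InMemoryEventStore` and `StaleWriteDemo` are hypothetical names made up for illustration; the map key simply mimics a unique constraint on the (aggregateId, sequenceNumber) pair.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an event store; this is not Axon's API.
class InMemoryEventStore {
    // Key is "aggregateId#sequenceNumber", mimicking a unique constraint on that pair.
    private final Map<String, String> events = new ConcurrentHashMap<>();

    /** Appends the event, or throws if the (aggregateId, sequenceNumber) slot is already taken. */
    void append(String aggregateId, long sequenceNumber, String payload) {
        String key = aggregateId + "#" + sequenceNumber;
        if (events.putIfAbsent(key, payload) != null) {
            throw new IllegalStateException(
                    "Duplicate sequence number " + sequenceNumber + " for aggregate " + aggregateId);
        }
    }

    /** Returns the sequence number the next event should get (0 for a new aggregate). */
    long nextSequence(String aggregateId) {
        long next = 0;
        while (events.containsKey(aggregateId + "#" + next)) {
            next++;
        }
        return next;
    }
}

class StaleWriteDemo {
    /** Returns true if the second writer, which worked on stale state, was rejected. */
    static boolean secondWriterRejected() {
        InMemoryEventStore store = new InMemoryEventStore();
        store.append("order-1", 0, "OrderCreated");

        // Both "nodes" load the aggregate before either of them writes.
        long seenByNodeA = store.nextSequence("order-1");
        long seenByNodeB = store.nextSequence("order-1");

        store.append("order-1", seenByNodeA, "ItemAdded");
        try {
            // Node B decided on this event without having seen "ItemAdded".
            store.append("order-1", seenByNodeB, "OrderCancelled");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second writer rejected: " + secondWriterRejected());
    }
}
```

The rejection is the useful part: the second writer can reload the aggregate and retry the command against up-to-date state.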

It is, however, as you suggest yourself, curious that this happens while you’ve got one instance running. Why are you using a DistributedCommandBus with a single instance, by the way? In that scenario, the local segment (aka the local command bus) should be used. I think you’re also using the regular aggregate repositories? That is, an implementation that extends the ‘LockingRepository’, thus ensuring that a single instance cannot load an aggregate twice.

Hope this gives some insights and that we can come to a solution!

Cheers,

Steven

Hi Franco,

To elaborate a bit on Steven’s answer: Axon ensures that only a single thread will invoke an Aggregate at any time (assuming a single instance). Changing the counter to an AtomicLong wouldn’t change anything.
My guess is that you’re running MySQL (or a fork of it) with the default transaction isolation settings. MySQL, by default, uses repeatable read and implements it by working on a snapshot of the database as it was when the transaction started. In Axon, a lock prevents concurrent access to an Aggregate, but the second thread’s transaction is already in progress while it waits for that lock. When the lock is released (and new events have been added to the database), the second thread doesn’t see these events because of the repeatable read. It then tries to append an event with a sequence number that already exists, and the database rejects it.

The solution is to change the transaction isolation to READ_COMMITTED, which is a much more sensible default, imo.
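
A rough simulation of that interleaving, with no real database involved, might look like the sketch below. `IsolationDemo`, `Table` and `Tx` are hypothetical names invented for illustration, and the snapshot logic is a deliberate simplification of what a database does. With plain JDBC, the isolation level itself is set through `Connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the two isolation behaviours; simplified on purpose.
class IsolationDemo {
    enum Isolation { REPEATABLE_READ, READ_COMMITTED }

    /** Committed events of one aggregate; the list index is the sequence number. */
    static final class Table {
        final List<String> committed = new ArrayList<>();
    }

    /** A transaction that, under REPEATABLE_READ, only sees rows committed before it began. */
    static final class Tx {
        private final Table table;
        private final Isolation isolation;
        private final int snapshotSize;

        Tx(Table table, Isolation isolation) {
            this.table = table;
            this.isolation = isolation;
            this.snapshotSize = table.committed.size(); // snapshot taken at transaction start
        }

        /** Next sequence number as this transaction sees it. */
        int nextSequence() {
            return isolation == Isolation.REPEATABLE_READ ? snapshotSize : table.committed.size();
        }

        /** Inserts at the given sequence number; returns false if it is not the next free slot. */
        boolean insert(int sequenceNumber, String event) {
            if (sequenceNumber != table.committed.size()) {
                return false; // in this demo: the slot is already taken (duplicate key)
            }
            table.committed.add(event);
            return true;
        }
    }

    /** Replays the interleaving described above; returns whether the second insert succeeded. */
    static boolean secondInsertSucceeds(Isolation isolation) {
        Table table = new Table();
        table.committed.add("Created"); // sequence 0

        Tx first = new Tx(table, isolation);
        Tx second = new Tx(table, isolation); // begins while the first still holds the aggregate lock

        first.insert(first.nextSequence(), "Updated"); // sequence 1, then the lock is released

        // The second thread now gets the lock and loads the aggregate inside its older transaction.
        return second.insert(second.nextSequence(), "UpdatedAgain");
    }

    public static void main(String[] args) {
        System.out.println("REPEATABLE_READ: " + secondInsertSucceeds(Isolation.REPEATABLE_READ));
        System.out.println("READ_COMMITTED: " + secondInsertSucceeds(Isolation.READ_COMMITTED));
    }
}
```

Under the simulated repeatable read the second insert fails, because the transaction computes its sequence number from the old snapshot; under read committed it sees the new event and succeeds.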

Cheers,

Allard

Hi Allard

Sadly it’s a little bit complicated. Actually I’m not running MySQL, given that one of the requirements our DevOps team gave us was to avoid relational databases… and also to avoid Mongo DB :smiley:

So, the Event Store is a custom implementation (written by myself) based on Cassandra. I implemented the duplicate key check on the event entry table using the Cassandra mechanism called “Lightweight Transaction” (INSERT INTO … IF NOT EXISTS), which tells me whether the insertion of the event was actually performed or not.
The main drawback is that I cannot execute the statement in a batch, because in the same method I’m updating the event entry table and another table that exposes events with the global index as a clustering key, in order to let the gap-based event tracker work correctly.

Anyway, let’s go back to the issue.
My service will actually run with more than one instance. For our first load test we configured the service with a single instance but kept the production configuration, with the Distributed Command Bus active (think of it as a mistake). This explains why we ran the test with the distributed configuration active on a single instance :smiley:

Anyway, suppose we’re in a production environment, with more than one instance of the service (distributed mode: on). If I understood correctly, the Command Gateway, through the ConsistentHash, always routes all the commands for a specific aggregate to the same node. Under high load, I think I could run into the duplicate key error I described above. Using an AtomicLong to handle the increment of the sequence number was the first solution that came to my mind to address the problem.
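
To illustrate the routing idea only, here is a minimal hash ring sketch. `RoutingRing` is a hypothetical class written for this example and does not reproduce Axon’s own ConsistentHash; the node names and the number of ring points per node are made-up assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical hash ring; Axon's own ConsistentHash class is not reproduced here.
class RoutingRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    /** Places each node on the ring several times to spread the keys more evenly. */
    RoutingRing(List<String> nodes, int pointsPerNode) {
        for (String node : nodes) {
            for (int i = 0; i < pointsPerNode; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    /** Returns the node owning the first ring point at or after the key's hash, wrapping around. */
    String nodeFor(String aggregateId) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(aggregateId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    private static int hash(String value) {
        // String.hashCode() is deterministic, which is all this sketch needs.
        return value.hashCode() & 0x7fffffff;
    }

    public static void main(String[] args) {
        RoutingRing ring = new RoutingRing(List.of("node-a", "node-b", "node-c"), 16);
        // The same aggregate identifier maps to the same node on every call.
        System.out.println(ring.nodeFor("order-42").equals(ring.nodeFor("order-42")));
    }
}
```

Routing of this kind keeps commands for one aggregate on one node as long as the set of nodes is stable; it does not by itself protect against a node reading stale data from the store.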

I think that we’re going to perform a load test with a real distributed configuration (more than one instance) very soon. As soon as I have some results, I will share them with you.

Cheers

Franco

Hi Franco,

If you encounter this on a single-node setup, you will also encounter it on a multi-node setup. The simple reason is that, apparently, there is a possibility of processes reading stale data from the event store.
We have done quite an extensive comparison of different databases and how they support the Event Store use cases. Relational databases came out best by quite a distance. While Cassandra looks nice feature-wise, there are a few things poorly implemented (from the perspective of the Event Store, not in general). Lightweight transactions aren’t as lightweight as the name suggests. In fact, they suggest using them only sparingly.

That’s why we’ve started development of a dedicated Event Store. It focuses on storing events (which are essentially immutable data streams) and being able to load them based on aggregate identifier, or as a full stream. If you expect high loads, this is definitely something worth looking at, because it can save you a lot of time and hassle.

Cheers,

Allard

Hi

Given the requirements I had, Cassandra was the only viable solution. Other possibilities were Couchbase or CouchDB, but neither of them seemed to fit well.

I’ll definitely take a look at the AxonIQ Event Store. Is there a possibility to download and evaluate it? I sent an email some time ago through the form on the AxonIQ website, but I never received an answer.

Thank you