Scaling horizontally with sourced events


I’m trying to evaluate how an application using event-sourced aggregates can be scaled horizontally.
Imagine a typical scenario with one instance of an application:

  1. Client sends a command via the bus or gateway

  2. Aggregate handles the command and applies events internally

  3. Events are applied and state is changed

  4. Events are persisted (not too sure if this happens before the previous point)

  5. Unit of work is committed and events are broadcast to the outside world
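
The steps above can be sketched in plain Java. This is only a minimal illustration of the pattern, not Axon's actual implementation; `Account`, `Deposited` and `drainUncommitted` are hypothetical names, and the exact ordering of persisting versus applying is left open, as in the list above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event: money deposited into an account.
final class Deposited {
    final String accountId;
    final long amount;

    Deposited(String accountId, long amount) {
        this.accountId = accountId;
        this.amount = amount;
    }
}

// Hypothetical aggregate: state changes only inside the on(event) handler.
final class Account {
    private final String id;
    private long balance;
    private final List<Deposited> uncommitted = new ArrayList<>();

    Account(String id) {
        this.id = id;
    }

    // Step 2: the command handler validates and applies an event; it never mutates state directly.
    void deposit(long amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        apply(new Deposited(id, amount));
    }

    // Step 3: applying an event changes state and records the event for persistence.
    private void apply(Deposited event) {
        on(event);
        uncommitted.add(event);
    }

    // Replaying history would call this same handler without recording the event again.
    void on(Deposited event) {
        balance += event.amount;
    }

    long balance() {
        return balance;
    }

    // Steps 4 and 5: at commit time, hand the new events over for storing and publishing.
    List<Deposited> drainUncommitted() {
        List<Deposited> copy = new ArrayList<>(uncommitted);
        uncommitted.clear();
        return copy;
    }
}
```

Because the command handler and the event handler are separate, replaying stored events never re-runs the command logic or its side effects.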

Now I add a second instance of the application with some kind of load balancer on the command bus.

  1. Client sends the first request
  2. Load balancer sends it to the first node and all previous steps are completed
  3. The second node receives the event but the aggregate state is not updated because it can only be changed via commands
  4. Client sends a second request
  5. Load balancer sends it to the second node but the state of the aggregate in the second node is inconsistent because the event has not been sourced

I tried to implement a couple of solutions but none of them are satisfactory:

  1. Add an event listener to each node that checks whether the event is already in the store and, if not, adds it either directly or by materialising the aggregate and calling apply(). This is a hack at the very best.
  2. Add an event listener that rebuilds the original command from the event and then publishes it to the local command bus. This is a worse idea than the previous one, as the command handling method of the aggregate could affect the outside world (e.g. send an e-mail).
  3. Create bespoke commands for state replication, like ApplyEventCommand, whose sole purpose is to have the aggregate process the event carried inside the command itself. Again a poor solution, as the command does not express a business intention and is just a workaround.

Has anybody successfully scaled an Axon application horizontally? If so, could you please provide some suggestions on how it could be done?



I have successfully scaled an application horizontally :wink:

When routing commands, make sure you use consistent routing. That means, route commands for the same aggregate to the same machine. On that machine, either locking or a lock-free algorithm (like the one used in the DisruptorCommandBus) will make sure that commands are executed one after the other, on the latest (consistent) state.
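
To make the idea of consistent routing concrete, here is a minimal sketch of a hash ring in plain Java. It is not the DistributedCommandBus code; `AggregateRouter`, `nodeFor` and the value of `POINTS_PER_NODE` are assumptions made up for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical router: maps an aggregate identifier to a node name via a hash ring.
final class AggregateRouter {
    private static final int POINTS_PER_NODE = 100; // assumed value, tune as needed
    private final TreeMap<Long, String> ring = new TreeMap<>();

    AggregateRouter(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node required");
        }
        // Each node owns several points on the ring to spread the load more evenly.
        for (String node : nodes) {
            for (int i = 0; i < POINTS_PER_NODE; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // Every client that knows the same node list picks the same node for an aggregate.
    String nodeFor(String aggregateId) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(aggregateId));
        // Wrap around to the first point when the hash falls past the last one.
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    private static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

With a ring like this, removing a node only moves the aggregates that were routed to that node; commands for all other aggregates keep going to the machine they went to before.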

The event store will also ensure that, even when multiple nodes change the same aggregate, only the first change will succeed. The second change, which should have been made based on the result of the first, will fail. This is basically the optimistic locking mechanism that should be used as a fallback when the application is in the process of changing the routing.
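
As a rough illustration of that optimistic check, here is an in-memory stand-in for an event store that rejects an append made against a stale version. `SequencedEventStore` and `ConcurrentAppendException` are hypothetical names, and the version-equals-stream-length rule is an assumption of this sketch; a database-backed store might enforce the same rule with a unique key on aggregate identifier and sequence number.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical exception raised when another writer got there first.
final class ConcurrentAppendException extends RuntimeException {
    ConcurrentAppendException(String message) {
        super(message);
    }
}

// Hypothetical in-memory stand-in for a shared event store.
final class SequencedEventStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    // expectedVersion is the number of events the writer saw when it loaded the aggregate.
    synchronized void append(String aggregateId, int expectedVersion, String event) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new ConcurrentAppendException(
                "expected version " + expectedVersion + " but stream is at " + stream.size());
        }
        stream.add(event);
    }

    // Returns a copy so callers cannot modify the stored history.
    synchronized List<String> read(String aggregateId) {
        return new ArrayList<>(streams.getOrDefault(aggregateId, List.of()));
    }
}
```

If two nodes both load the aggregate at version 1 and both try to append, the first append succeeds and the second fails, so the losing node has to reload and try again.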

By the way, this is exactly what the DistributedCommandBus does.



Hi Allard,

The solution you are suggesting, although perfectly viable, is characterised by session affinity and consequently has a few known drawbacks. Most notably, if one of the nodes goes down, all the aggregates in that node are off-line.
I would not see this as a problem when implementing the likes of a shopping cart, for example, but it is an issue in our case.
Our aggregate models work done by an operator over a few hours to a couple of days, which needs to be completed within a strict SLA.
Thinking more about my original problem, I think the solution should be based around distributing the event store rather than using the domain events.
Tools like Zookeeper, Hazelcast, Kafka etc. would probably be a good starting point. However, this is not a trivial piece of development that can be taken on lightly :slight_smile:



Hi Alessandro,

Sorry to jump into the discussion, but I’m not sure I follow your argument exactly. You write:

  1. The second node receives the event but the aggregate state is not updated because it can only be changed via commands

That’s incorrect, because the aggregate state is nothing more than its list of persisted events in the event store. The first node already persisted the event to the event store, so, assuming the event store is shared between nodes and caching of aggregates is disabled, the aggregate state is always the same on both nodes.

If caching is turned on and both nodes make changes to the same aggregate this will not necessarily be the case. However, that is easily prevented by routing all commands for one aggregate to the same node as Allard suggested.

You’re writing that aggregates can go ‘off-line’ if a node goes down:

Most notably, if one of the nodes goes down, all the aggregates in that node are off-line.

However, the distributed command bus will automatically redistribute the commands over the remaining nodes so an aggregate can never go ‘off-line’ assuming there is always at least one remaining node.

Any of the other nodes can simply pick up where the missing node left off by sourcing the aggregate from its events in the event store.
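
A tiny sketch of that failover, assuming a shared event stream and no aggregate caching. `WorkItem`, `FailoverDemo` and the `StepCompleted` event name are invented for the example; the list stands in for the shared event store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical aggregate whose state is just a fold over its stored events.
final class WorkItem {
    private int completedSteps;

    // Rebuild state by replaying the full history, oldest event first.
    static WorkItem replay(List<String> history) {
        WorkItem item = new WorkItem();
        for (String event : history) {
            item.on(event);
        }
        return item;
    }

    private void on(String event) {
        if (event.equals("StepCompleted")) {
            completedSteps++;
        }
    }

    int completedSteps() {
        return completedSteps;
    }
}

final class FailoverDemo {
    public static void main(String[] args) {
        // Stand-in for the shared event store that both nodes read from.
        List<String> sharedStream = new ArrayList<>();

        // Node 1 handles two commands and persists the resulting events.
        sharedStream.add("StepCompleted");
        sharedStream.add("StepCompleted");

        // Node 1 goes down; node 2 has no cached copy and sources the aggregate from the stream.
        WorkItem onNode2 = WorkItem.replay(sharedStream);
        System.out.println(onNode2.completedSteps()); // prints 2
    }
}
```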


Hi René,

Input and questions are always welcome.
I think the source of confusion is me trying to solve a problem by fitting a square peg in a round hole so please let me try again…
We are implementing a microservices architecture where resilience and availability are strong requirements for some of the services.
Scaling an application horizontally without session affinity is one way to achieve such QoS (there are others).
Each service is completely self-contained with a separate data store (we are trying to follow Netflix’s guidelines).
In order to sync all data stores, we would need to sync the read model and to sync the event and saga stores.
The former is easily achieved by propagating domain events via a distributed event bus, while the latter, contrary to my initial approach, should not be done in the same way.
Having every service node in a cluster be an active mirror of the others, with independent storage, would remove single points of failure and increase scalability.
ActiveMQ has a replicated message store using LevelDB and ZooKeeper, so a solution to my problem could be implementing event and saga stores with similar technologies.

I hope the above sheds some light on my previous post.



Hi Alessandro,

I didn’t mention session affinity. What I meant is that while the topology doesn’t change, you should send commands for the same aggregate to the same machine. That way, you can easily guarantee the order of processing. When the topology changes, make sure all clients select another machine, but again the same one for a given aggregate. During topology changes, you may encounter optimistic locking failures, because two nodes are temporarily processing commands for the same aggregate. Retrying those commands will resolve that issue.
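
A minimal sketch of such a retry, in plain Java rather than any Axon API. `CommandRetry`, `withRetry` and `ConflictException` are hypothetical names; the assumption is that each attempt reloads the aggregate from the event store before handling the command again.

```java
import java.util.function.Supplier;

// Hypothetical marker for a conflicting concurrent write.
final class ConflictException extends RuntimeException {
    ConflictException(String message) {
        super(message);
    }
}

final class CommandRetry {
    // Runs the attempt again when it hits a conflict, up to maxAttempts times in total.
    // The attempt is expected to reload the aggregate on every call, so a retry sees
    // the events written by the node that won the previous round.
    static <T> T withRetry(int maxAttempts, Supplier<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ConflictException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConflictException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // all attempts conflicted
    }
}
```

Bounding the number of attempts keeps a command from looping forever if the routing never settles; what to do after the last failure (report it, queue it) is left to the caller.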

The Netflix best practices don’t say that each microservice instance should have a separate database. So two instances of the same service will share data, otherwise a response will depend on the instance on which you happen to ask the question.
In fact, what they really say is that you shouldn’t share a data store, not that you shouldn’t share the database management system. If you translate that to the event store, as long as services don’t read or append to each other’s events, you’re fine. If you consider an aggregate to be owned by a specific component, have only instances of that component append and read events for that aggregate.

Just my 2 cents.