Axon and data sharding

Hi,

I’m currently considering using Axon for a new project where performance and scalability are the key aspects. I expect a load of a few tens of thousands per second, so being able to scale horizontally as the need arises is very important. The command interfaces to the system are mostly message-based (e.g. JMS, AMQP or even Redis queues), with a bit of REST (although most of the REST-based interfaces are for querying purposes). I’m going to use Redis as the EventStore implementation (I will have to write my own, as Axon currently doesn’t provide one), and for now I’m fine with presharding the data in Redis based on the aggregate keys. My major concern, however, is the affinity of processing to one Axon node for the lifecycle of an aggregate/saga or, in general, for a long enough period of time to avoid a lot of conflicts and cache misses.

So, for the sake of discussion, let’s assume I have Node A and Node B, each running an Axon-based application capable of handling all types of commands. There is also a load balancer which hits the nodes in round-robin fashion. From what I have read about Axon (including a year’s worth of posts in this group), I need to take care of the following aspects of load distribution:

  • Execute commands for the same aggregate on the same node to avoid conflicts and to make use of caching
  • Distribute events to all event listeners, but make sure that all events generated by the same aggregate are processed by the same node (important for Sagas)
  • Avoid potentially long-running requests to external systems as part of handling commands by aggregates

I think that I could solve the second part by implementing my own version of EventBusTerminal which would use Redis queues to store events (so they could be processed by any node) and then only process an event on a particular node if it was generated by an aggregate that this node was already processing (I describe how to implement this below). This approach gives me persistence of events awaiting processing, so if a node is down I would not lose the notification and could eventually process it. The reason I’m not using the existing AMQPTerminal is that I want to avoid unnecessary serialization/deserialization, as I could just copy the event already stored in Redis to another list to be picked up by EventHandlers. This should reduce the performance penalty.

The way I could ensure that only the node that was already processing the aggregate gets events for this aggregate is to store the association between node and aggregateId in a separate Redis structure with the TTL set to a relatively low value (comparable to the cache TTL). The TTL is important to make sure that when a node goes down, all its associations are eventually removed so that other nodes can pick up the events (which will of course lead to cache misses initially, but in the case of node failure this is tolerable).
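A minimal Java sketch of the TTL-based association described above. It uses an in-memory map as a stand-in for the Redis structure (in Redis this would presumably be one key per aggregateId with an expiry). The names AffinityRegistry, claimOrGetOwner and shouldHandle are hypothetical and not part of Axon, and the injected clock exists only to make expiry easy to test.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical in-memory stand-in for the node/aggregate association with a TTL. */
final class AffinityRegistry {

    /** Which node owns an aggregate, and until when. */
    private static final class Lease {
        final String nodeId;
        final long expiresAtMillis;

        Lease(String nodeId, long expiresAtMillis) {
            this.nodeId = nodeId;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Lease> leases = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    AffinityRegistry(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /**
     * Claims the aggregate for nodeId if it is unowned or its lease has expired,
     * refreshes the lease if nodeId already owns it, and otherwise leaves the
     * existing lease untouched. Returns the owner after the call.
     */
    String claimOrGetOwner(String aggregateId, String nodeId) {
        long now = clock.getAsLong();
        Lease lease = leases.compute(aggregateId, (id, current) -> {
            boolean free = current == null || current.expiresAtMillis <= now;
            if (free || current.nodeId.equals(nodeId)) {
                return new Lease(nodeId, now + ttlMillis);
            }
            return current;
        });
        return lease.nodeId;
    }

    /** True if this node should process events of the given aggregate. */
    boolean shouldHandle(String aggregateId, String nodeId) {
        return nodeId.equals(claimOrGetOwner(aggregateId, nodeId));
    }
}
```

A node would call shouldHandle before processing an event and skip the event when it returns false. Once the owner stops refreshing its lease, another node takes over after the TTL has passed.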

The first aspect looks more complicated. Let’s consider two cases. The first is when the REST-based API is called to execute a command, and the second is when the message-based API is called (i.e. a message is put into the input queue). In both scenarios, let’s assume that the two nodes do not have any associations in Redis yet (i.e. a fresh start).

In the first case, the load balancer will choose a node on a round-robin basis (let’s assume that for aggregate X it sends the request to Node A). When the REST controller is executed on Node A, it will create the corresponding command for aggregate X and dispatch it to the CommandBus. I could write an interceptor that would create the association between Node A and aggregate X in Redis. After the command is processed, the event will be stored in the EventStore and distributed to the Redis-based queue. Both Node A and Node B will have an EventHandlers cluster, so I would need to enhance the code that distributes events to the cluster so that it skips an event if the aggregateId of the event is not associated with the current node. This way Node B will just skip the events generated by aggregate X while Node A will process them. So far so good (unless I’m missing some important concept here).

Now a second REST call comes in to execute a command on aggregate X. The load balancer chooses Node B (it’s not aware of the association). The controller could check the association in Redis and forward the request to Node A (synchronously). After the command is fired from the REST controller on Node A, control will return to Node B (my assumption), which will send back the response about acceptance of the command.

Do you think this approach will work, and what implications would it have from a performance point of view compared to using the JGroups-based CommandBus? I planned to use DisruptorCommandBus for my approach with redirects to get optimal performance, but I’m not sure if I could use it in combination with DistributedCommandBus.

With message-based endpoints the approach would be similar, but I would have to forward messages from the common input queue to a node-specific input queue if there is an association mismatch. I’m not sure, though, how to handle the situation where messages have already been put into a node-specific queue but the node is down. I don’t see how the messages would be redistributed to the common input queue so that other nodes could eventually handle them.

I would also like to know if there are any issues with this approach when it comes to processing Sagas. As far as I understand, they need to receive all events they are interested in, and those events might come from different aggregates that could be associated with different nodes at different points in time.

I’m sorry for the long post, but I think this is quite a common problem to solve when designing large-scale systems. I’m pretty sure that you have had similar experiences already, and it would be great to know how you tackled it. Any suggestions are much appreciated.

Thank you!

I have read more about DistributedCommandBus and it looks like a better option than rolling my own redirection logic. The possibility of using DisruptorCommandBus as a local segment is exactly what I needed, and the transparent distribution of commands, handling of responses, etc. gives me a much simpler solution.
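To illustrate how routing by aggregate identifier keeps commands for the same aggregate on the same node, here is a minimal consistent-hashing sketch in Java. It is not Axon’s implementation: ConsistentHashRouter and its methods are hypothetical names, and the MD5-based hash and the number of virtual nodes are arbitrary choices made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consistent-hash ring mapping routing keys (e.g. aggregate ids) to nodes. */
final class ConsistentHashRouter {

    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRouter(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    /** Places several points for the node on the ring to spread its share of keys. */
    void addNode(String nodeId) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(nodeId + "#" + i), nodeId);
        }
    }

    /** Removes the node's points; only keys that mapped to it move elsewhere. */
    void removeNode(String nodeId) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(nodeId + "#" + i));
        }
    }

    /** Returns the node owning the first ring point at or after the key's hash. */
    String nodeFor(String routingKey) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes registered");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(routingKey));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of an MD5 digest as a 64-bit ring position. */
    private static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With such a ring, every sender that knows the same set of nodes picks the same target for a given aggregate id, and removing a node only reassigns the keys that node owned.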

Now I have to ask whether my approach to event distribution is still relevant. Is it really necessary to distribute events from an aggregate to the same node that contains the command handler for that aggregate? If I choose to use a simple local EventBus, what implications could it have when combined with DistributedCommandBus? Will it lead to any issues with the processing of Sagas that listen to events generated by aggregates potentially residing on different nodes? I also still need to make sure that I can load-balance the handling of events and make them persistent, so that powering down one node won’t lead to the loss of all unprocessed events.

Hi Oleg,

while reading your previous email, I wanted to remark that your second point, about distributing events to the machine that holds the aggregate generating them, is not what you’d generally want to do. Distribution of events to the handlers is completely independent of (and in many cases unrelated to) the aggregate.
The decision where to handle those events is based on how the query model needs to be built up. If there is need for specific ordering of events there, then those events must be handled by the same machine. Multiple machines can handle different events in parallel if the models they update are unrelated.
Axon can optimize command handling to ensure serialization of events happens only once. For this, use the SerializationOptimizingInterceptor. In the DisruptorCommandBus, you can use this as an invokerInterceptor or as a publisherInterceptor. The optimization only works if you use the same serializer for both the storage and dispatching of events.
Sagas are a different case. They are more difficult to distribute, but I am working on a way to do it. However, an active-passive strategy to ensure availability is not that hard to achieve. In Axon, you can configure a Cluster (of Event Listeners) to exclusively consume an AMQP Queue. When two clusters attempt to do so, one will succeed, while the other will attempt to connect, waiting for the first to drop its connection. Make sure to configure your handling to be transactional, so that messages are only consumed from the queue after successful handling.

Using a local Event Bus is a possibility. Note that a command is considered successful when storage and publication of the events is successful. With the SimpleEventBus, this means the Events are also handled by all handlers. If something on the machine fails, the sender of the command will receive an error (denoting that the node handling the command has dropped). You can then decide whether it’s safe to resend the command (having it handled by another node).
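The resend decision on the sender side could be sketched roughly as follows in Java. Everything here is hypothetical: FailoverSender, the transport callback and the safeToResend flag are illustrative names and not Axon APIs, and the sketch assumes the sender already knows which nodes it may try.

```java
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sender that retries a command on the next node when one fails. */
final class FailoverSender {

    private FailoverSender() {
    }

    /**
     * Tries the nodes in order and returns the id of the node that handled the
     * command. If safeToResend is false, the first failure is final, because
     * the failed node may already have stored and published events.
     */
    static <C> String send(C command,
                           List<String> nodes,
                           BiConsumer<String, C> transport,
                           boolean safeToResend) {
        RuntimeException lastFailure = null;
        for (String node : nodes) {
            try {
                transport.accept(node, command);
                return node;
            } catch (RuntimeException e) {
                lastFailure = e;
                if (!safeToResend) {
                    break;
                }
            }
        }
        throw new IllegalStateException("command was not handled by any node", lastFailure);
    }
}
```

Whether a resend is safe depends on the command itself: an idempotent command can be retried freely, while others may need a check against the event store first.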

Hope this helps.
Cheers,

Allard

Allard,

Thank you. Please see my comments inlined.

Hi Oleg,

while reading your previous email, I wanted to remark that your second point, about distributing events to the machine that holds the aggregate generating them, is not what you’d generally want to do. Distribution of events to the handlers is completely independent of (and in many cases unrelated to) the aggregate.
The decision where to handle those events is based on how the query model needs to be built up. If there is need for specific ordering of events there, then those events must be handled by the same machine. Multiple machines can handle different events in parallel if the models they update are unrelated.

This is a good point. Technically, if I have a distributed event notification bus, then three events related to the same aggregate could be put into the same queue that different nodes compete for, which might lead to out-of-order processing if Node A processes Event3 before Node B has finished processing Event2. I assume that the only way to overcome this issue would be to use an automatic active-passive configuration for separate clusters containing the handlers for which the order of events is important. I could have multiple clusters for different event handlers, though, so that the load is distributed more uniformly. I would also have to figure out some kind of mechanism to ensure that different nodes get a fair proportion of “active” cluster connections. Otherwise, the node that acquires connections to queues fastest could become the only processor of all events (and potentially a bottleneck).
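The ordering problem described above can be illustrated with a small Java sketch. Instead of letting consumers compete freely, each event is assigned to a lane by hashing its aggregate identifier, so events of one aggregate are handled sequentially while different aggregates can proceed in parallel. This is an in-process illustration only: PartitionedDispatcher is a hypothetical name, not an Axon class, and a distributed setup would need the same partitioning applied to queues or nodes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher that keeps per-aggregate ordering by pinning aggregates to lanes. */
final class PartitionedDispatcher {

    private final ExecutorService[] lanes;

    PartitionedDispatcher(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            // One thread per lane: tasks in a lane run strictly in submission order.
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same aggregate id always maps to the same lane. */
    int laneFor(String aggregateId) {
        return Math.floorMod(aggregateId.hashCode(), lanes.length);
    }

    /** Queues the handler invocation on the lane that owns the aggregate. */
    void dispatch(String aggregateId, Runnable handler) {
        lanes[laneFor(aggregateId)].execute(handler);
    }

    /** Stops accepting work and waits for queued handlers; false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMillisPerLane) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(timeoutMillisPerLane, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is that a single busy aggregate can hold up the other aggregates that share its lane, so the number of lanes is a tuning knob rather than a fixed rule.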

Axon can optimize command handling to ensure serialization of events happens only once. For this, use the SerializationOptimizingInterceptor. In the DisruptorCommandBus, you can use this as an invokerInterceptor or as a publisherInterceptor. The optimization only works if you use the same serializer for both the storage and dispatching of events.

That’s very handy!

Sagas are a different case. They are more difficult to distribute, but I am working on a way to do it. However, an active-passive strategy to ensure availability is not that hard to achieve. In Axon, you can configure a Cluster (of Event Listeners) to exclusively consume an AMQP Queue. When two clusters attempt to do so, one will succeed, while the other will attempt to connect, waiting for the first to drop its connection. Make sure to configure your handling to be transactional, so that messages are only consumed from the queue after successful handling.

It looks like the same approach as with other event handling will work here. Could you please elaborate on why it’s important to keep processing events for a particular Saga on the same node? Is it because of optimization of Saga loading (i.e. caching), or because of the mechanism of distributing events to the Saga (and its activation)? Doesn’t Saga processing use the same CommandBus as the processing of regular commands by other CommandHandlers?

Using a local Event Bus is a possibility. Note that a command is considered successful when storage and publication of the events is successful. With the SimpleEventBus, this means the Events are also handled by all handlers. If something on the machine fails, the sender of the command will receive an error (denoting that the node handling the command has dropped). You can then decide whether it’s safe to resend the command (having it handled by another node).

A couple of questions here. Are events stored in the EventStore prior to being handled? If yes, should I take care of deleting the events from the EventStore if something fails during the execution of an EventHandler?

And as far as I understand, using a local Event Bus could actually remove the need for active-passive configurations for Event Handlers. Every event will be handled by the same node that created it in the first place. And as long as I use DistributedCommandBus, the commands for the same aggregate will again be delivered to the same node, so any Sagas associated with this aggregate will have access to the proper events. There is one exception, of course: when a Saga requires events from two or more aggregates that happen to be distributed across different nodes. Is this a correct assumption?

Hi Oleg,

the reason why events related to a specific Saga instance should be processed by the same node is to respect the nature of the saga.
This is the short answer. :wink:

The long answer is that a Saga is a stateful event consumer with its own life cycle. Each time a new event is forwarded to the relevant saga instance, the saga’s internal state needs to be restored so that the saga can handle the event properly.

The Saga infrastructure, in its various implementations, works to manage this life cycle smoothly, using batching and caching where necessary to improve overall performance.

In a distributed environment, dispatching events that belong to the same saga instance to different nodes can easily lead to nondeterministic behavior.
These problems don’t occur if the events of a saga instance are delivered, in order, to the same node.

So IMHO, in the case of sagas, the optimal event partitioning (the one that, in theory, should ensure the most uniform load balancing) is at the instance level.
In my opinion this is not always possible, due to the nature of the broker, or easy to implement, due to modeling constraints.

Let me try to explain: given that the association between an event and its saga is managed by the AssociationValue, allowing event partitioning at the saga instance level would probably require the constraint that all the events of a saga share the same AssociationValue.

This constraint was not imposed in Axon, wisely I believe. It is thus possible to design a Saga that is closer to the business process that you want to manage.

In the example provided with the documentation you can find a case like this: OrderManagementSaga is free to use a different associationProperty for different kinds of events without restrictions. Otherwise, all events would have to be associated through the orderId property, with a significant development burden and a less clear domain model.

This way you are still free to impose this constraint, but only in cases where you find it useful or necessary.

I am pretty confident that Allard will soon provide us with an elegant solution to this case as well. :wink:

For these reasons, I usually implement event partitioning by consumer type for all consumers that require in-order dispatch and sticky load balancing, whereas I leave it to the broker in all other cases. I resort to partitioning at the saga instance level only in extreme cases.

Cheers,

Domenico

Domenico,

Thank you for an excellent response! I think now I see the light :slight_smile:

Technically, I have two options now. The first is to ensure that a Saga is always associated with only one value and that all events it listens to contain this association value. This way I could guarantee that the Saga instance will be created and tracked on the same Node that emits the events with that association value, as long as my Command Bus implementation forwards commands to this same node, which is quite easy to achieve (using the same association value as a segment selector). This way I could also use a local Event Bus, because other nodes will never need to know about events generated on the Node which handles a particular Saga and its associated aggregates. Of course, I could add another Event Bus cluster for event handlers that don’t care about the sequence of events and could be called on different Nodes (for better load balancing).

The second option (the two are not mutually exclusive) is to configure an Event Bus cluster which handles a particular type of Saga Event Handler, so that regardless of where an event happened it will always be forwarded to the Node that is capable of handling this type of Saga (with no affinity based on association value). In this case, however, I need to ensure that only one Node is active for this cluster at any given moment (using the AMQP Event Bus implementation, for example). Other clusters might still handle other types of Saga, so I don’t put all my eggs in one basket.

I hope I got it right. I will probably stick to the first option initially, as I anticipate only Sagas with one association per instance. This should give me better load balancing. And I could always return to option two if I find that a particular Saga needs more than one association value.

Thank you!

Allard,

It looks like you have sent a post with just quoted text.

Hi Oleg,

you’re welcome!

However, I realized that I expressed myself badly, so let me rephrase the sentence this way:

In a distributed environment, forwarding events that belong to the same saga to different nodes could result in the simultaneous activation of the same saga instance on different nodes, which is definitely not what you want.

Obviously, in such a case the configuration is to blame. I would think of it as another variable of distributed computing.
…Every rose has its thorn :wink:

Please consider that I was just sharing my personal experience which may not be useful in your case.

Going back to your considerations: personally, I would treat the command routing policies independently from the event routing policies, trying not to make them depend on each other. As long as you protect the state of your objects (aggregates, sagas or whatever they are), you are on a safe path.

As always, it is best to start experimenting with the simplest case and then optimize when needed.
It’s a cliche but it’s true. :smiley:

Cheers,

Domenico

Hi Oleg,

I was going to start a message, but got interrupted. I think I accidentally pressed the send button instead of cancel. Hence the empty message.

Just wanted to let you know that I am working on designing a method of distributing Sagas over multiple machines. As Domenico already pointed out (thanks for that :wink: ), it is a tricky thing. However, I do feel there is a possibility to implement it in such a way that you get pretty good scalability. It’s still in the design phase, so it might take some time for it to be implemented. Unless I can find a sponsor for it.

Be careful when limiting yourself to a single association value per saga. Although this makes distribution a lot safer, it can also cause restrictions on your model. When that happens, models tend to grow more complex than necessary. I’m not saying the rule is a bad thing to start out with, but be prepared to drop the rule when you notice it’s restricting you.

Cheers,

Allard