I’m currently considering using Axon for a new project where performance and scalability are the key aspects. I expect the load to be a few tens of thousands of requests per second, so being able to scale horizontally as the need arises is very important. The command interfaces to the system are mostly message-based (e.g. JMS, AMQP, or even Redis queues), with a bit of REST on top (though most of the REST-based interfaces are for querying purposes). I’m going to use Redis as the EventStore implementation (I’ll have to write my own, as Axon currently doesn’t provide one), and for now I’m fine with pre-sharding the data in Redis based on the aggregate keys. My major concern, however, is affinity of processing to one Axon node for the lifecycle of an aggregate/saga or, in general, for a long enough period of time to avoid lots of conflicts and cache misses.
So for the sake of discussion, let’s assume I have Node A and Node B, each running the Axon-based application and capable of handling all types of commands. There is also a load balancer that hits the nodes in round-robin fashion. From what I’ve read about Axon (including a year’s worth of posts in this group), I need to take care of the following aspects of load distribution:
- Execute commands for the same aggregate on the same node, to avoid conflicts and make use of caching
- Distribute events to all event listeners, but make sure that all events generated by the same aggregate are processed by the same node (important for sagas)
- Avoid potentially long-running requests to external systems as part of command handling in aggregates
I think I could solve the second part by implementing my own version of EventBusTerminal that uses Redis queues to store events (so they can be processed by any node), and then only process an event on a particular node if it was generated by an aggregate that node was already processing (I’ll describe below how to implement this). This approach gives me persistence of events for processing, so if a node goes down I won’t lose the notification and can eventually process it. The reason I’m not using the existing AMQPTerminal is that I want to avoid unnecessary serialization/deserialization: I can just copy the reference to an event already stored in Redis onto another list to be picked up by the event handlers. This should give me less of a performance penalty.
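To make the "copy by reference" idea concrete, here is a minimal sketch of what I have in mind. All class names are hypothetical, and plain maps/deques stand in for the Redis structures (a hash holding the serialized event once, and one list per node holding only event ids), so publishing never re-serializes the payload:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the event store keeps each serialized payload once,
// and the terminal pushes only the event's id onto every consumer queue.
// In Redis this would be a hash (eventId -> bytes) plus LPUSH of the id.
public class ReferencePublishingTerminal {
    private final Map<String, byte[]> eventStore = new HashMap<>();        // stand-in for a Redis hash
    private final Map<String, Deque<String>> nodeQueues = new HashMap<>(); // stand-in for per-node Redis lists

    public void registerNode(String nodeId) {
        nodeQueues.put(nodeId, new ArrayDeque<>());
    }

    public void store(String eventId, byte[] serializedEvent) {
        eventStore.put(eventId, serializedEvent);
    }

    // Publishing is just pushing the id to each node's queue: no (de)serialization.
    public void publish(String eventId) {
        for (Deque<String> queue : nodeQueues.values()) {
            queue.addLast(eventId);
        }
    }

    // A consuming node pops the id and resolves the payload from the store.
    public byte[] poll(String nodeId) {
        String eventId = nodeQueues.get(nodeId).pollFirst();
        return eventId == null ? null : eventStore.get(eventId);
    }
}
```

The real implementation would of course wire this into Axon's EventBusTerminal contract and use Redis commands instead of in-memory collections; this only illustrates why the extra serialization round-trip goes away.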
The way I can ensure that only the node already processing an aggregate receives events for that aggregate is to store the association between node and aggregateId in a separate Redis structure with a TTL set to a relatively low value (comparable to the cache TTL). The TTL is important to make sure that when a node goes down, all of its associations are eventually removed, enabling other nodes to pick up the events (which will of course lead to an initial cache miss, but in the case of node failure that is tolerable).
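Sketched out, the claim logic I’m thinking of would look something like the following. In Redis this would boil down to `SET assoc:<aggregateId> <nodeId> NX EX <ttl>`; here a ConcurrentHashMap with explicit expiry timestamps stands in for it, and the class name is made up:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the node<->aggregate association with a TTL,
// so a dead node's claims expire and another node can take over.
public class AggregateAssociationRegistry {
    private static final class Claim {
        final String nodeId;
        final long expiresAtMillis;
        Claim(String nodeId, long expiresAtMillis) {
            this.nodeId = nodeId;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Claim> claims = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public AggregateAssociationRegistry(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns the node that owns the aggregate after this call: either the
    // caller (fresh or refreshed claim) or the current, still-live owner.
    public String claim(String aggregateId, String nodeId) {
        final long now = System.currentTimeMillis();
        Claim winner = claims.compute(aggregateId, (id, existing) -> {
            if (existing == null || existing.expiresAtMillis <= now || existing.nodeId.equals(nodeId)) {
                return new Claim(nodeId, now + ttlMillis); // claim, or refresh the TTL
            }
            return existing; // another live node still owns this aggregate
        });
        return winner.nodeId;
    }
}
```

The key property is the last line of the lambda: a node that loses the race simply learns who the current owner is, which is exactly what the forwarding logic later needs.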
The first aspect looks more complicated. Let’s consider two cases: the first is when the REST-based API is called to execute a command, and the second is when the message-based API is used (i.e. a message is put into the input queue). In both scenarios, let’s assume the two nodes don’t have any associations in Redis yet (i.e. a fresh start).
In the first case, the load balancer chooses a node on a round-robin basis (let’s assume that for aggregate X it sent the request to Node A). When the REST controller executes on Node A, it creates the corresponding command for aggregate X and dispatches it to the CommandBus. I could write an interceptor that creates the association between Node A and aggregate X in Redis. After the command is processed, the event is stored in the EventStore and distributed to the Redis-based queue. Both Node A and Node B have an event handler cluster, so I would need to enhance the logic of the code that distributes events to the cluster so that it skips an event whose aggregateId is not associated with that node. This way Node B simply skips the events generated by aggregate X, while Node A processes them. So far so good (unless I’m missing some important concept here).
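The per-node gate in front of the event handler cluster could look like this. Everything here is a hypothetical stand-in: `associations` plays the role of the Redis lookup, and `LocalCluster` the role of the Axon cluster the event would be forwarded to:

```java
import java.util.Map;

// Hypothetical sketch: hand an event to the local cluster only if this
// node owns the event's aggregate (or nobody does yet, in which case
// this node claims it). Other nodes silently skip the event.
public class OwnershipGate {
    public interface LocalCluster {
        void handle(String aggregateId, Object event);
    }

    private final Map<String, String> associations; // aggregateId -> owning node (stand-in for Redis)
    private final String localNodeId;
    private final LocalCluster cluster;

    public OwnershipGate(Map<String, String> associations, String localNodeId, LocalCluster cluster) {
        this.associations = associations;
        this.localNodeId = localNodeId;
        this.cluster = cluster;
    }

    public boolean onEvent(String aggregateId, Object event) {
        String owner = associations.putIfAbsent(aggregateId, localNodeId);
        if (owner == null || owner.equals(localNodeId)) {
            cluster.handle(aggregateId, event); // we own it (or just claimed it)
            return true;
        }
        return false; // skipped: another node owns this aggregate
    }
}
```

With both nodes sharing the association structure, only the first node to see an aggregate’s events will process them, which is the behavior described above.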
Now a second REST call comes in to execute a command on aggregate X. The load balancer chooses Node B (it isn’t aware of the association). The controller could check the association in Redis and forward the request to Node A (synchronously). After the command is dispatched on Node A, control returns to Node B (my assumption), which sends back the response acknowledging acceptance of the command.
Do you think this approach will work, and what implications would it have from a performance point of view compared to using the JGroups-based CommandBus? I planned to use the DisruptorCommandBus with my redirect approach to get optimal performance, but I’m not sure whether I can use it in combination with the DistributedCommandBus.
In the case of the message-based endpoints, the approach would be similar, but I would have to forward messages from the common input queue to a node-specific input queue whenever there is an association mismatch. I’m not sure, though, how to handle the situation where messages have already been put into a node-specific queue but that node goes down. I don’t see how those messages would be distributed back to the common input queue so that other nodes could eventually handle them.
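One option I can imagine (not sure it’s the right one, hence the question) is a janitor that periodically checks which nodes are still alive, e.g. via a heartbeat key with a TTL in Redis, and drains the queues of dead nodes back into the common queue. In Redis the move itself could be RPOPLPUSH from the node queue to the common queue; below, deques stand in for the Redis lists and the class name is made up:

```java
import java.util.Deque;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: reclaim messages stranded in a dead node's queue
// by moving them back to the common input queue, so any live node can
// pick them up.
public class QueueJanitor {
    private final Deque<String> commonQueue;                 // stand-in for the common Redis list
    private final Map<String, Deque<String>> nodeQueues;     // stand-in for per-node Redis lists

    public QueueJanitor(Deque<String> commonQueue, Map<String, Deque<String>> nodeQueues) {
        this.commonQueue = commonQueue;
        this.nodeQueues = nodeQueues;
    }

    // Drain every queue whose owner is no longer in the live-node set
    // (e.g. its heartbeat key in Redis has expired). Returns how many
    // messages were moved back.
    public int reclaim(Set<String> liveNodes) {
        int moved = 0;
        for (Map.Entry<String, Deque<String>> entry : nodeQueues.entrySet()) {
            if (!liveNodes.contains(entry.getKey())) {
                String message;
                while ((message = entry.getValue().pollFirst()) != null) {
                    commonQueue.addLast(message); // back to the shared queue
                    moved++;
                }
            }
        }
        return moved;
    }
}
```

Whether this races safely against a node that is merely slow rather than dead is exactly the kind of thing I’d like input on.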
I would also like to know whether there are any issues with this approach when it comes to saga processing. As far as I understand, sagas need to receive all events they are interested in, and those events may come from different aggregates that could be associated with different nodes at different points in time.
Sorry for the long post, but I think this is quite a common problem to solve when designing large-scale systems. I’m pretty sure you’ve had similar experience already, and it would be great to know how you tackled it. Any suggestions are much appreciated.