Our system has tightly clustered pairs of servers in each regional location, with looser communication between the regional clusters.
Within a clustered pair, work is allocated by partitioning the set of aggregate IDs (actually the hashcodes of their UUIDs) across the current members of the cluster, and rebalanced when the cluster membership changes. To avoid crossover during rebalances we also use distributed locks, but these are taken and released only when the membership changes, not every time a command is handled: with a few hundred commands firing per second, taking a distributed lock for every command would grind the cluster to a halt.
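To make the affinity rule concrete, here is a minimal sketch of the kind of partitioning described above: each aggregate ID maps deterministically to one member of the current cluster via the hashcode of its UUID. The names (`pickOwner`, the node labels) are illustrative assumptions, not Axon APIs, and real routing strategies may use consistent hashing rather than a plain modulo.

```java
import java.util.List;
import java.util.UUID;

public class AffinityExample {

    // Deterministically pick the owning member for an aggregate ID,
    // based on the hashcode of its UUID and the current membership.
    static String pickOwner(UUID aggregateId, List<String> members) {
        int index = Math.floorMod(aggregateId.hashCode(), members.size());
        return members.get(index);
    }

    public static void main(String[] args) {
        List<String> members = List.of("node-a", "node-b");
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        // The same ID always resolves to the same member for a given membership...
        String owner = pickOwner(id, members);
        System.out.println("owner: " + owner);

        // ...but may resolve to a different member after the membership
        // changes, which is exactly what a rebalance is.
        List<String> afterFailover = List.of("node-a");
        System.out.println("owner after failover: " + pickOwner(id, afterFailover));
    }
}
```

The point is that ownership is a pure function of (aggregate ID, membership), so no coordination is needed per command; only membership changes require the distributed locks.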
Commands can originate from any member of a cluster but are routed to whichever member currently ‘has’ the target aggregate, based on the affinity of its aggregate ID. The member that handles a command is responsible for persisting any generated events in the event store DB, which is shared by all members of the cluster. It also pushes those events to the other cluster member(s) via messaging, and they publish the events on their local event buses. So the query models and UI view models in all instances are eventually consistent.
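The event flow on the command-handling side can be sketched as below. This is an illustrative toy (simple lists standing in for the shared event store and a peer's local event bus), assuming the shapes described above rather than Axon's real interfaces:

```java
import java.util.ArrayList;
import java.util.List;

public class EventFlowExample {

    // Stand-ins for the shared event store DB and a peer's local event bus.
    static final List<String> sharedEventStore = new ArrayList<>();
    static final List<String> peerLocalBus = new ArrayList<>();

    // The owning member handles a command and generates an event.
    static void handleOnOwner(String event) {
        sharedEventStore.add(event);   // 1. persist to the shared event store
        pushToPeer(event);             // 2. push to the other cluster member(s)
    }

    // A peer receives the pushed event and publishes it on its local bus,
    // from which its query models and UI view models are updated.
    static void pushToPeer(String event) {
        peerLocalBus.add(event);
    }

    public static void main(String[] args) {
        handleOnOwner("OrderPlaced");
        System.out.println("store: " + sharedEventStore + ", peer bus: " + peerLocalBus);
    }
}
```

The shared store is the source of truth (used for replays), while the push to peers is an optimisation that makes the remote view models converge quickly instead of waiting for a catch-up.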
If one instance goes down then the other instances will automatically take on its work when they rebalance based on the cluster topology change. When the dead instance comes back, it replays from the shared event store as normal, buffering any events pushed to it by the other instance(s) in the meantime. We don’t rebalance the work across the cluster until the restarted instance is fully replayed and ready to do work.
This is quite a complex setup, and to avoid excessive inter-node communication it relies on every piece of work having affinity to exactly one node at any given time. It needs very careful synchronisation of event replays/catch-ups and affinity changes. But it achieves our goals of distributing work around the world in a scalable way, with failover, rolling upgrades and so on, while also pushing results to every UI more-or-less immediately.
The fact that this approach is even possible really says a lot about the excellent separation of responsibilities in CQRS/DDD in general and Axon in particular. Axon has proved itself to be amazingly powerful and flexible.
In summary, I guess the key pattern is to route commands consistently so that you don’t have to do lots of expensive locking or moving aggregates around (except during failover). And then publish the resulting events to the cluster so that all the query-side models are eventually (but rapidly) consistent.
Hope this helps,