Multiple command buses?

Our application sometimes generates large bursts of commands. These all execute without trouble, but any smaller, unrelated requests that arrive while the application is handling those commands end up getting queued after the bursts and can thus take abnormally long to finish. In most cases that’s not a problem but there are a few commands that we want to finish in a reasonable amount of time. It so happens that those latency-sensitive commands are all aggregate creation commands.

Ideally it’d be great if there were some notion of command priority such that I could set the bursty commands to low priority and they would have only minimal impact on latency of other work.

But that’s probably a pretty deep change with lots of edge cases to worry about, so I’m wondering if instead I can add a second command bus to the application to handle the latency-sensitive synchronous aggregate creation commands. Since they are all aggregate creation commands, there’s no chance of one of them conflicting with a command for the same aggregate on the main command bus, which seems to me like it’d be the major possible source of problems with a two-bus setup. And it also seems like the new one could be a SimpleCommandBus so as not to bottleneck on any JGroups message queues (our main bus is a DistributedCommandBus).

What gotchas am I not thinking about, if any? Is there another, better way to solve the starvation problem?

-Steve

Hi Steve,

If it’s the command handling side that slows down, using the DisruptorCommandBus as your local segment might speed things up.

The purpose of the DisruptorCommandBus is to allow separate thread pools for command handling (the invokerThreadCount, if I’m correct) and event publishing (the publisherThreadCount in the DisruptorConfiguration, again if I’m correct) on the command/write side of your application.

Maybe experimenting with that and with the number of threads will solve the issue you’re encountering.

The other idea I’m thinking of, although my hunch is it won’t be a feasible solution, is to introduce a separate instance of your application dedicated to handling the ‘burst of commands’ you’re talking about, with an increased load factor. That way, it’ll most likely receive the bulk of the burst commands, whilst your other nodes are (relatively) free to handle the create-aggregate commands.

That’s my two cents at least.

Cheers,

Steven

Hi Steve,

Let me add a few things here. We’ve seen this issue with a couple of other Axon users, primarily in cases where a system has interactive use by humans as well as non-interactive batch processing, incoming data streams, etc.

We’re developing a new product called AxonHub (previously referred to as “messaging platform”), which is a plug-and-play distribution mechanism for Axon commands, events and queries. It has a command prioritization feature for exactly this use case. We’re currently in the private beta phase with this. If you want, I could give you a demo and you could give it a try yourself; just let me know if you’re interested.

Cheers,

This ended up being easier than I expected. I’d somehow managed to forget that there is already a local segment (in our case, a SimpleCommandBus) sitting under the DistributedCommandBus, and it’s perfectly fine to dispatch commands directly to it. No need to add a brand-new command bus to the configuration. Dispatching to the local segment ended up working perfectly and the application now reliably processes the latency-sensitive requests in a reasonable amount of time even when there’s batch processing going on.

-Steve

After working on this problem some more, we built something a bit more sophisticated to solve this: a kind of scheduler for outgoing commands. In any place in the code where a bunch of commands could be sent in rapid succession (in practice: anything that is iterating over a batch of transactions) we “dispatch” the commands by inserting them into a database table and then, once we’ve run through the list, publishing a single “some new commands have been queued” event. The table and the event include a “batch ID” field that is the same across all the commands in a batch.
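To make the queuing step concrete, here is a minimal sketch of the idea, not our actual code. It assumes an in-memory list standing in for the database table, and the names `QueuedCommand`, `CommandsQueuedEvent`, `CommandQueueTable`, `enqueueBatch` and `pollNext` are all hypothetical. The caller would publish the returned event once after the whole batch has been inserted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

// Hypothetical row in the queued-commands table.
class QueuedCommand {
    final String batchId;
    final Object payload;

    QueuedCommand(String batchId, Object payload) {
        this.batchId = batchId;
        this.payload = payload;
    }
}

// Hypothetical "some new commands have been queued" event.
class CommandsQueuedEvent {
    final String batchId;

    CommandsQueuedEvent(String batchId) {
        this.batchId = batchId;
    }
}

// In-memory stand-in (an assumption) for the database table.
class CommandQueueTable {
    private final List<QueuedCommand> rows = new ArrayList<>();

    // Stores every command under one fresh batch ID and returns the single
    // event that the caller should publish for the whole batch.
    CommandsQueuedEvent enqueueBatch(List<?> commands) {
        String batchId = UUID.randomUUID().toString();
        for (Object command : commands) {
            rows.add(new QueuedCommand(batchId, command));
        }
        return new CommandsQueuedEvent(batchId);
    }

    // Removes and returns the oldest queued command of the batch,
    // or null when the batch has nothing left.
    QueuedCommand pollNext(String batchId) {
        Iterator<QueuedCommand> it = rows.iterator();
        while (it.hasNext()) {
            QueuedCommand row = it.next();
            if (row.batchId.equals(batchId)) {
                it.remove();
                return row;
            }
        }
        return null;
    }
}
```

The point of the shape is that the sender does one cheap insert per command and one event per batch, so nothing bursty ever reaches the command bus directly.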

A singleton saga listens for those events. It maintains a circular list of active batch IDs and a list of currently outstanding commands. Each time a new commands-queued event arrives, it adds the batch ID to its active list. If the list of outstanding commands is smaller than the concurrency limit, it polls the database table for the next command for that batch. As commands finish, it advances to the next batch in the list and polls for that batch’s next command. In short, it does simple round-robin scheduling by batch ID.
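The scheduling logic can be sketched roughly as follows. This is a plain-Java model rather than the saga itself: the class `RoundRobinBatchScheduler` and its methods are hypothetical, an in-memory map stands in for the database table, and commands are plain strings. One simplification to be aware of: this sketch keeps dispatching until the concurrency limit is reached whenever a slot is free, which may differ in detail from how the real saga polls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of round-robin scheduling by batch ID.
class RoundRobinBatchScheduler {
    // Stand-in for the database table: batch ID -> commands still waiting.
    private final Map<String, Deque<String>> queuedByBatch = new HashMap<>();
    // Circular list of batch IDs that still have queued commands.
    private final Deque<String> activeBatches = new ArrayDeque<>();
    // Commands handed out so far, in dispatch order (kept for inspection).
    private final List<String> dispatched = new ArrayList<>();
    private final int concurrencyLimit;
    private int outstanding = 0;

    RoundRobinBatchScheduler(int concurrencyLimit) {
        this.concurrencyLimit = concurrencyLimit;
    }

    // Models the arrival of a commands-queued event for one batch.
    void onCommandsQueued(String batchId, List<String> commands) {
        if (commands.isEmpty()) {
            return;
        }
        queuedByBatch.computeIfAbsent(batchId, id -> new ArrayDeque<>()).addAll(commands);
        if (!activeBatches.contains(batchId)) {
            activeBatches.addLast(batchId);
        }
        fillToLimit();
    }

    // Models an event indicating that one outstanding command has finished.
    void onCommandFinished() {
        if (outstanding > 0) {
            outstanding--;
        }
        fillToLimit();
    }

    // Takes one command from the batch at the head of the circular list,
    // then moves that batch to the tail if it still has work queued.
    private void fillToLimit() {
        while (outstanding < concurrencyLimit && !activeBatches.isEmpty()) {
            String batchId = activeBatches.pollFirst();
            Deque<String> queue = queuedByBatch.get(batchId);
            dispatched.add(queue.pollFirst());
            outstanding++;
            if (queue.isEmpty()) {
                queuedByBatch.remove(batchId);
            } else {
                activeBatches.addLast(batchId);
            }
        }
    }

    List<String> dispatched() {
        return dispatched;
    }
}
```

With a limit of 1, queuing batch A (A1 to A4) and then batch B (B1, B2) and finishing commands one at a time dispatches A1, A2, B1, A3, B2, A4 in this model: the small batch is interleaved with the large one instead of waiting behind it.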

This depends on our commands and events having a certain structure. All our transaction-related commands and events implement an interface that includes a getter for the transaction ID, something like

```java
interface HasTransactionId {
    public TransactionId getTransactionId();
}
```

When we dispatch a command for a transaction, we’re able to call associateWith() to listen for events from the command’s transaction. Every command handler in our aggregate code is required to publish at least one event. The saga has an event handler like

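```java
@SagaEventHandler(associationProperty = "transactionId")
public void on(HasTransactionId event) {
    // An event arrived for this transaction, so stop listening for it.
    removeAssociationWith("transactionId", event.getTransactionId().toString());
    // One fewer command in flight; use the freed slot for the next queued command.
    numberOfOutstandingCommands--;
    sendNextCommand();
}
```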

This isn’t completely airtight; we don’t know for certain that the event was a result of the specific command we dispatched. But in practice it doesn’t matter because our goal is just to limit the number of concurrent commands.

This setup has completely eliminated the problem of command bus starvation for us. Now a client with a small request gets nearly identical responsiveness from our API whether or not there are big requests in progress. The big requests do suffer a slight throughput hit due to the extra indirection, but the command queuing ends up being a very small percentage of the total work we need to do, so the difference is negligible.

It’s worth noting we could have done this with something other than a saga. You could achieve the same effect with standalone command and event handlers and database locking. But the saga gave us already-battle-tested concurrency control and made the implementation very straightforward and compact.

-Steve