AsynchronousCluster

Hey!

I’m using the DisruptorCommandBus and need the following semantics:

  1. All events must be written to the EventStore before commencing to deliver it to any EventHandlers.
  2. Delivery of Events to EventHandlers must not happen in the threads doing invoking, nor in threads doing the persisting in the EventStore (so I think I want the AsynchronousCluster).
  3. Events for the same instance of an AR must be processed in the sequence they were written to in the EventStore, but different AR instances can be processed in parallel (SequentialPerAggregatePolicy).

I haven’t fully thought out how I want to handle EventHandlers having errors yet.

I’m wiring everything together with Spring, but I don’t think I fully understand how to create an AsynchronousCluster with the values I want (is it really just with constructor-args?), and once I have an asyncCluster bean, I’m not sure how it relates to axon:cluster/ and axon:event-bus/ nor the axon:disruptor-command-bus’ publisher-threads.

Is there a magic ingredient I’m missing? :slight_smile:

JAmes

Hi James,

the first item (persistance before publication) is guaranteed by the DisruptorCommandBus. There is nothing you need to do for that.

The other two items can be achieved by using the AsynchronousCluster.
In Axon+Spring, you can define clusters anywhere in your application context. Each cluster will define criteria that beans much match to be assigned to that specific cluster. You can also define a cluster as the default, causing beans to be assigned to it if there is no match with any of the criteria.

Inside the axon:cluster element, you can specify a element, which defines the actual cluster implementation you want. For now, there is no explicit support for async clusters. I have postponed that for a little while (Axon 2.1, probably). Hope that clarifies the relationship between the and elements.

Now about the publisher threads in the disruptor commandbus. This is the number of threads that will be working on the storage and publication of events. Axon guarantees that for a single aggregate, all events are published by the same thread, in the actual order they were created. These threads also publish the events. But since you’re going to use an Async Cluster, publishing just means putting the events on the queue of that cluster. The actual processing of the event is done by another thread, owned by the cluster.

The order of processing in the async cluster is done using a SequencingPolicy. It returns a value for each event. If the value is identical (equals) for two events, they are processed by the same thread, to guarantee sequential processing for them. If they are different, they may be processed by different threads.

Hope that helps.
Cheers,

Allard

Hi Allard,

Thanks to your description, I’ve got it!

I’ve done a Java-style config for the actual cluster:

@Bean(name = “xxxAxonCluster”)
public Cluster createAxonCluster() {
// tune these once things are functional
ThreadPoolExecutor clusterExecutor = new ThreadPoolExecutor(2, 10, 60L, TimeUnit.MINUTES,
new SynchronousQueue(true), new NamedThreadFactory(“EventPublishing”, true));

return new AsynchronousCluster(“xxxAxonCluster”, clusterExecutor, new NoTransactionManager(),
new SequentialPerAggregatePolicy(), 1, RetryPolicy.RETRY_LAST_EVENT, 500);
}

Then in the Spring XML:

<axon:cluster id=“defaultAxonCluster” default=“true”>

</axon:cluster>

My project has a mix of XML/@Bean style configuration, so this lets me take advantage of both. I couldn’t figure out how to just do a straight bean reference inside the axon:cluster/ stanza.

Am I correct in my understanding that if I want to ensure across JVM restarts (power failures, etc…) that all events stored in an EventStore have been processed by all the appropriate EventHandlers that I’ll need to write some sort of persistent high water mark code myself?

JAmes

Hi James,

it looks like the Java/XML is simpler than what you would need in the xml configuration. Future releases will have xml shortcuts for async clusters as well.

If you need power-down survival, consider using a messaging solution like amqp. It's a good option for async processing. Regular power downs normally wait for the executor service to shut down properly, meaning the queue is processed before the jvm shuts down. In case of power failure, well.... The obvious will happen. I've been thinking about a persistent solution for quite a while, but using something like RabbitMQ is just a lot simpler.

Cheers,

Allard