Inconsistent read side caused by out of order events

We encountered a problem that will cause the read store to be inconsistent with the write store because of out of order events. Following described the problem and some options we thought to address it. But non of them seems good enough because they’ll add quite a lot of conflict resolution logic to the application code. Looking for better solutions.

The problem
We have 2 domains. Each domain is an axon application.
Domain1 has an event sourcing aggregate root that’ll change its status based on user requests. Those status change events are stored in eventstore and published to eventbus (JMS based).
Domain2 listens to the JMS event bus for those status change events by using EventListener. the EventListener will save the status to its read JPA based store (to be used by other logic).

The problem happens when, in Domain1, the status changes from A->B and then from B->C in very short time, the 2 events for those 2 changes go to the JMS. But when Axon+spring load the messages from the JMS and trigger the event listener, we found the events could be executed out of order. (The status change event only contains the changed value.) if B->C runs before A->B, the read side will end with status = B, which is inconsistent with write side (status = C).

It looks the event could be out of order because

  1. domain2 has a multiple machine cluster that listens on the same ActiveMQ queue and they runs in paralell
  2. JmsListenerContainerFactory could be configured to run multiple thread.
    BTW, we use SimpleCluster, which should not causes concurrent problem itself.

Some options.

  1. High water mark sequence number
    Add logic in event listener. when process an event, comparing the event’s sequence number(or timestamp) with the last sequence number in the read store, if event’s sequence number is bigger, update the status read store with the sequence number; otherwise discard the event.
  2. Replay events to build latest read side
    On receive a event, load all the events for the corresponding aggregate from the event store and replay it. then store the latest status in read store
  3. Include From/To status in event + retry
    Add both the old status and new status. In the event listener, comparing the old status with the status in the store. if match, updated; if not, throw exception to cause the event to be retried later. And hope the earlier event could be processed during the wait time.

The problem could get more complicated if the read store is a denormalized view of events from multiple domains.

Looking forward for your thoughts.

Thanks
Rick

Hi Rick,

given that you use ActiveMQ have you considered to use message groups to preserve the order of messages?
http://activemq.apache.org/message-groups.html

I used this feature to implement load balancing in multi-node axon based applications as described here:
https://bitbucket.org/aktivecortex/aktivecortex/wiki/features.md

You can find an implementation here: https://goo.gl/YKGiA4

Cheers,
Domenico

Thank you, Domenico.

It works! The only problem is that this seems to be supported by only ActiveMQ. RabbitMQ and other messaging infrastructure seems don’t support this.

Hi Rick,

Glad to know it worked for you.

You are right: message ordering has always been hazy in JMS.
Specs seem to include the feature but don’t clearly state what should be the expected behavior:

"JMSXGroupID and JMSXGroupSeq are standard properties that clients should use if they want to group messages. All providers must support them. Unless specifically noted, the values and semantics of the JMSX properties are undefined."
javax.jms.Message Interface - https://docs.oracle.com/javaee/6/api/javax/jms/Message.html

So each provider interpreted and implemented this in its own way.

At the time I had to deal with this issue I did some research and I’ve found that, in spite of the vagueness, it’s supported by many JMS providers.

I recap below what I found:

Besides JMS I have no experience with other protocols in this regard.

Cheers,
Domenico

Domenico, thanks for this valuable information. Just wondering, would it be possible (and useful) to state that JMSXGroupID is always the aggregate identifier and JMSXGroupSeq is the sequence number? Or is that too much abuse?

Rick, in AMQP there is another solution. Basically, all messages sent over the same channel are guaranteed to be published to the queue in that order. To make sure messages sent on different channels are sent in-order, AMQP supports transactions. Rabbit also allows for server-side ACKs, which are a cheaper alternative to transactions (but don’t guarantee atomicity). Both transactions and server-side acks are supported in the Axon AMQP connector.

Cheers,

Allard

(Note that with AMQP, I mean version 0.9. )

Hi Allard,

My pleasure.
I used the following approaches to assign a routing key to the outgoing messages according to the payload type:

  • Command: Aggregate Identifier

  • Event: Competing Listener Name

Please consider that, beside to ordered delivery, you could be interested in having exclusive consumers (quite often part of message ordering features).
Depending on the JMS Provider you can map the routing key to the provider specific property (which in most cases is JMSXGroupID)
JMSXGroupSeq shouldn’t be needed.

Cheers,
Domenico