Issue 185 - EventNotification Policy for a distributed Event Bus

Hi all,

I'm struggling to find the best way to deal with event dispatching
problems that arise in a distributed system in particular
circumstances.
Consider the following generic use case UC1:

  UC1-1. User submit command C1: CreateUser(name="Bob",
email="bob@foo.com")
  UC1-2. C1 is dispatched to the pertaining handler
  UC1-3. Command handling logic raise event E1: UserCreated(name="Bob",
email="bob@foo.com")
  UC1-4. E1 is dispatched
  UC1-5 Event handling logic submit a new command C2:
NotifyUser(email="bob@foo.com", ...)

This use case, developed in a single instance scenario, is depicted in
the following picture: http://bit.ly/pFKNpI

Now suppose to have a distributed scenario, with 2 instances, named A
and B for semplicity,
In this scenario the Command Bus is backed by a load balancing
Distributed Queue and the Event Bus is backed by a Distributed Topic.
Now the same use case, renamed UC2, become:

  UC2-1...
  UC2-2 C1 is dispatched to the pertaining handler of instance A (the
instance is determined by the load balancing policy)
  UC2-3...
  UC2-4a E1 is dispatched to instance A
  UC2-4b E1 is dispatched to instance B
  UC2-5a Event handling logic submit a new command C2:
NotifyUser(email="bob@foo.com", ...)
  UC2-5b Event handling logic submit a new command C3:
NotifyUser(email="bob@foo.com", ...)

The following picture depicts this scenario: http://bit.ly/oRAblO

Clearly the command C3, submitted by step 5b, is an undesidered
duplicate.

By the way, there are a lot of other cases when such duplication
doesn't necessarely drive to negative effects but on the contrary is
higly desiderable or even necessary.
Suppose to change step 5 of the previous use case, now renamed UC3, in
this way:

  UC3-5 Update client UI.

Now, in the same distributed scenario of the previous case, each node
can perform the same task and update data displayed on the screen of
local connected clients.

Another use case (UC4) may be the one that change again step 5 in this
way:

  UC4-5 Submit new command C4: ActivateUser(name="Bob", ...)

Again step 5, issued in the same distributed scenario, would result
in a duplicate command, but compared to step UC2-5b, this doesn't lead
to a negative side effect.

To put in a nutshell, in a distributed system, an event dispatched to
an instance cannot be notified to each listener without prior
considering if its exection can lead to an undesidered side effect.
Such undesidered effect cannot occur if the listener business logic is
idempotent (UC4) or locally applied (UC3) , in all the other cases the
event must be ignored if the event itself is not locally raised.

In my opinion, one way to address this constraint could be:

  1. Extend EventHandler method annotation to accept arguments that
describe the idempotence (Yes, No) and, optionally, the scope of the
eventual side effects (Distributed, Local) of the annotated method.
  2. Enrich event metadata with hostname or ipaddress of the originator
instance before publishing
  3. Extend AnnotationEventListenerAdapter to verify whether the event
handled is local or, if remote originated, that target method is
idempotent, before proceeding to method invocation.

What do you think about?

Hi Domenico,

this is a very nice scenario. In fact, I have had discussions a few weeks ago about this exact scenario with some colleagues on another project. They have the problem where some event listeners should process all events, regardless of the machine they are on, while other events should be handled by the handler on either one of the machines.

My first solution is to appy exactly your strategy, by only allowing “local” events to be handled by non-idempotent handlers. However, when a system crashes, you could have events that aren’t handled at all for quite a while.

Then, I decided to look more into the actual problem: some listeners compete with eachother for events, while other should share events. That’s when I decided Axon needed the concept of a “cluster of listeners”. It’s logged in issue #190. It adds two concepts: clusters and a terminal (see why on wikipedia: Axon Terminal). The Terminal is responsible for dispatching Events from the sender to the clusters. Typicall, you would use JMS or AMQP to dispatch events between different machines. A cluster is a group of event listeners that shares certain aspects. Competing clusters (with the side-effect handlers) will compete for messages on a single queue. Non competing clusters will all receive their own copy of each event. Besides the distribution aspect, the clusters allow simpler implementation of, for example, asynchronous handling, transaction management, etc.

The basics have been implemented, and documentation is on its way. I haven’t implemented any JMS or AMQP based terminals yet. Work on those will start a bit later on. You can check it out in the 1.2 snapshot version.

Cheers,

Allard

Hi Domenico,

I’am one of the colleagues :slight_smile: I was busy drawing a nice diagram to showing our problem and solution and answer your email. But, as always, Allard is to quick in answering emails of the mailing list.

You can see it here: In our setup we use one module that handles all commands and sends out events to other modules. The next step will be creating a Distributed Command bus to allow commands to be send to multiple nodes and still guarantee the order of (some) of the commands. Regards, Martin On 09/16/2011 08:50 PM, Allard Buijze wrote:

Hi Domenico,

Another colleague here :), nice description of your problem and an interesting story.

I share your pain, as architect of the solution Martin showed, the easy part was dealing with clusters and load-balancing. The actual challenge is in reliably handling competing consumers or multiple publishers of a command.

One way to deal with the duplicate command for now, might be to add a some sort of correlation id to the command from the retrieved event. Then you would need some sort of tracking in the command handler (or preferably in the consumer) to filter out duplicates.

Good news is that I am starting on the distributed command bus logic, so fingers crossed it should be in the 1.2 release.

Let us know if we can help some more.

Best Regards

Jason

Just some summary info on RabbitMQ just in case you have not seen for the implementation abstraction. Very interesting on how they use ERLANG networking and distribution model natively. I also think spring can help abstract the implementations with SPRING-AMQP, which is where I think you are already going.

Also, could something like this be leverage for Single JVM capability and then blended by config to multi-JVM?

Any thoughts?

http://www.rabbitmq.com/faq.html#shared-session-clustering

What platforms will RabbitMQ run on?

RabbitMQ is developed on Debian Linux and Mac OS X. The broker component is written in Erlang and therefore mostly platform-neutral. See the supported platforms for details.

What is shared session clustering?

For those familiar with typical tiered session/application architectures such as JavaEE, it might help to think of the AMQP exchanges as corresponding to a logical session tier, and AMQP queues as corresponding to a logical application tier. From this point of view, RabbitMQ’s routing tables can be seen as clustered shared session state. RabbitMQ uses the OTP distributed database, Mnesia, to reliably and persistently replicate session state across all nodes in a cluster.

How does clustering work?

RabbitMQ’s current clustering mechanisms are built around Erlang’s native support for reliable distributed programming. This is a deeply sophisticated framework, implemented at the language and virtual-machine level.

Some of the unique points about Erlang’s networking and distribution model are:

  • it scales to hundreds of thousands of parallel processes (“green threads”) within a single virtual machine, bounded only by available memory;
  • the failure model for local inter-process communication (IPC) is the same as that for distributed IPC, making the transition from non-distributed to distributed code very smooth;
  • binary pattern-matching constructs within the language ensure straightforward and efficient translation between wire-level encodings and internal Erlang data structures;
  • finally, the OTP libraries shipped with the Erlang distribution include the notion of a supervisor/worker relationship, where supervisor processes monitor and restart worker processes under their control, making management of the entire process hierarchy within a server deterministic and automatic.

Within an Erlang node cluster, Erlang’s native high-speed messaging is used to provide an efficient way of distributing work across the cluster. Individual AMQP clients connect to machines within the cluster, and Erlang’s distributed routing database routes AMQP messages to the appropriate endpoints.

For more detail on Erlang’s benefits in an AMQP setting, please see this page.

+1 for that solution. That’s the strategy I’ve been using. All my commands have a command identifier which is looked up in registry to make sure that command hadn’t already been handled. Duplicate commands are thus discarded.

Thank you all for the quick and kind responses

@Allard
I’ve checked out trunk from repository and looked into the concepts of Clusters and Terminals that you point to me.
From a design standpoint that make a lot of sense and open the door to a broader range of possibilities. I totally agree.
Even more important, i think, you applied a DDD tenet “Make implicit concepts explicit” and that make things clearer. Bravo!
But on the implementation point of view there are still some aspects that looks blurry to me:

  1. Listener Subscription to EventBus (http://bit.ly/nib8oQ)
    How a cluster selector can determine the proper cluster for each listener?
    I’ve seen that ClusteringEventBus support ClusterSelector injection through a constructor argument.
    So anyone can provide an implementation that contains the proper logic to choose a cluster for each AnnotationEventListenerAdapter target.
    Did you have in mind a thing like this?

if (target.name like ‘foo’ or ‘boo’) {
return MyNonCompetingListenersCluster()
} else {
return MyCompetingListenersCluster()
}

Does ClusterMetaData maybe participate in this task?

  1. Event Publishing (http://bit.ly/nnB4tF)
    I am aware of the role of EventBusTerminal as an adapter to the underlying messaging solution.
    Correct me if i’m wrong, but the key point here is to link each Cluster of competing EventListeners with a different queue.
    How a single Terminal can cope with the different queues specific of each cluster without being overloaded of responsibilities?
    And how a terminal can match the proper channel for each cluster? Again Is ClusterMetaData involved?

Just another question, the latest I swear :slight_smile: What is the expected release date for version 1.2 ?

@Martin
Your nice diagram help me to understand better the concepts explained by Allard. Thank you.

I’ve done a simple CommandBus that wrap a Spring Integration SubscribableChannel and delegate the rest to SimpleCommandBus.
It’s largely inspired by SpringIntegrationEventBus. You can find it here: http://bit.ly/o8UnM5

This bus doesn’t handle message ordering because it’s something specific of underlying messaging solution.
In JMS there are standard properties JMSXGroupID and JMSXGroupSeq (take a closer look 'cause they are tricky)
and each implementation has extended features like BEA Unit-of-Order or ActiveMQ Total Ordering.
But you seem more oriented on AMQP that I don’t know…
Good Luck and let me know

@Jason
Like you I’ve been thinking to a command store (something like this: http://www.eaipatterns.com/MessageStore.html) where the commandbus try to write before dispatching to the handler.
But the problem is more on the event side than on the command side.
Second, filtering duplicate seems tricky cause, I think, you can legitimately reissue an identical command.
But you can’t reissue the same command as a reaction to the same event. So, as you stated, you need to associate the originating event if available.
But how to check on command equality? Since command is a value object you need to implement carefully hashcode, including the eventid, and all the meaningful properties.
I don’t know wich solution is better. At first sight Clusters seem a cleaner and formally correct way to deal with the problem.

Best regards

Domenico

Hi Domenico,

the cluster selection logic is as you described. But in some cases you can preconstruct clusters and return the most suitable instance. You can use meta data to (for example) set the queue or cluster name if you use any.

The EventBusTerminal is the only instance that needs to know about the actual messaging implementation used. For the AMQP implementation, all it needs to do is set up a consumer for each cluster which forwards each incoming message to its cluster. That’s 1 responsibility.

I hope to find the time to release 1.2 during this week. There is still some documentation left to do.

Cheers,

Allard

Hi Allard,

Apache Kafka supports this brilliantly. I also like their naming schema. They talk about consumers and consumer groups. A consumer group negotiates through zookeeper which partition (a label added to each message) goes to which consumer in the group.