Issue 49: Add support for guaranteed event processing in asynchronous event handler

Is there any progress on this issue?

We are currently processing events asynchronously and have experienced
failures in testing and have had to clear out the database and start
again. What are others doing for their production systems when a
domain event is dropped?

Given that domain events are often processed asynchronously by the
query models, are there any solutions available regarding rebuilding
the query model from the domain event stream after the occurance of a
failure? The scenario I see occuring is that the system runs for a
while, then a domain event fails, we make a fix redeploy and the
quiery model is out of date.

The issue mentions using a connector to MQ products. We are currently
using ActiveMQ and are considering creating and implementation of the
EventBus backed by a topic or maybe a collection of queues. Any advice
on the granularity of the queues/connections to the topic? One per
event handler seem over the top.

Hi,

do you have any idea where the missing events come from? Is due to a restart?

You are right about the MQ as being a secure replacement for asynchronous event handling. In JMS, you’d use a Topic, and in AMQP a queue per listener (group). I don’t think a connection per event handler is a bad thing. It (of course) also depends on the number of event listeners you have. Alternatively, you could create a group of event handlers that share a single connection. When an event comes in, all handlers in the group are notified sequentially, after which the message is ACKed.

I’ll start thinking about a way to make this easier…

Cheers,

Allard

Its not a restart issue but I can see how that could be problem if the
handler stop before the command. I intend to put events on the topic
as part of the transaction. Mainly coding errors found in our early
testing though I think the issue there is alway a chance of something
bad slipping into production.

The eventhandler group sounds like a good idea. We came to similar
conclusion ourselves and were considering putting in a custom
@EventHandlerGroup annotation with some type of factory to create an
id or even just using a logical name of the querymodel to start with.
I am a little worried about the impact on Saga and other aggregates
listening to the event stream but I guess they are just other
eventlisteners in the end.

Hi,

I am a little worried about the impact on Saga and other aggregates
listening to the event stream but I guess they are just other
eventlisteners in the end.

You’re right. Saga’s are event listeners like any other. Well, almost, the saga manager is the event listener, and it delegates events to the associated sagas. So I don’t think you’d need to worry about them in this case.

You’re approach to the @EventHandlerGroup sounds good. Perhaps we could add a “name” parameter in which you can configure the group name. There would have to be a group manager, which reads from a source (e.g. queue) and passes them sequentially to the listeners in the group. This is just thinking out loud. If you have any other idea, I’m eager to know about it.

Cheers,

Allard

Hi,

In the "Meta CQRS" project I'm currently working on I created an admin
tool that re-creates the queries by simply replaying the events. A
nice Blog article on this aspect can be found here:
http://abdullin.com/journal/2011/1/19/scalable-and-simple-cqrs-views-in-the-cloud.html

The second thing is that I use domain events only on the command
server. A listener translates the events into JMS messages and the JMS
transaction should be the same as used for storing the event in the
event store. This way you can be sure that no message will be fired if
the database transaction fails that stores the event.

Another idea by Greg Young is to "poll" the event store and send
messages (events) only if new entries are found. I'm not sure if this
can be done with the Axon Framework.

Cheers,
Michael

Hi,

automated regeneration of query models (triggered by JMX, for example) has been on my mind more than once. But since it occurs quite rarely (as the Rinat also states in his blog), I didn’t give it priority yet. Michael, if you have something, perhaps we could include it in the Axon core?

Polling for events is another strategy that one could use. Epecially if the “delay” of a query model is not really important, it could be a good approach. Currently, none of the event stores in Axon supports this, but it is easily added when required.

Cheers,

Allard

I think regenerating query model will help in alot when adding new query model from existing events over the lifetime of the system. Is this seperate to a MQ backed event bus though? I have thought that it might be nice that if you add a new query model and it registers with the event bus it could recieve all historical events.

As for the polling of domian events. Why not just publish to the queue/topic when the db transaction commit. Spring has support for this (TransactionAwareConnectionFactory), though i guess it still relies on the connection to the mq being available. Maybe an XA transaction across the db and mq could work but it seems a little heavy weight.

Given there is still the potential for losing a domain event rebuilding the query model could still be a solution in the event of a failure. One thing that still worries me is domain aggregates that listen to the event from other domain aggregates and use these event to contribute to thier internal state. This is a pattern I have read about on the DDD/CQRS user group though have not used as yet? If a domain event from aggregate1 is committed to the db and then cannot be put onto the queue/topic then it wont be delivered to aggregate2. Any ideas on this?

Hi,

Actually I kept rebuilding the View (Query) side simple and just use
"EventStoreManagement" that directly calls the Event Listeners that
build the View (Query) side. No event or message bus, just plain
calling the listeners. Only problem you need to solve is how to
collect the event listeners and wire them correctly together with DAOs
etc.

I'll post some example code soon.

Regards,
Michael

See my response inline…

I think regenerating query model will help in alot when adding new query model from existing events over the lifetime of the system. Is this seperate to a MQ backed event bus though? I have thought that it might be nice that if you add a new query model and it registers with the event bus it could recieve all historical events.

When adding new query models, you should feed them with initial data. This data can either come from another query model, or from the events. Using events is probably easier. The most straightforward way is to create a small component that reads all events from the Event Store (see EventStoreManagement interface) and passes them directly to the Event Listener component. The component Michael is talking about will allow you to automate this, removing the need for a custom component.

As for the polling of domian events. Why not just publish to the queue/topic when the db transaction commit. Spring has support for this (TransactionAwareConnectionFactory), though i guess it still relies on the connection to the mq being available. Maybe an XA transaction across the db and mq could work but it seems a little heavy weight.

Polling is just an alternative to using MQ for publishing. It really depends on scenario which one suits you best. he advantage of polling is that you don’t need any special transaction management (2 phase commit, e.g.). It is also very crash-resilient. It will simply go on where it left off after a crash, assuming it is able to find out which event was last handled.

Given there is still the potential for losing a domain event rebuilding the query model could still be a solution in the event of a failure. One thing that still worries me is domain aggregates that listen to the event from other domain aggregates and use these event to contribute to thier internal state. This is a pattern I have read about on the DDD/CQRS user group though have not used as yet? If a domain event from aggregate1 is committed to the db and then cannot be put onto the queue/topic then it wont be delivered to aggregate2. Any ideas on this?

Aggregates never react to event from other aggregates. That is, not directly. If there is a relationship between aggregates, you’d use either a Saga or a simple Event Listener. Its job is to translate one aggregate’s event into another aggregate’s command. The @EventHandler methods inside an aggregate are used exclusively by the “apply(DomainEvent)” method inside that same aggregate. Do note that an aggregate may consist of more than a single entity. Besides the aggregate root, you could have annotated entities. In such case, each entity within an aggregate receives all of the events applied in any of the entities in the aggregate.

Hi all,

I added some example code from the Meta CQRS project that shows how to
recreate views:

http://code.google.com/p/axon-auction-example/source/browse/www/tmp/

It's not a complete example, just some classes to give you an idea of
how to do something like that.

Hope it helps.

Cheers,
Michael

I agree that aggregate should not react to event or other aggregates directly but by using an event handler/saga and dispatching a command to modify an aggregate you still need to be guarenteed the the event handler will handle the event. I guess the polling solution is the simplest. I suspect a combintation of an eventlistener and polling would be good solution. Send out the event immediately if possible, if that fails poll for what was missed when the queue connection is available again.

Hi,

This is an interesting discussion, but I have some
difficulty in understanding the need for polling
in combination with the transaction.

It seems to me the best (and perhaps the only) place to
call the Begin/Commit or Begin/Roolback transaction
routines is from within the handler of the original
incoming command, no matter how many other
internally-generated commands are involved.

If anything goes wrong, the whole enchilada, including
the database of the message agent/broker are rolled-back
and remain in good shape.

I understand that polling can be an alternative for
having a heavy-weight distributed transaction.

I also understand that sagas are a different issue.
If something goes wrong with the work-flow of a saga
a few days down the line (for example) some human
involvement becomes necessary -- but the DBs remain
consistent.

Perhaps I'm missing something specific to the
knitty-gritty details of CQRS or Axon, so any
clarifications would be appreciated.

/ES

Hi Esfand,

for me, polling is rather an alternative to a transactional message queue. Once a message is on a queue, it is guaranteed to be delivered, at some point in time. So an Event Listener that connects with a queue, will receive whatever it hasn’t processed yet.

Instead of using XA transactions, you could use normal transactions, committing the Event Store first, and other resources (Event Bus) later. On system start, the event listeners could poll for unprocessed events once to make sure any events that might not have been published correctly are also processed. Shea, I suppose this is the mechanism you were referring to?

Cheers,

Allard

Yes that is it Allard. Any ideas on how the event publisher could keep track of where it is up to? maybe the id of the domain event table.

That’s the difficult/interesting part. I’ll have to rest it for a little while. I’m sure it can be done, somehow. Either by keeping track of ID’s or by asking for the events starting at x seconds prior to the last received event. You’d have duplicates, but given the lack of guarantees about event ordering, it might be safer.

Any ideas are welcome.

Cheers,

Allard

Hi Allard,

Please read in-line:

for me, polling is rather an alternative to a
transactional message queue.

Righ on! I assume by "Transactional Message Queue" you mean
a message broker which can not participate in a distributed
transaction (XA Transaction), or it does provide distributed
transaction capability but we don't want to use it. I think
it MUST provid local transaction (non-XA transaction) one way
or another, otherwise it may end-up with inconsistencies in
its own datastore. I mean during the course of processing
one incoming command, the message agent/broker i.e. RabbitMQ
or ActiveMQ) may need to write many messages to its datastore
and commit or rollback them as one atomic operation.
Do we agree on this?

Once a message is on a queue, it is guaranteed to be delivered,
at some point in time. So an Event Listener that connects
with a queue, will receive whatever it hasn't processed yet.

Exactly!

Instead of using XA transactions, you could use normal
transactions, committing the Event Store first, and other
resources (Event Bus) later.

Correct, but let's dig a bit deeper. In general it is not
possible to avoid using an XA transaction,
but there is an exception to this rule which makes it possible
in certain circumstances. When there are N local transaction
managers are involved in a top-level transaction and all of
them are XA-capable except one, we can perform the commit/rollback of
the one which doesn't support distributed transaction (XA-transaction)
as the last and commit or rollback depending on the result of
the distributed transaction. In turn, the special case to this is
when only two local transactional systems are involved. In this
case obviously no XA-transaction is needed at all. I assume this is
the case you are referring to. This means that both the command-bus
and the event-bus are managed by one and the same messaging
agent/broker such as RabbitMQ, and the event-store and every other
involved databases/datastores are managed by one and the same DBMS,
such as a MySQL server instance. In this case as you mentioned, no
XA-transaction is necessary.

On system start, the event listeners could poll for unprocessed
events once to make sure any events that might not have been
published correctly are also processed.

So far, I was %100 with you, but I need your help to understand this
part. Assuming the above scheme:
How can an event not be published correctly? (void of having a bug.)
Do you mean the event-listener should poll the messaging broker/agent
for the existence of an event in the queue? If so, how does it helps
with respect to an event that is not published correctly.

I understand polling in this context as an alternative to a
blocking call to the MQ Agent (as opposed to having a transaction).
But I think one way or the other (polling or blocking call) upon
existence of an event, a transaction is needed to remove the
event from the queue -- assuming that the event-listening system is
designed as a transactional system.

So, please help me understand this part. I confess I haven't read
the referenced article in the thread regarding polling for events,
but I will the first chance I get.

/ES

Any ideas on how the event publisher could keep track
of where it is up to? maybe the id of the domain event table.

OK. Reading the Shea's post replying to the same Allard's post I was
replying answered some of my questions.

Therefore the goal is to catch missing or duplicate events.

In this respect:
*) The missing duplicate can happen at any time, not just
   at the event-listening system's start-up time.
*) The Messaging agent can be queried using polling or
   blocking calls alike as far as catching problems are
   concerned.

Still my questions remain:
*) Following the described scheme, how can we end-up with
   a missing or a duplicate event, short of having bugs?
*) Is it really necessary (or even possible) to implement
   a feature for catching the presence of bugs? A bug
   may cause corrupted fields etc. over and beyond missing
   or duplicate events.

/ES

The issue I see is not that you need to handle cases for all bugs more that every message handler should be guaranteed delivery of a message based on its desired quality of service ie once only. If there is a coding error handling the domain event then it has to be fixed somehow to catch up to the event steam. For the query model you can fix the code and rebuild it somehow. For other event handlers that take an event and convert it into a command that then update a secondary aggregate if the processing of the event fails the second aggregate with remain out of step with the other query models and aggregates with the system. This may not be a problem in every case but when you introduce async processing of event with different transactional boundaries the chances of this happening increase. Persistent messages in jms give you the once only delivery guarantee but getting the domain events onto the queue/topic in a fail safe way is the problem. Also right now axon does not have an event bus that provides the guaranteed delivery.

I got it now! It is indeed a difficult problem to solve
especially with a non-transactional event-bus.

I was under the (false) impression that the event-bus
and the command-bus can optionally be queues/topics
managed by an MQ broker/agent such as ActiveMQ.

Also, I had assumed that we can, at least manually, take
care of the transaction boundaries by issuing Begin/End
transaction calls from within the command handler of the
(externally) incoming command. And therefore establish
a top-level transaction -- encompassing all the affected
datastores during the lifetime of that high-level
transaction.

Thanks a lot for your clear explanations.

Cheers, /ES

Well, don’t give up your impressions just yet. Shea is right about the asynchronous event handler (the one created by annotating the handler with @Asynchronous) doesn’t guarantee processing. If the machine shuts down while events are in a queue, those messages are discarded. I don’t see any other way how events can get “lost” (other than bugs and non-transient exceptions). If a database connection fails, for example, an automatic retry mechanism will buffer events and retry each x seconds.

Instead of using the SimpleEventBus, simply build a JMSEventBus or AMQPEventBus (which a colleague of mine had done, and will be incorporated in Axon soon) that sends messages to a message queue. A UnitOfWork Listener could simply commit these messages when the unit of work is committed. This article describes how XA transactions can be avoided with only a minimal extra risk (be aware that XA transactions are expensive and not 100% safe either): http://www.javaworld.com/javaworld/jw-01-2009/jw-01-spring-transactions.html

Axon’s current event bus (SimpleEventBus) does guarantee delivery. It’s the event handler itself (if wrapped with the asynchronous proxy) that doesn’t guarantee processing of all delivered events. That’s because it uses in-memory queues to buffer incoming events. Using a message broker can help in the cases where guaranteed asynchronous processing is required, since the queues can be made persistent.

Cheers,

Allard