Dead-letter Queue

Hello,

Typically, with message-broker software like RabbitMQ, you have a Dead Letter Queue in which to put messages that run into issues during processing. You can then easily reprocess a problem message by moving it back into the source queue.

What if I want something similar with Axon EventHandlers? I know you can add custom behaviour with a ListenerInvocationErrorHandler.

Does anyone have experience implementing functionality similar to a DLQ? Or is there anything available in AxonFramework itself?

Required functionality:

  1. if an exception occurs while processing a message, set the message aside
  2. have an “easy” way to reprocess this message

Kind regards,
Koen

Hi @allardbz ,

I found a reply of yours from May 2019:

“Basically, what they do when an event fails, is register that event with a dead letter queue. The value returned by the sequencing policy is remembered and blacklisted. If an event arrives that has the same sequencing policy value as one of the blacklisted values, that event is also appended to the dead letter queue.

One would then need to monitor the dead letter queue and reprocess any events as soon as the issue has been resolved.

We’re currently looking at how we can support this approach from the Framework/Server perspective in a generic way.”

Do you have a generic way in the meantime? I am very curious to hear about the “reprocess” part.

Kind regards,
Koen

Sadly, a full-fledged solution is rather non-trivial. At least from what Axon Framework would optimally provide.

There is a way to get started with this right now, although it might require quite some custom code. Regardless, I will share some of our insights on the matter to give you a head start.

To allow for correct dead-letter queuing, Axon requires two things:

  1. It needs to be able to catch the exception, in combination with knowing which event caused the exception.
  2. It needs to be able to understand in which “overall group” the event belongs.

I assume point one is evident; point two might require some explanation.
Point two argues that a failed event was very likely targeted at a specific Query Model instance. To ensure event handling maintains the correct ordering for this model, any subsequent events targeted at this exact model should not be handled until the failed event has been dealt with. Within Axon Framework, the SequencingPolicy decides where an event belongs.

Adding Events to the Dead Letter queue

Now, with that piece of information, we can take a look at the most reasonable spot to provide a custom solution for dead lettering. The one point within the framework which accounts for both these points is the EventHandlerInvoker. Most straightforward would be to create a custom implementation based on the SimpleEventHandlerInvoker (e.g. DeadLetteringEventHandlerInvoker). Overriding the handle(EventMessage<?>, Segment) method would be required. When event handling throws an exception, the event that caused it should be added to the dead letter queue. Prior to actually invoking the event handlers, you would have to check, based on the SequencingPolicy, whether any events with the same sequencing value have already been dead-lettered. If so, the new event should be added to the dead letter queue as well instead of being handled.
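
To make that flow a bit more concrete, here is a minimal sketch in plain Java. It deliberately does not extend SimpleEventHandlerInvoker or use any Axon types: the class name, the Function standing in for the SequencingPolicy, the Consumer standing in for the event handlers, and the in-memory map are all assumptions made for illustration. A real implementation would apply the same decision logic inside the overridden handle method.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a dead-lettering invoker; none of these types are Axon API.
class DeadLetteringInvokerSketch<E> {

    // Plays the role of the SequencingPolicy: maps an event to its sequencing value.
    private final Function<E, Object> sequencingPolicy;
    // Plays the role of invoking the actual event handlers.
    private final Consumer<E> eventHandler;
    // Dead letters grouped by sequencing value, kept in arrival order.
    private final Map<Object, Deque<E>> deadLetters = new LinkedHashMap<>();

    DeadLetteringInvokerSketch(Function<E, Object> sequencingPolicy, Consumer<E> eventHandler) {
        this.sequencingPolicy = sequencingPolicy;
        this.eventHandler = eventHandler;
    }

    /** Returns true if the event was handled, false if it was dead-lettered. */
    boolean handle(E event) {
        Object sequenceId = sequencingPolicy.apply(event);
        Deque<E> queue = deadLetters.get(sequenceId);
        if (queue != null) {
            // An earlier event of this sequence failed, so park this one too to keep ordering.
            queue.addLast(event);
            return false;
        }
        try {
            eventHandler.accept(event);
            return true;
        } catch (RuntimeException e) {
            // First failure for this sequence: start a dead-letter queue for it.
            Deque<E> fresh = new ArrayDeque<>();
            fresh.addLast(event);
            deadLetters.put(sequenceId, fresh);
            return false;
        }
    }

    Map<Object, Deque<E>> deadLetters() {
        return deadLetters;
    }
}
```

Note that events for other sequencing values keep flowing as usual; only the sequence that contains a failed event is held back.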

Maintaining the Dead Letter queue

Now that we’ve found a means to figure out whether events belong, or should belong, to the dead letter queue, we need to think about where to store this information. Typically Axon’s TrackingTokens would be the right place, but “messing” with these wouldn’t be feasible for a custom solution. I would thus recommend introducing a table inside an RDBMS in which you store the dead-lettered messages.

You can either do this directly in the DeadLetteringEventHandlerInvoker or you can create a DeadLetterStore/DeadLetterQueue component which encapsulates this storage behavior. It’s this component which would be invoked to validate if there are any events present already for the given sequencing value.
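
As a rough idea of what such a component could expose, here is a sketch of a storage abstraction with an in-memory implementation. The interface and method names are hypothetical and not part of Axon Framework. An RDBMS-backed variant would implement the same operations against a table, presumably with the sequencing value and an insertion order as columns, but that part is left out here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical storage abstraction; the names are illustrative, not Axon API.
interface DeadLetterStore<E> {

    /** Appends an event to the dead letters of the given sequencing value. */
    void enqueue(Object sequenceId, E event);

    /** True if at least one event is dead-lettered for the given sequencing value. */
    boolean contains(Object sequenceId);

    /** The oldest dead-lettered event for the sequencing value, if any. */
    Optional<E> peek(Object sequenceId);

    /** Removes the oldest dead-lettered event for the sequencing value, if any. */
    void evictOldest(Object sequenceId);
}

// In-memory stand-in for the RDBMS table suggested above.
class InMemoryDeadLetterStore<E> implements DeadLetterStore<E> {

    private final Map<Object, Deque<E>> queues = new LinkedHashMap<>();

    @Override
    public synchronized void enqueue(Object sequenceId, E event) {
        queues.computeIfAbsent(sequenceId, k -> new ArrayDeque<>()).addLast(event);
    }

    @Override
    public synchronized boolean contains(Object sequenceId) {
        return queues.containsKey(sequenceId);
    }

    @Override
    public synchronized Optional<E> peek(Object sequenceId) {
        Deque<E> queue = queues.get(sequenceId);
        return queue == null ? Optional.empty() : Optional.ofNullable(queue.peekFirst());
    }

    @Override
    public synchronized void evictOldest(Object sequenceId) {
        Deque<E> queue = queues.get(sequenceId);
        if (queue == null) {
            return;
        }
        queue.pollFirst();
        if (queue.isEmpty()) {
            // Dropping the key "unblocks" the sequence for regular event handling.
            queues.remove(sequenceId);
        }
    }
}
```

The invoker would call contains before invoking the handlers and enqueue when handling fails or when the sequence is already blocked.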

Reevaluating the Dead Letter queue

With the above in place, we can start thinking about when to reevaluate the dead letter queue. There are several clean approaches imaginable, like using a MultiStreamableMessageSource or adjusting the TrackingEventProcessor to periodically check for dead-lettered events. However, this is again too far-fetched for a custom solution.

I would instead opt (again) to put the DeadLetteringEventHandlerInvoker in charge of reevaluating the events. This should be done per sequencing value (as this dictates a set of events which are targeted at a given model), and one event at a time. So don’t handle the entire batch of events within the queue at once. Event handling failed once, and it is quite likely to fail again. Going over the set one event at a time is simply the most granular approach to getting back on track here.

The trigger for a reevaluation can come from several angles. You could have a scheduled task through an ExecutorService which does this periodically. Or you could introduce an endpoint you can invoke yourself, based on the number of dead-lettered events.
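
A sketch of that reevaluation loop, again in plain Java with hypothetical names rather than Axon types: each pass retries only the oldest dead letter per sequencing value and leaves the rest of that sequence untouched if the retry fails. The schedule method shows the periodic trigger using a ScheduledExecutorService; the same retryOnePerSequence method could just as well be called from an endpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical reevaluation component; not part of Axon Framework.
class DeadLetterReevaluatorSketch<E> {

    private final Map<Object, Deque<E>> deadLetters = new LinkedHashMap<>();
    // Stands in for invoking the real event handlers.
    private final Consumer<E> eventHandler;

    DeadLetterReevaluatorSketch(Consumer<E> eventHandler) {
        this.eventHandler = eventHandler;
    }

    synchronized void park(Object sequenceId, E event) {
        deadLetters.computeIfAbsent(sequenceId, k -> new ArrayDeque<>()).addLast(event);
    }

    /** Retries only the oldest dead letter of one sequence; returns true if it succeeded. */
    synchronized boolean retryNext(Object sequenceId) {
        Deque<E> queue = deadLetters.get(sequenceId);
        if (queue == null) {
            return false;
        }
        try {
            eventHandler.accept(queue.peekFirst());
        } catch (RuntimeException e) {
            // Still failing: keep the event at the head so ordering is preserved.
            return false;
        }
        queue.removeFirst();
        if (queue.isEmpty()) {
            deadLetters.remove(sequenceId);
        }
        return true;
    }

    /** One pass: at most one event per sequencing value. Returns the number of successes. */
    synchronized int retryOnePerSequence() {
        int succeeded = 0;
        // Copy the keys, since retryNext may remove entries while we iterate.
        for (Object sequenceId : new ArrayList<>(deadLetters.keySet())) {
            if (retryNext(sequenceId)) {
                succeeded++;
            }
        }
        return succeeded;
    }

    synchronized int pending() {
        int total = 0;
        for (Deque<E> queue : deadLetters.values()) {
            total += queue.size();
        }
        return total;
    }

    /** Periodic trigger; the delay is an arbitrary choice for the sketch. */
    ScheduledFuture<?> schedule(ScheduledExecutorService executor, long delaySeconds) {
        return executor.scheduleWithFixedDelay(
                () -> retryOnePerSequence(), delaySeconds, delaySeconds, TimeUnit.SECONDS);
    }
}
```

The pending method gives a simple count you could expose to decide when a manual trigger is worthwhile.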

Conclusion

I hope this sheds some light on what your options are @Koen_Verwimp. It definitely sheds some light on our own thoughts about implementing this inside the framework itself.
If you happen to have any further questions about the custom/generic solution, definitely let us know in this thread.

Hi Steven,

Just to let you know we received your reply last week and will start implementing a PoC now.

Thank you for your advice, and we will keep you posted!

Koen

Hi @Steven_van_Beelen

Let’s say that we store all the “problem” events as you described.

What do you think of configuring a separate event store? This event store would be linked to a separate context, DLQ, in AxonServer. We can configure this as a second event stream on the tracking event handler(s).

So in case you want to retry a stored problem message, we publish the message to the DLQ context … and this message is automatically picked up by the event handler using its second event source.

We might need to implement filtering functionality in the EventHandlerInvoker to skip events that do not need to be retried because they failed in a different processing group (based on metadata stored on the event).
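
Roughly, such a filter could be a predicate over the event metadata, as in the sketch below. The metadata key name and the class are made up for illustration, and the metadata is modelled as a plain Map instead of Axon’s own type.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical filter; class name and metadata key are assumptions for this sketch.
class RetryFilterSketch {

    // Assumed key under which the failing processing group was recorded on the event.
    static final String FAILED_GROUP_KEY = "failedProcessingGroup";

    /** Accepts only events whose metadata says they failed in the given processing group. */
    static Predicate<Map<String, ?>> shouldRetryIn(String processingGroup) {
        return metaData -> processingGroup.equals(metaData.get(FAILED_GROUP_KEY));
    }
}
```

Events read from the DLQ context without the key, or with another group’s name, would simply be skipped by that processor.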

Storing the events inside another context could indeed work. The sole downside of this is that the source isn’t ephemeral. Put differently, your dead-lettered events are never removed from the source. This would thus require additional logic to keep track of the already handled dead-lettered events. This could reside in the TrackingToken of course, but that raises the further question of how you would deal with a reset scenario of the processor.

Mind you, I think such a solution is doable. But a simple implementation necessitates making some events ephemeral. I am assuming this ephemeral approach to be key to providing a thorough solution from within Axon Framework eventually.

Hi Steven,

Thanks for your reply.

I understand what you mean. I think we will proceed with this implementation.

We still hope you can get this functionality on the backlog so we have it out-of-the-box.
I have seen similar questions here before :)

Thanks!

Indeed it is on the backlog.
This is the issue around ephemeral events for example. This issue arguably blocks any dead-letter queue implementation, although I assume that to become more apparent once we start on this endeavour ourselves.