Dead-letter Queue

Hello,

Typically with message-broker software like RabbitMQ, you have a Dead Letter Queue to put messages in case you encounter any issues while processing. You can easily reprocess this problem message by moving it back into the source queue.

What if I want something similar with Axon EventHandlers. I know you can add custom behaviour with ListenerInvocationErrorHandler.

Has anyone experience to implement similar functionality as a DLQ? Or is there anything available in AxonFramework itself?

Required functionality:

  1. in case of exception while message processing, put the message apart
  2. have an “easy” way to reprocess this message

Kind regards,
Koen

1 Like

Hi @allardbz ,

I found a reply from you May 2019:

“Basically, what they do when an event fails, is register that event with a dead letter queue. The value returned by the sequencing policy is remembered and blacklisted. If an event arrives that has the same sequencing policy value as one of the blacklisted values, that event is also appended to the dead letter queue.

One would then need to monitor the dead letter queue and reprocess any events as soon as the issue has been resolved.

We’re currently looking at how we can support this approach from the Framework/Server perspective in a generic way.”

Do you have a generic way in the meantime? I am very curious to hear about the “reprocess” part.

Kind regards,
Koen

Sadly, a full-fledged solution is rather non-trivial. At least from what Axon Framework would optimally provide.

There is a means you could get started with this right now though, although it might be quite some custom code. Regardless, I will give some of our insights on the matter so that you can give it a head start.

To allow for correct dead-letter queuing, Axon requires two things:

  1. It needs to be able to catch the exception, in combination with knowing which event caused the exception.
  2. It needs to be able to understand in which “overall group” the event belongs.

I assume point one is evident, point two might require some explanation.
Point two argues that if an event has failed, this was very likely targeted towards a specific Query Model instance. To ensure event handling maintains the correct ordering for this model, any subsequent events targeted towards this exact model should not be handled at all. Within Axon Framework, the SequencingPolicy decides where an event belongs.

Adding Events to the Dead Letter queue

Now, with that piece of information, we can take a look at the most reasonable spot to provide a custom solution for dead lettering. The one point within the framework which accounts for both these points, is the EventHandlerInvoker. Most straightforward would be to create a custom implementation based on the SimpleEventHandlerInvoker (e.g. DeadLetteringEventHandlerInvoker). Overriding the handle(EventMessage<?>, Segment) would be required. When handling an Exception, it should be added to the dead letter queue. Prior to actually invoking the event handlers, you would have to validate, based on the SequencingPolicy, if there are events with the same value dead-lettered and if they are the event should be added.

Maintaining the Dead Letter queue

Now that we’ve found a means to figure out if events belong or should belong to a/the dead letter queue, we need to think about where to store this information. Typically Axon’s TrackingTokens would be the right position, but “messing” with this wouldn’t be feasible for a custom solution. I would thus recommend to introduce a table inside an RDBMS where you store the dead letter messages in.

You can either do this directly in the DeadLetteringEventHandlerInvoker or you can create a DeadLetterStore/DeadLetterQueue component which encapsulates this storage behavior. It’s this component which would be invoked to validate if there are any events present already for the given sequencing value.

Reevaluating the Dead Letter queue

With the above in place, we can start thinking about when to reevaluate the dead letter queue. There are several clean approaches imaginable, like using a MultiStreamableMessageSource or by looking into adjusting the TrackingEventProcessor to periodically check for dead letter events. However, this is again to farfetched for a custom solution.

I would instead opt (again) to make the DeadLetteringEventHandlerInvoker in charge of reevaluating the events. This should be done per sequencing value (as this dictates a set of events which are targeted towards a given model), and one event per time. Thus don’t handle the entire batch of events within the queue at once. Event Handling failed once, and it might be very likely it would again. Going over the set one event per time is simply the most granular approach to getting back to track here.

The trigger for a reevaluation can come from several angles really. You could have a scheduled task through an ExecutorService which does this periodically. Or, you would introduce an endpoint you can invoke yourself based on the amount of dead-lettered events.

Conclusion

I hope this sheds some light on what your options are @Koen_Verwimp. It definitely sheds some light on our own thoughts to implement this inside the framework itself.
If you happen to have any further questions towards the custom/generic solution, definitely let us know in this thread.

1 Like

Hi Steven,

Just to let you know we received your reply last week and will start implementing a PoC now.

Thank you for your advice and we keep you posted!

Koen

1 Like

Hi @Steven_van_Beelen

Lets say that we store all the “problem” events as described like you said.

What do you think of configuring a separate eventstore. This eventstore is linked to a separate context DLQ in AxonServer. We can configure this as a second eventstream on the tracking event handler(s).

So in case you want to retry a stored problem message, we publish the message to the DLQ context … and this message is automatically picked up by the eventhandler using his second event source.

We might need to implement a filtering functionality in the EventInvoker to skip events that not need to be retried as they failed in a different processing group (based on metadata stored on the event).

Storing the events in side another context could indeed work. The sole downside of this, is that the source isn’t ephemeral. Differently put, your dead-lettered events are never removed from the source. This would thus require additional logic to keep track of the already handled dead-lettered events. Thus could reside in the TrackingToken of course, but that further begs the question how you would deal with a reset scenario of the processor.

Mind you, I think such a solution is doable. But for a simple implementation it necessitates making some events ephemeral. I am assuming this ephemeral solution to be key to provide a thorough solution from within Axon Framework eventually.

Hi Steven,

Thanks for your reply.

I understand what your mean. I think we proceed with this implementation.

We still hope you can get this functionality on the backlog so we have this out-of-the-box?
I have seen similar questions here before :slight_smile:

Thanks!

Indeed it is on the backlog.
This is the issue around ephemeral events for example. This issue arguably blocks any dead-letter queue implementation, although I assume that to become more apparent once we start on this endeavour ourselves.

Hi.

Just to share a bit of how we implemented something like this in our project.
It is a quiet complex thing with different parts.

Part 1 - Metadata:
We have a MessageHandlerInterceptor that adds the processingGroup to the metadata of the event so we know in what processingGroup we are.

We have a correlationDataInterceptor that adds a rootMessageId to the metadata which is the messageId that of the original message. We propagete this to spinned of events so we know what was the original messageId.

Part 2 - ListenerInvocationErrorHandler:
We have a ListenerInvocationErrorHandler that under certain circumstances will store an event as corrupt.
Our conditions are:

  • Sequencing policy cannot be sequential
  • The event must have been seen in the errorHandler at least two times (we do want some natural retries)
  • We must be in a batchingUnitOfWork

If these conditions are met we store the events together with the sequencing policy type, the sequencing policy id and the processingGroupName

Even if we register the events as corrupt we will still throw the error so the batch is retried.

Part 3 - DLQ events:
We have a MessageHandlerIntercetpor that will check if an event that passes is related to a corrupt event. This is done using the processingGroupName, the sequencingPolicyId and the sequencingPolicyType.
If it is related, this event is written to a DLQ table we have and the MessageHandlerInterceptor chain is interrupted for that event effectively ignoring it.
(This will DLQ the original event as well since we did a retry)

Part 4 - Replaying
We have a rest-controller which we can invoke that will retrieve the DLQ events for a certain combination and will redeliver them to the correct eventhandlers.
If everything is succesful the corrupt events and dlq get cleaned up.

Part 5 - Monitoring
We use sysdig in our application and have written a metric that gives us alerts on the number of corrupt events we have.

I cannot share the code we wrote for this because it is our clients property but I can say that it took a lot of iterations to get this right.
It is worth the investment though. Nothing is worse than a eventprocessor that is in an eternal retry loop until you can deploy a new version on a production system.

I hope this can give you some inspiration.

4 Likes

Hi @tbriers
Few queries regarding your approach…

How are you identifiying the processingGroup inMessageHandlerInterceptor ?

How are you maintaining the retry history in ListenerInvocationErrorHandler ?

Do you mean to republish the event? This will cause duplicate events in the eventstore. If not then how are you invoking specific eventHandler in a processingGroup.

Regards,
Roy

  1. For each processing group we register its own handler which gets the name through the constructor
    eventProcessingConfigurer.registerHandlerInterceptor(name, configuration -> new ProcessingGroupInterceptor<>(name));
  2. Just an in memory map. This is not ideal but the cheapest way.
  3. We do not republish it. We redeliver it. When you get your hands on the trackingEventProcessor you can use its eventHandlerInvoker
Optional<TrackingEventProcessor> trackingEventProcessor =
                eventProcessingConfiguration.eventProcessorByProcessingGroup(failedEvent.getProcessingGroup(),
                    TrackingEventProcessor.class);
EventHandlerInvoker eventHandlerInvoker = trackingEventProcessor.get().eventHandlerInvoker()

With that you can use the canHandle and handle methods to redeliver the message in a unit of work

1 Like