Tracking Event Processors

I am struggling to get a handle on tracking event processor configuration and management. I am using the dockerized version of AxonServer (specifically version 4.1 from docker hub) and my project depends on version 4.1.1 of axonframework.

For the sake of an example, I will define a very simple bounded context named foo, as follows:

  • com.myco.foo.command.FooAggregate – Command Handling AggregateRoot
  • com.myco.foo.query.FooProjector – Query Model Projector
  • com.myco.foo.query.Foo – Query Model

FooAggregate handles commands and emits events. FooProjector processes the events and updates the state of Foo. Eventually this query side view model will be consistent with the command side aggregate. This is all very easy to understand.

When I initially deployed my project I didn’t apply any custom event processing configuration, so I just got the defaults provided through Spring Boot auto-configuration. As such, the default ListenerInvocationErrorHandler is org.axonframework.eventhandling.LoggingErrorHandler. In this mode, if an exception is raised in FooProjector it will be logged and the event that triggered the error will be skipped (its state will not get projected into Foo). So, by default, when FooProjector has a bug in its processing logic, or FooAggregate has a bug in its event emitting logic, and either bug is exposed by a given command execution, the specific instance of Foo will never become consistent with FooAggregate. That is, unless the failure is detected, the offending bug is fixed, and the query model is rebuilt (as described in the documentation: essentially throwing away the old view and resetting the token store).

This keeps everything moving, but it implies you must monitor the logs and alert when a failure is detected so that the manual process of fixing the issue can be handled. This could be a very legitimate approach, but it breaks down if your monitoring or alerting breaks down. However, with good testing you can drive the error rate down (close to zero, hopefully) and then it’s probably manageable. Maybe this is the way to go, but I can’t decide.

Another approach I’ve tried is to configure the PropagatingErrorHandler. For this I’ve added the following configuration:

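```java
  // Declared inside a Spring @Configuration class; replaces the default LoggingErrorHandler.
  @Autowired
  public void configure(EventProcessingConfigurer configurer) {
    configurer.registerDefaultListenerInvocationErrorHandler(c -> PropagatingErrorHandler.INSTANCE);
  }
```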

With this configuration, when an exception is raised by an event listener, the tracking processor will go into an error mode where it will continue to retry processing the failing event until it succeeds; it will keep trying forever. There are a couple of problems with this configuration however. The first problem is that it’s very hard to troubleshoot the underlying issue because the error handler doesn’t log any details from the exception that triggered failure. All that comes across in the logs when an event handler fails with this configuration is the following:

```
2019-04-14 10:46:36.974 INFO [-,] 16714 --- [xx.pkg.query]-0] o.a.e.TrackingEventProcessor : Fetched token: IndexTrackingToken{globalIndex=455} for segment: Segment[0/0]
2019-04-14 10:46:36.974 INFO [-,] 16714 --- [xx.pkg.query]-0] o.a.a.c.event.axon.AxonServerEventStore : open stream: 456
2019-04-14 10:46:43.240 WARN [-,] 16714 --- [xx.pkg.query]-0] o.a.e.TrackingEventProcessor : Releasing claim on token and preparing for retry in 4s
2019-04-14 10:46:43.241 INFO [-,] 16714 --- [xx.pkg.query]-0] o.a.a.c.u.FlowControllingStreamObserver : Observer stopped
```
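The "retry in 4s" message suggests that the processor backs off between attempts rather than retrying in a tight loop. The following is a minimal sketch of such a schedule, assuming a delay that starts at one second, doubles after every failure, and is capped at 60 seconds. The class name `RetryBackoff` and the exact numbers are assumptions for illustration, not something taken from the framework source; the Axon Server log further down in this thread does show gaps of roughly 1, 2, 4, 8, 16, 32 and then 60 seconds, which is consistent with that assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Doubling retry delay with an upper bound. Initial value and cap are assumptions. */
final class RetryBackoff {
    private final long initialSeconds;
    private final long maxSeconds;
    private long nextSeconds;

    RetryBackoff(long initialSeconds, long maxSeconds) {
        this.initialSeconds = initialSeconds;
        this.maxSeconds = maxSeconds;
        this.nextSeconds = initialSeconds;
    }

    /** Returns the delay to wait before the next attempt, then doubles it up to the cap. */
    long nextDelaySeconds() {
        long current = nextSeconds;
        nextSeconds = Math.min(nextSeconds * 2, maxSeconds);
        return current;
    }

    /** Call after a successful attempt so the next failure starts from the initial delay. */
    void reset() {
        nextSeconds = initialSeconds;
    }

    public static void main(String[] args) {
        RetryBackoff backoff = new RetryBackoff(1, 60);
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            delays.add(backoff.nextDelaySeconds());
        }
        System.out.println(delays); // prints [1, 2, 4, 8, 16, 32, 60, 60]
    }
}
```

With a schedule like this, a permanently failing event is retried about once a minute after the first few attempts, so the segment stays blocked but the logs are not flooded.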

From this logging, the token 456 identifies the failing event. I can use it to identify the event in the event store, as every event in the store has a unique token, which appears to be a global sequence number. However, without information from the exception that was raised I don’t know any of the details of the failure. I can fix that with the following configuration:

`

That last question wasn’t clear. I meant to ask what the configuration to increase threads/segments would look like, if that’s even a reasonable approach to take.

Hi Troy,

to prevent flooding of logs with stack traces, the Tracking Processor will log the exception once when starting the “retry mode”. After that, it will just log the fact that it will attempt another retry later. The first entry will mention “Error occurred. Starting retry mode.”, followed by the exception and stack trace, at WARN level.

You can configure the “initialSegmentCount” in your tracking processor definitions. This will, however, only impact the number of segments when the processor starts for the very first time. Since Axon 4.1, you can dynamically increase or decrease the number of segments. Axon Framework provides methods on the TrackingEventProcessor, splitSegment(…) and mergeSegment(…), that allow you to increase and decrease the number of segments, respectively. In AxonServer, there are + and - icons for a Tracking Processor instance, which will coordinate this effort for you, even across multiple application instances.
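As a rough sketch of what that configuration could look like in a Spring Boot application: the processor name `com.myco.foo.query` is an assumption (it presumes the processor is named after the package of FooProjector, which is the usual default), the class name `FooProcessorConfig` is made up for illustration, and the thread and segment counts are arbitrary. It relies on `registerTrackingEventProcessorConfiguration`, `TrackingEventProcessorConfiguration.forParallelProcessing` and `TrackingEventProcessor.splitSegment`, so please verify those against the Javadoc of the version you are running.

```java
import java.util.concurrent.CompletableFuture;

import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FooProcessorConfig {

    // Assumed processor name: the package of the event handling component.
    private static final String PROCESSOR = "com.myco.foo.query";

    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        // Two threads and two segments. The segment count is only applied
        // the first time the processor starts (no tokens stored yet).
        configurer.registerTrackingEventProcessorConfiguration(
                PROCESSOR,
                c -> TrackingEventProcessorConfiguration
                        .forParallelProcessing(2)
                        .andInitialSegmentsCount(2));
    }

    /**
     * Asks a running processor to split segment 0. The future completes with
     * true if the split was performed, or false if the processor is missing
     * or is not a TrackingEventProcessor.
     */
    public static CompletableFuture<Boolean> splitFirstSegment(EventProcessingConfiguration processing) {
        return processing.eventProcessor(PROCESSOR, TrackingEventProcessor.class)
                .map(processor -> processor.splitSegment(0))
                .orElseGet(() -> CompletableFuture.completedFuture(false));
    }
}
```

Keep in mind that a processor can only work on as many segments as it has threads available, so adding segments without adding threads (or application instances) will not increase parallelism.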

One interesting approach I have seen is to use a blacklisting approach where failed events are sent to an error queue. The interesting part of this approach is that it will also mark the value of the “Sequencing policy” for that message as “blacklisted”, so that any event which should be handled sequentially according to the sequencing policy (which defaults to the identifier of the aggregate that published the event) is also automatically added to the error queue. The tracking processor would also need to regularly retry events on the error queue, to see if the problem has been resolved. This approach allows you to hold on to the ordering guarantees of events, even in the face of failure.
Right now, this approach takes some custom building. We recently came up with an approach to provide a generic solution for this. Discussions on when this could be implemented have just started…

Let me know if this makes sense.

That does make sense. Thanks! I will need to think about how to approach the implementation and will probably have some questions when I get to it. I probably won’t get started for a while though, maybe your solution will be available by then?

I will have to try again, but I did try to use the controls on the axon server ui and they didn’t seem to do anything. I’ll report back.

Cheers!

So I’ve tried again to use axon server to increase the number of segments but it just spins. And I don’t get anything in the logs either.

See the attached screenshot of the tracking processor just spinning after I hit the + icon.

Also, the logs don’t give me any help:

```
2019-04-18 18:44:04.730  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@72f7f4f, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:45:04.744  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@5d9c9593, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:46:04.761  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@23f14d40, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:47:04.773  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@60cf6d4c, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:47:58.003  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@e1bbf0a, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:47:59.017  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@6ce54791, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:48:01.031  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@2622bd0, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:48:05.035  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@4c955f92, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:48:13.048  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@14f3ce5f, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:48:29.060  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@76c0aec, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:49:01.073  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@45f14d0, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:50:01.109  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@15343aab, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:51:01.132  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@2e059f1d, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:52:01.145  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@69749c62, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:53:01.159  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@70dc061, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:54:01.172  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@1af037ab, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:55:01.182  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@2ac1581d, client='29204@piehole', context='default', lastToken=91}
2019-04-18 18:56:01.208  WARN 7 --- [-worker-ELG-3-5] i.a.a.message.event.EventDispatcher      : Remove tracker info EventTrackerInfo{responseObserver=io.axoniq.axonserver.message.event.ForwardingStreamObserver@44ca882d, client='29204@piehole', context='default', lastToken=91}
```

What am I missing?

Screenshot from 2019-04-18 12-50-32.png

Hi Troy,

are you using 4.1 in both the Server and Framework? Server 4.1 allows for sending the instruction to the Axon Framework instances, but they require Axon Framework 4.1 in order to understand that instruction.

Cheers,

Yes, I am using the axon server docker image version 4.1 and my application depends on version 4.1.1 of axon framework.

Hi Troy,

there aren’t many things I can think of that could go wrong, actually. I would expect something to show up in the logs of your application if they fail to execute the instruction.
Are you, by any chance using the MongoTokenStore? There was an issue that prevented it from supporting split and merge, which was resolved in 4.1.1 of the axon-mongo extension.

If all dependencies seem in order, you could try setting a breakpoint in the splitSegment or mergeSegment methods in the TrackingEventProcessor. This is where the instructions are scheduled. Lastly, the processInstructions(int) method is where the instructions are executed (by the thread responsible for that specific segment).

Cheers,

Hi Allard,

I am using the MongoTokenStore. I upgraded to the 4.1.1 release and now I can increase the segments. One thing that I’ve noticed however is that when a segment is “stuck” (because an error handler is throwing an exception and I am using a propagating error handling strategy) it will still have messages routed to it. These messages then never get processed because the “stuck” message is blocking them.

While I agree with and have followed the advice of not throwing exceptions from event handlers, it is still possible that the handlers will raise an exception (because they have a bug, and this is my concern here). So I really like your idea of using a propagating error handler that can retry and then blacklist events that continue to fail after a certain threshold. However, without that sort of mechanism I’d think it would be a big mistake to use a propagating error handler, because I find it very difficult to recover from this failure in a running system. I’m sure others are dealing with this sort of thing. But how? Are people building UIs that expose the state of the tracking processors and enable admins to “unstick” them? Are people just not using propagating error handlers and instead monitoring their logs for failure?

Hi Troy,

we’ve recently had some interesting review sessions at some customers, where they used a blacklisting approach that allowed them to block processing of failed events and the events that need to be processed sequentially with those events, while processing of other events would continue normally.

Basically, what they do when an event fails, is register that event with a dead letter queue. The value returned by the sequencing policy is remembered and blacklisted. If an event arrives that has the same sequencing policy value as one of the blacklisted values, that event is also appended to the dead letter queue.

One would then need to monitor the dead letter queue and reprocess any events as soon as the issue has been resolved.
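To make the idea concrete, here is a minimal in-memory sketch of that flow. It is not framework code and not what those customers built: `SequenceDeadLetterQueue` and all of its methods are hypothetical names, the sequencing value is passed in by the caller, and a real implementation would need durable storage and transaction handling.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** In-memory sketch of sequence-aware dead lettering (hypothetical, not Axon API). */
final class SequenceDeadLetterQueue<E> {

    /** A parked event together with the sequencing value it belongs to. */
    private static final class DeadLetter<T> {
        final Object sequenceId;
        final T event;

        DeadLetter(Object sequenceId, T event) {
            this.sequenceId = sequenceId;
            this.event = event;
        }
    }

    private final Set<Object> blacklisted = new HashSet<>();
    private final List<DeadLetter<E>> letters = new ArrayList<>();

    /** Returns true if the event was handled, false if it was parked. */
    boolean process(Object sequenceId, E event, Consumer<E> handler) {
        if (blacklisted.contains(sequenceId)) {
            // An earlier event of this sequence failed: keep ordering by parking this one too.
            letters.add(new DeadLetter<>(sequenceId, event));
            return false;
        }
        try {
            handler.accept(event);
            return true;
        } catch (RuntimeException e) {
            blacklisted.add(sequenceId);
            letters.add(new DeadLetter<>(sequenceId, event));
            return false;
        }
    }

    /**
     * Replays the parked events of one sequence in their original order.
     * Stops at the first failure and keeps the sequence blacklisted in that case.
     * Returns the number of events that were replayed successfully.
     */
    int retry(Object sequenceId, Consumer<E> handler) {
        int replayed = 0;
        Iterator<DeadLetter<E>> it = letters.iterator();
        while (it.hasNext()) {
            DeadLetter<E> letter = it.next();
            if (!letter.sequenceId.equals(sequenceId)) {
                continue;
            }
            try {
                handler.accept(letter.event);
            } catch (RuntimeException e) {
                return replayed;
            }
            it.remove();
            replayed++;
        }
        blacklisted.remove(sequenceId);
        return replayed;
    }

    boolean isBlacklisted(Object sequenceId) {
        return blacklisted.contains(sequenceId);
    }

    int size() {
        return letters.size();
    }
}
```

In this sketch, if the second event of aggregate A fails, the third event of A is parked behind it while events of aggregate B continue to be handled. Once the bug is fixed, `retry("A", handler)` replays the parked events in order and lifts the blacklist.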

We’re currently looking at how we can support this approach from the Framework/Server perspective in a generic way.

Hope this helps.
Cheers,

Allard

Hi Allard,

So I guess the absence of generic support for the dead letter queue approach means that there is no way for me to create a dead letter queue on the axon server. Is that correct? If so, what would you use and/or recommend for the queue? Also, how would I plumb access to the sequencing policy associated with a given event so that I could use it in my custom error handler?

I will be implementing org.axonframework.eventhandling.ListenerInvocationErrorHandler#onError(Exception, EventMessage<?>, EventMessageHandler)

I’m also looking for documentation or examples of implementing a custom ListenerInvocationErrorHandler. For example, I wonder if I am supposed to use org.axonframework.eventhandling.EventMessageHandler#handle() in order to retry the event? Presently my implementation re-throws certain exceptions, when a retry is desired, and logs others. I’m pretty sure the documentation indicates this is what you’re supposed to do, but I wonder why I’m given the EventMessageHandler parameter in the onError() method. I’m just not sure what the right/best way is and would love some guidance. :)
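For reference, a minimal sketch of the "log everything, re-throw only what should be retried" shape described above. The class name `LogAndPropagateErrorHandler` and the `isRetryable` rule are hypothetical and only there for illustration; the sketch uses the EventMessageHandler parameter solely to name the failing handler in the log, which is one possible use for it and not necessarily the intended one.

```java
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAndPropagateErrorHandler implements ListenerInvocationErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(LogAndPropagateErrorHandler.class);

    @Override
    public void onError(Exception exception, EventMessage<?> event, EventMessageHandler eventHandler)
            throws Exception {
        // Always log the details, including the stack trace, so the failure can be diagnosed.
        logger.error("Handler [{}] failed on event [{}] with payload type [{}]",
                     eventHandler.getTargetType().getSimpleName(),
                     event.getIdentifier(),
                     event.getPayloadType().getName(),
                     exception);
        if (isRetryable(exception)) {
            // Re-throwing hands the failure to the processor, which then enters its retry mode.
            throw exception;
        }
        // Returning normally means the event is considered handled and processing continues.
    }

    /** Hypothetical rule: treat bad input as permanent, everything else as worth retrying. */
    private boolean isRetryable(Exception exception) {
        return !(exception instanceof IllegalArgumentException);
    }
}
```

Such a handler could be registered with `configurer.registerDefaultListenerInvocationErrorHandler(c -> new LogAndPropagateErrorHandler())`, in the same way as the PropagatingErrorHandler earlier in this thread.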

Cheers!

Hi again,

At a very high level, this is how I think I would attack this problem.

  1. Register a custom ListenerInvocationErrorHandler.
  2. Register a custom MessageHandlerInterceptor.
  3. Implement a dead letter queue (see my previous question about this part).
  4. Define a persistent entity to keep track of the event message identifier, the targeted event handler type, and the retry attempts.
  5. Define another persistent entity to keep track of blacklisted sequencing policy values paired with a targeted event handler type.

The custom ListenerInvocationErrorHandler will be responsible for keeping track of retries for a given event message that is processed by a given event handler. Also, when retries are exhausted this handler will “blacklist” the sequencing policy value for the given event handler type and will deliver the event message to the dead letter queue.

The MessageHandlerInterceptor will then be responsible for checking if the sequencing policy value for the given event message has been blacklisted for the given message handler type. If it has been blacklisted then the handler will not proceed() but will instead deliver the event message to the dead letter queue.
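The retry bookkeeping that the error handler would be responsible for could be sketched roughly as follows. This is a plain in-memory illustration: `RetryLedger`, `Decision` and the threshold are hypothetical names and values, and the real thing would be backed by the persistent entities from steps 4 and 5.

```java
import java.util.HashMap;
import java.util.Map;

/** Counts failed attempts per (handler type, event id) and decides when to give up. */
final class RetryLedger {

    enum Decision { RETRY, BLACKLIST }

    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();

    RetryLedger(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Records one failure; returns BLACKLIST once the threshold has been reached. */
    Decision recordFailure(String eventId, Class<?> handlerType) {
        String key = key(eventId, handlerType);
        int count = attempts.merge(key, 1, Integer::sum);
        if (count >= maxAttempts) {
            // Retries are exhausted: the caller should blacklist the sequence and dead-letter the event.
            attempts.remove(key);
            return Decision.BLACKLIST;
        }
        return Decision.RETRY;
    }

    /** Clears the counter after the event was eventually handled successfully. */
    void recordSuccess(String eventId, Class<?> handlerType) {
        attempts.remove(key(eventId, handlerType));
    }

    private static String key(String eventId, Class<?> handlerType) {
        return handlerType.getName() + "/" + eventId;
    }

    public static void main(String[] args) {
        RetryLedger ledger = new RetryLedger(3);
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println(attempt + ": " + ledger.recordFailure("evt-1", String.class));
        }
    }
}
```

In the error handler, a RETRY decision would translate to re-throwing the exception, while a BLACKLIST decision would translate to recording the sequencing value and swallowing the exception so that the processor can move on.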

Is this even close to how you’d do it? If so, I still have a bunch of questions and will need some pointers to documentation, classes/javadoc and/or examples if at all possible.

Thanks and Cheers!

I have started to implement an approach for the dead letter queue in a sample project I have on github: https://github.com/troyhart/yardsale

I haven’t figured out exactly how I am going to approach the actual dead letter queue, however, I have implemented the ListenerInvocationErrorHandler with: https://github.com/troyhart/yardsale/blob/master/core-axon-eventhandling/src/main/java/com/myco/axon/eventhandling/ErrorHandler.java

This custom handler defines two persistent entities used to handle bookkeeping. The problem is that I can’t seem to isolate the transaction and it is getting rolled back–I’m trying to persist retry attempts for a given event message so that I can retry a few times before blacklisting the sequence.

I’m sure I have a number of issues with the implementation and would really appreciate some pointers.

Thanks!

OK, so I solved the rollback issue by setting the bookkeeping transaction propagation strategy to: REQUIRES_NEW
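In case it helps anyone else, the fix has roughly the following shape. This is a simplified sketch and not the code from the repository: `RetryBookkeeping` and `RetryAttemptStore` are hypothetical names, and the store stands in for whatever repository persists the attempts.

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/** Hypothetical persistence port for the retry counters. */
interface RetryAttemptStore {
    int incrementAndGet(String eventId, String handlerType);
}

@Service
public class RetryBookkeeping {

    private final RetryAttemptStore store;

    public RetryBookkeeping(RetryAttemptStore store) {
        this.store = store;
    }

    /**
     * Runs in its own transaction, which commits independently of the
     * surrounding event processing transaction. The recorded attempt
     * therefore survives when the outer transaction is rolled back.
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public int recordAttempt(String eventId, String handlerType) {
        return store.incrementAndGet(eventId, handlerType);
    }
}
```

One thing to watch for: the annotation only takes effect when the method is called through the Spring proxy, that is, from another bean (the error handler in this case) rather than from within the same class.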

The biggest thing now is the dead letter queue… Does anyone have any pointers to offer here?

Also, is the MessageHandlerInterceptor.handle() method the right place to check for blacklisted sequences and then either InterceptorChain.proceed() or deliver the event to the dead letter queue as appropriate?

Looking at org.axonframework.messaging.MessageDispatchInterceptor and org.axonframework.messaging.MessageHandlerInterceptor I don’t see how I could use either one to determine if the given Event is blacklisted.

In order to make this determination I need to know the sequencing value and the event handler type/class. As for the sequencing value, I can’t use org.axonframework.eventhandling.async.SequentialPerAggregatePolicy.getSequenceIdentifierFor(EventMessage) because I don’t have a reference to an EventMessage; rather, I just have the Event. However, while it may not be ideal, I know empirically that the sequencing value will be the aggregate identifier, which I can get directly from the Event (but it still feels a little dirty). The real issue is that I don’t know how to determine the needed event handler type/class.

References:

  1. sample project root: https://github.com/troyhart/yardsale
  2. axon configuration: https://github.com/troyhart/yardsale/tree/master/app/src/main/java/com/myco/axon/config
  3. registered listener invocation error handler module: https://github.com/troyhart/yardsale/tree/master/core-axon-eventhandling

So I have an implementation that works, except it doesn’t really deliver messages to a dead letter queue; instead it just logs the EventMessage. I will need to come back to this later.

I was not able to use either of the interceptors because I couldn’t determine the processing group from the interceptor context. So rather, I created a mechanism for event handlers to “opt-in” to quarantining black listed events. I created the interface com.myco.axon.eventhandling.BlacklistAware which has a default method that will throw a com.myco.axon.eventhandling.EventQuarantinedException if the event being processed is blacklisted for the processor.
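To illustrate the opt-in idea without having to read the repository, here is a simplified sketch. It reuses the names BlacklistAware and EventQuarantinedException, but the bodies, the `BlacklistRegistry` class and the method names are made up for this example and are not the actual code in the project.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Thrown when an event belongs to a sequence that is blacklisted for a processor. */
class EventQuarantinedException extends RuntimeException {
    EventQuarantinedException(String processorName, Object sequenceId) {
        super("Sequence [" + sequenceId + "] is blacklisted for processor [" + processorName + "]");
    }
}

/** Hypothetical in-memory lookup of blacklisted sequences per processor. */
final class BlacklistRegistry {
    private final Set<String> entries = ConcurrentHashMap.newKeySet();

    void add(String processorName, Object sequenceId) {
        entries.add(processorName + "/" + sequenceId);
    }

    boolean contains(String processorName, Object sequenceId) {
        return entries.contains(processorName + "/" + sequenceId);
    }
}

/** Event handling components implement this to opt in to quarantining. */
interface BlacklistAware {

    /** Name of the processor the blacklist entries are keyed on. */
    String processorName();

    /** Where blacklisted sequences are looked up. */
    BlacklistRegistry blacklist();

    /** Call first in every event handler method; throws if the sequence is quarantined. */
    default void ensureNotQuarantined(Object sequenceId) {
        if (blacklist().contains(processorName(), sequenceId)) {
            throw new EventQuarantinedException(processorName(), sequenceId);
        }
    }
}
```

A projector would implement the interface and call `ensureNotQuarantined(aggregateId)` at the top of each event handler method. The error handler can then recognise EventQuarantinedException and route the event to the dead letter handling instead of counting it as a fresh failure.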

References:

  1. sample project root: https://github.com/troyhart/yardsale
  2. axon configuration: https://github.com/troyhart/yardsale/blob/master/app/src/main/java/com/myco/AxonConfig.java
  3. black list aware event processor: https://github.com/troyhart/yardsale/blob/master/user-query/src/main/java/com/myco/user/query/UserProfileProjector.java
  4. blacklisting error handler: https://github.com/troyhart/yardsale/blob/master/core-axon-eventhandling/src/main/java/com/myco/axon/eventhandling/ErrorHandler.java

Hi Troy,

I would say that you’re roughly in the right direction for implementing this. We haven’t had the time to discuss the full approach internally, yet, so I can’t provide a detailed answer for you at this point.

One thing that we haven’t yet figured out, is the most reliable way to reconsume messages that have been blacklisted. Most likely, we’ll be using the MultiMessageSource that will make its appearance in 4.2. In that case, one source will check for messages from the main stream. The second source will deliver messages from the blacklist, as soon as they’re ready for (re)delivery.

Note that you could also use a separate thread to process the events. Because of the blacklisting, there will be no conflicting updates from other threads.

We’re working hard on getting the 4.2 release of Framework and Server out. As soon as that’s done, we’re starting some design sessions for larger elements in the next release. There’s a reasonable chance this will make it into 4.3. At least we’ll have some better idea on how the pieces would fit together.

Hope this helps.
Cheers,

Allard Buijze
CTO

Thanks for the info, Allard. I have added some features to my implementation, including honoring the location transparency promised by the framework by exposing a query for the blacklisted records and implementing a query handler for the model. You can see this change in com.myco.axon.eventhandling.errors.BlacklistedEventSequence, where I’m now exposing the QueryGateway rather than the repository. I am also now raising a non-domain event (EventSequenceBlacklisted) when a processor sequence is blacklisted. Any processor can listen for this event, filter for when it’s relevant based on the processor’s type, and take some action to notify of the projection failure. In my case, I am setting a status on the query model to show that a projection error has occurred. Anyway, I think it’s a fairly reasonable implementation of the blacklisting side of the error handling feature. I do still need to work out the dead letter handling, and I look forward to the features you will be adding to the framework to help support this.

Cheers.

Troy