Asynchronous Event Handlers?

Hi all,

I am trying to build view projections using event listeners. My service is a Spring Boot application, and I am using Spring Data’s reactive MongoDB repositories to make non-blocking calls. As a result, the repository methods return Mono and Flux. My issue is that events do not wait for one another to complete when they are processed: the handler returns as soon as it subscribes to the Mono. This causes a race condition when multiple events are emitted at once.

We also have a process in place to rebuild all view projections. This works by dropping the collections for an aggregate and recreating them from the domain events (we manually push the messages out to the event processors). This has the same problem as before, since the process is not blocked in any way.

Any tips here? I could obviously use the non-reactive version of Spring Data, and everything would work fine. But it seems to me that there should be a way to do what I am trying to do.

Hello Oliver! Thanks for posting your question.

The current nature of event handlers is that they should be blocking. This is for two reasons:

  1. It’s so the UnitOfWork knows that the transaction is done and can be committed (or rolled back if one of the events in the batch failed).
  2. It’s so the processing can move to the next event. If the events are in the same segment of an event processor they should be handled sequentially and be prevented from running in parallel.

Reason 2 is especially important here. Say you have 3 events of aggregate A that are order-specific. For example:

  1. Product A added to basket
  2. Product B added to basket
  3. Product A removed from basket

If these events were handled concurrently (as happens with non-blocking reactive calls), events 2 and 3 might be handled before event 1. If the removal (event 3) runs before the add (event 1), the basket ends up containing products A and B instead of only B.
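To make the ordering problem concrete, here is a minimal sketch in plain Kotlin. The event types and the `project` function are hypothetical and only model the basket example above; they are not part of Axon.

```kotlin
// Hypothetical event types for the basket example; not part of Axon.
sealed interface BasketEvent { val product: String }
data class ProductAdded(override val product: String) : BasketEvent
data class ProductRemoved(override val product: String) : BasketEvent

// Applies the events to an empty basket in exactly the order given.
fun project(events: List<BasketEvent>): Set<String> {
    val basket = mutableSetOf<String>()
    for (event in events) {
        when (event) {
            is ProductAdded -> basket.add(event.product)
            is ProductRemoved -> basket.remove(event.product)
        }
    }
    return basket
}

fun main() {
    val inOrder = listOf(ProductAdded("A"), ProductAdded("B"), ProductRemoved("A"))
    // Same three events, but the removal overtakes the first add.
    val reordered = listOf(ProductRemoved("A"), ProductAdded("B"), ProductAdded("A"))

    println(project(inOrder))   // [B]
    println(project(reordered)) // [B, A]
}
```

The reordered run removes A from an empty basket, which does nothing, and then adds it back, so the view ends up wrong without any error being raised.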

For Axon Framework 5 we are planning to fully support the reactive style. That would mean making sure the Flux or Mono returned from an event handler completes before moving on to the next event. Currently, we have no such thing in place.

I hope this clears this up and gives some outlook for the future. For now, please block the calls in the event handlers.
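As a rough illustration of what blocking inside the handler achieves, here is a sketch that uses the JDK's `CompletableFuture` as a stand-in for a reactive type, so it runs without any dependencies. `AsyncViewRepository` and `handleBlocking` are hypothetical names. With Reactor, the corresponding call would be `Mono.block()` (or `Flux.blockLast()`) on the value returned by the repository.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a reactive repository: every save completes
// asynchronously on another thread after the given delay.
class AsyncViewRepository {
    val saved = CopyOnWriteArrayList<String>()

    fun save(view: String, delayMillis: Long): CompletableFuture<String> =
        CompletableFuture.supplyAsync(
            { saved.add(view); view },
            CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)
        )
}

// Blocking handler: join() does not return until the save has completed,
// so the caller cannot start the next event before this one is stored.
fun handleBlocking(repository: AsyncViewRepository, view: String, delayMillis: Long) {
    repository.save(view, delayMillis).join()
}

fun main() {
    val repository = AsyncViewRepository()
    // The first save is deliberately slower than the second one.
    handleBlocking(repository, "event-1", 50)
    handleBlocking(repository, "event-2", 1)
    println(repository.saved) // [event-1, event-2]
}
```

Without the `join()`, the handler would return right after scheduling the save, and the faster second save could be stored first.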

As for rebuilding the projections, I got curious. How did you build this? Do you use the normal way of resetting the event processors or did you build something custom yourself? It would be very cool if you shared this. Thank you!


Thanks for the answer Mitchell, that helps. I will keep an eye out for version 5.

Our rebuild process works by first accepting an aggregate ID to rebuild. It then goes to the domain events collection (we use MongoDB) to retrieve all events for the given aggregate. Then, the view projections are manually cleared out (most are a simple deleteByAggregateId query). Finally, we have a SubscribableMessageSource configured with all of our event processors. This is called with all the domain event messages so that they can be pushed to the event processors:

    fun emitDomainEvents(events: List<DomainEventMessage<*>>) {
        eventProcessors.forEach {
            it.accept(events.toMutableList())
        }
    }

For this to work, we had to make some tweaks to our event saga, so that it would not be invoked on a replay. We control this using the presence or absence of some metadata. A bit hacky, but it was the solution that we were able to find.

Just curious here Oliver, but why don’t you use Axon’s integrated replay functionality?
It would eliminate this manual process you describe by simply invoking StreamingEventProcessor#resetTokens.
Or does the situation arise so often that you only have to replay a single model instance (the aggregate data you’re retrieving), which merits the additional complexity?

> For this to work, we had to make some tweaks to our event saga

When sticking to your current design, why not remove the Sagas from the batch of event processors you’d push through your custom replay logic?
Note that the Framework ensures Sagas cannot be replayed by default through the aforementioned reset functionality.

We do not use the integrated functionality because we do not use token-based event processors. When we looked into it, we found that using tokens added an extra database call at the beginning of every command. That would have added latency to regular processing, which sounded like a bad tradeoff, so we had to devise our own strategy for rebuilding our views.

To answer your second question, I will have to give you a bit of context as to how our events flow out of the aggregate. We wanted to bypass the default behavior of Axon so that the UnitOfWork performed on an aggregate would not be undone by potential errors in the event handlers that construct our views. So we decided to segregate the events flowing from the event store and the event handlers that process them. The flow is as follows:

  1. Event store messages are pushed to an InternalEventAdapter processing group.
  2. These events are then published to Kafka.
  3. A SubscribableMessageSource listens on Kafka for these messages, and then pushes them to a set of event processors (Saga event handlers as well as view handlers)

The message source in #3 is configured as the default, so all processing groups (excluding the one I mentioned earlier) receive their messages from this source. This allows us to easily use the source for 2 purposes:

  1. Normal event processing
  2. View projection rebuilding

The downside, as mentioned before, is that we have to factor in the sagas. They need to be invoked under regular processing, but since the message source is shared with the replay functionality, we had to get fancy with our method signature:

    @StartSaga
    @SagaEventHandler(associationProperty = "aggregateId")
    fun handle(event: AddressCreated, @MetaDataValue("invokeSaga", required = true) invokeSaga: Boolean?) {
        // . . .
    }

We added some gatekeeping via a custom metadata field - if it is not present, the saga is bypassed and not invoked. So during regular processing, we supply that field, but during a replay, it is omitted.
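To show the gatekeeping idea in isolation, here is a simplified model in plain Kotlin. `Message` and `SharedSource` are hypothetical and only mimic the behaviour described above (a handler that matches only when the `invokeSaga` metadata key is present); they are not Axon classes.

```kotlin
// Hypothetical, simplified message: a payload plus a metadata map.
data class Message(val payload: String, val metaData: Map<String, Any> = emptyMap())

// Hypothetical model of one message source shared by saga and view handlers.
class SharedSource {
    val sagaInvocations = mutableListOf<String>()
    val viewInvocations = mutableListOf<String>()

    fun publish(message: Message) {
        // View handlers see every message, in regular processing and in replays.
        viewInvocations += message.payload
        // The saga handler only matches when the "invokeSaga" key is present.
        if ("invokeSaga" in message.metaData) {
            sagaInvocations += message.payload
        }
    }
}

fun main() {
    val source = SharedSource()
    // Regular processing: the metadata field is supplied.
    source.publish(Message("AddressCreated", mapOf("invokeSaga" to true)))
    // Replay: the metadata field is omitted, so the saga is bypassed.
    source.publish(Message("AddressCreated"))

    println(source.viewInvocations) // [AddressCreated, AddressCreated]
    println(source.sagaInvocations) // [AddressCreated]
}
```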

This was probably more information than you wanted :sweat_smile:. But I hope it helps express our use case. Feedback is welcome - I am sure there are some things that we are doing that could be simplified.