Event interceptor not receiving events

Hi,

I’ve run into an issue which I can’t explain, so hopefully you can. What I want to achieve is to intercept all events, map them to some general format an publish them to a Kafka topic. My first attempt was using the axon Kafka-extension which does what is supposed to do, but I could not extend the KafkaPublisher or plug in my own implementation.

Therefore I wrote an event interceptor which does what is supposed to do and configured it like described in the reference manual (Message Intercepting - Axon Reference Guide).
I registered it like this:

public class EventProcessorConfiguration {

    public void configureEventProcessing(Configurer configurer) {
        configurer.eventProcessing()
                  .registerTrackingEventProcessor("my-tracking-processor")
                  .registerHandlerInterceptor("my-tracking-processor",
                                              configuration -> new MyEventHandlerInterceptor());
    }
}

But messages are not received by my interceptor. When I change the name of the tracking event processor to an existing one I do receive message but only for events that have an @EventListener annotated function in that processor.

I expected the result of the configuration is a new tracking event processor which will ‘forward’ all events to my newly registered interceptor. Since the new tracking event processor is not visible in the axon_token_entry table among the other ones, I concluded that is it not created and therefore no events are received.

So what is my misunderstanding here? What do I need to do to achieve this? I’d like to use the mechanism in the after commit phase of the unit work to map and publish my events to Kafka. Which is not possible in the MessageDispatchInterceptor implementation I’am currently using and which does receive all events.

Btw: I’am using the 4.8.1 axon-bom and a SimpleEventBus.

Regards,

Frank

Hi Frank,

We don’t start an event processor just by registering one with a certain name, which is why the observed behavior is correct. One way around this would be to also create the event processor with that name and have an even handler on Object so you get all events.

Using an event processor that immediately publishes to Kafka from the event handler fuction is more idiomatic. You might leverage the Kafka Extension to do so, like here.

I hope this was helpful, please let us know if you have follow-up questions.

Thanks Gerard,

That makes sense. So indeed there is no need for an interceptor since a tracking eventprocessor with an @Eventhandler function is a more simple solution.

Is there a way to influence the processing order of the event processor? I would prefer this Kafka one being the last in the chain.

Regards,

Frank

I’m not sure what you mean. By default, it will be a separate tracking event processor with its own tracking token and threads, opening its own stream to the event source.

Maybe you want to make sure some other event handler has updated a projection before the event is send to Kafka? It’s possible to order them, as long as they belong to the same event processor. This does mean they become coupled, and you can’t for example reset the projection, without resetting the sending to Kafka.

Hi Gerard,

Thanks again, the ordering part is all clear to me.

What I’am trying to achieve is sending a Kafka message to inform our graphQL nodes that some specific aggregate is updated e.g Employee with uuid x). I also include the aggregate version so the graphQL resolver kan fetch data from te read model and make sure the version matches before pushing data to the subscribed clients.

So it would be best to send the Kafka message after the read model is updated. Is there a non blocking way to achieve this? E.g using isCaughtUp? I want to use a generic mechanism instead of the manual query emitters voor subscribing queries.

Thanks a lot,

Frank

From what I understand in your case, there is coupling between the GraphQL nodes, and a projection used?

If there is indeed such a coupling, you can use an Axon Framework Subscription query, and emit on the updates, so you can send the updated entities. An example can be found here.

Another approach would be to use a different projection, specific for GraphQL. In that case you can use the events themselves as trigger. In this example, I leverage Axon Synapse, and build a seperate projection.

I’m not sure about a generic and non blocking way. You might leverage change data capture on the projection, so you are sure you have all the changes. Are you afraid someone might forget the emit for a new event?

Goodmorning Gerard,

Thanks for your input. I was on a holiday so that’s why our conversation stopped.

Actually our complete read model is used by our graphQL resolvers. And for each domain event we sent a ‘domainNotification’ over Kafka to inform all graphQL nodes that changes have been applied in the read model.

Since Kafka is a preferred solution on a company level, we will stick with that in favour of Synapse.

Thanks for your feedback!

Frank

1 Like