Broadcasting events

We have a Service where we want to add local caches, and we thought adding an event handler to invalidate the local cache would be a fine way to avoid dirty reads. For that to work, every instance of the service would need to receive every published event of a given event type.

I tried to make it work with a Tracking Event Processor, but that is really complicated and feels wrong. From my point of view I would need to:

  • define an in-memory token store just for this kind of processing group
    ** I don't know if that is even possible

OR

  • create a unique processing group for every instance
    ** delete it on shutdown to avoid clutter in the Token Store
  • find a way to register my handler to this processing group, because @ProcessingGroup just takes constants as arguments and I would need to add a random String so the processing group is unique.

It feels too complicated for something that was quite simple when working with RabbitMQ or Kafka. So I think I am trying to get Axon/Axon-Server to do something that it is not supposed to do. Is that right?

Another alternative would be to keep a Subscription Query open and emit updates to the cached data. Would that be the AxonIQ way to do something like this?

Hi Hendrik,

Both approaches are actually possible with Axon. The key class that you seem to be missing is the EventProcessingConfigurer class.
If you’re on Spring Boot, you can annotate a method with @Autowired and give it an EventProcessingConfigurer as a parameter. Spring will then invoke it. If you’re not on Spring Boot, you can use Configurer.eventProcessing() to get the instance.

Once you have your instance of the EventProcessingConfigurer, you can do many things. For this case, you’d want to do two things:

  1. configure a TokenStore, just for this processor:
epc.registerTokenStore(processorName,
                       c -> new InMemoryTokenStore());
  2. Make sure the processor starts at the most recent end of the stream, and doesn’t do a full replay on startup:
epc.registerTrackingEventProcessorConfiguration(
    processorName, 
    c -> TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                            .andInitialTrackingToken(StreamableMessageSource::createHeadToken)
);

It’s the “andInitialTrackingToken” that is the important part. If you leave that out, it will default to streaming from the very start.
While you’re in the Processor Configuration, you might also want to set the Batch Size. No need to “commit” each event separately: andBatchSize(100) would be a good addition.
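
Putting the two steps and the batch size together, a Spring Boot configuration class might look roughly like the sketch below. It assumes Axon Framework 4 package names; the class name and the processor name "cache-invalidation" are hypothetical and would need to match the @ProcessingGroup on your cache-invalidating handler. The method reference used here is StreamableMessageSource::createHeadToken, since the function passed to andInitialTrackingToken receives the message source.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.StreamableMessageSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CacheInvalidationProcessorConfig {

    // Hypothetical name: must match the @ProcessingGroup of the cache handler.
    private static final String PROCESSOR_NAME = "cache-invalidation";

    // Spring invokes this method and injects the EventProcessingConfigurer.
    @Autowired
    public void configure(EventProcessingConfigurer epc) {
        // Step 1: each application instance keeps its own token in memory,
        // so every instance tracks the event stream independently.
        epc.registerTokenStore(PROCESSOR_NAME, c -> new InMemoryTokenStore());

        // Step 2: start at the head of the stream instead of replaying,
        // and process events in batches of 100.
        epc.registerTrackingEventProcessorConfiguration(
                PROCESSOR_NAME,
                c -> TrackingEventProcessorConfiguration
                        .forSingleThreadedProcessing()
                        .andInitialTrackingToken(StreamableMessageSource::createHeadToken)
                        .andBatchSize(100));
    }
}
```

Because the token lives only in memory, a restarted instance begins again at the head of the stream, which fits a cache that also starts out empty.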

If you prefer going the other route, you can programmatically assign the processing group to a processor. If you then suffix it with a per-app-instance identifier, you achieve something similar. However, it seems that the first solution is more appropriate.
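
For completeness, the second route could be sketched as follows, again assuming Axon Framework 4 and Spring Boot. The handler keeps a constant @ProcessingGroup value, and the group is mapped to a processor whose name carries a per-instance suffix. The class name, the group name "cache-invalidation", and the use of a random UUID as the instance identifier are all assumptions made for illustration.

```java
import java.util.UUID;

import org.axonframework.config.EventProcessingConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PerInstanceProcessorConfig {

    // Hypothetical constant used in @ProcessingGroup on the handler class.
    private static final String PROCESSING_GROUP = "cache-invalidation";

    @Autowired
    public void configure(EventProcessingConfigurer epc) {
        // Unique processor name per application instance (assumed scheme).
        String processorName = PROCESSING_GROUP + "-" + UUID.randomUUID();

        // Route the constant processing group to the uniquely named processor.
        epc.assignProcessingGroup(PROCESSING_GROUP, processorName);

        // Make sure that processor is a tracking processor.
        epc.registerTrackingEventProcessor(processorName);
    }
}
```

With a persistent token store, every start would leave a token entry under a new name, so this variant still needs the cleanup on shutdown mentioned in the question, plus an initial head token to avoid a replay.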

Hope this helps.

Cheers,
Allard


Hi Allard,

Thanks for the ultra-fast reply. I implemented and tested the described InMemoryTokenStore method and it works like a charm!

Cheers,

Hendrik
