Process events in batch and listen to UoW

Hello,

I have a simple case: thousands of small events and an event handler that counts them.

The way I see it:

  • TrackingEventProcessor takes N (batchSize) events, creates and starts a UoW, sends the batch to the EventHandler, and commits the UoW.
  • EventHandler subscribes to the UoW on each start to create a counter variable, receives the events one by one (or, even better, as a list), increments the counter and, on UoW commit, stores the value in some DB.

I use Axon without Spring Boot, so I had to configure it via DefaultConfigurer. I managed to set the batch size, but I have no idea how to subscribe to UoW notifications.
In the documentation (https://docs.axoniq.io/reference-guide/configuring-infrastructure-components/messaging-concepts/unit-of-work) I found that it is possible to get the current UoW by calling CurrentUnitOfWork.get().
But in TrackingEventProcessor I found this piece of code:
// TrackingEventProcessor.java:391
UnitOfWork<? extends EventMessage<?>> unitOfWork = new BatchingUnitOfWork<>(batch);
unitOfWork.attachTransaction(transactionManager);
unitOfWork.resources().put(segmentIdResourceKey, segment.getSegmentId());
unitOfWork.resources().put(lastTokenResourceKey, finalLastToken);
processInUnitOfWork(batch, unitOfWork, processingSegments);

As far as I can see, this means that each batch gets a brand new UoW. That makes it impossible to subscribe to UoW notifications just once via CurrentUnitOfWork.get().

Could someone share their experience with processing events in batches?

I’m a bit lost.

Thanks,
Anton

Hi Anton,

A new Unit of Work is indeed created for each batch. In your event handler, you would use CurrentUnitOfWork.get() to get access to that unit of work (or simply add UnitOfWork as a parameter to your @EventHandler method). Each UnitOfWork allows you to register resources. Those resources are (re)used by the entire batch. In your case, you will probably want to do a “getOrCompute()” style operation to initialize a new counter. When a new counter is created, you also register a callback with the unitOfWork.onPrepareCommit() method, which adds the counter’s value to the database once the batch is about to be committed.

In code:

@EventHandler
public void on(MyEvent myEvent, UnitOfWork<?> unitOfWork) {
    // One counter per unit of work, i.e. per batch
    AtomicLong counter = unitOfWork.getOrComputeResource("myCounter", k -> {
        AtomicLong atomicLong = new AtomicLong();
        // Registered only once per batch, when the counter is first created
        unitOfWork.onPrepareCommit(u -> addValueToDatabase(atomicLong.get()));
        return atomicLong;
    });
    counter.addAndGet(1);
}

private void addValueToDatabase(long value) {
    // update query
}

Note that a UnitOfWork is never accessed concurrently. So it’s safe to have side-effects in the getOrComputeResource method.
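
To make the per-batch lifecycle concrete, here is a minimal sketch in plain Java that imitates the pattern above without depending on Axon. Everything in it is an assumption made for illustration: MiniUnitOfWork, CountingHandler and BatchCounterDemo are hypothetical stand-ins, not Axon classes, and a list plays the role of the database. It only shows that a fresh unit of work per batch leads to one counter and one flush per batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a per-batch unit of work; NOT Axon's UnitOfWork.
class MiniUnitOfWork {
    private final Map<String, Object> resources = new HashMap<>();
    private final List<Consumer<MiniUnitOfWork>> prepareCommitHandlers = new ArrayList<>();

    @SuppressWarnings("unchecked")
    <T> T getOrComputeResource(String key, Function<String, T> factory) {
        // Plain get/put instead of Map.computeIfAbsent, because the factory
        // has a side effect (it registers a commit callback).
        Object existing = resources.get(key);
        if (existing == null) {
            existing = factory.apply(key);
            resources.put(key, existing);
        }
        return (T) existing;
    }

    void onPrepareCommit(Consumer<MiniUnitOfWork> handler) {
        prepareCommitHandlers.add(handler);
    }

    void commit() {
        prepareCommitHandlers.forEach(h -> h.accept(this));
    }
}

class CountingHandler {
    // Stands in for the database: one entry per flushed batch.
    final List<Long> flushed = new ArrayList<>();

    void on(String event, MiniUnitOfWork uow) {
        AtomicLong counter = uow.getOrComputeResource("myCounter", k -> {
            AtomicLong created = new AtomicLong();
            // Runs once per unit of work, when the counter is first created.
            uow.onPrepareCommit(u -> flushed.add(created.get()));
            return created;
        });
        counter.incrementAndGet();
    }
}

class BatchCounterDemo {
    // Imitates the processor loop: a brand new unit of work for every batch.
    static List<Long> process(List<List<String>> batches) {
        CountingHandler handler = new CountingHandler();
        for (List<String> batch : batches) {
            MiniUnitOfWork uow = new MiniUnitOfWork();
            for (String event : batch) {
                handler.on(event, uow);
            }
            uow.commit();
        }
        return handler.flushed;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        System.out.println(process(batches)); // prints [3, 2]
    }
}
```

The handler itself keeps no state between batches; the counter lives and dies with the unit of work it was registered in.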

Hope this helps.
Cheers,

Thanks, Allard.

That is what we did; I just thought it could be done a bit more elegantly.
But I guess I can also play a little with interceptors.

Hi Anton,

There are indeed ways to do this more elegantly. The downside is that these methods require you to dive deeper into Axon’s inner workings.
One thing you could do is put this logic in a ParameterResolver. That would allow you to receive the counter object simply by adding it as a parameter to your annotated handler method. Making this approach easier to configure is something we’ve been thinking about.
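
As a rough illustration of the idea only, the sketch below moves the "get or create the counter and register the flush" logic out of the handler and into a separate resolver, so the handler just receives a counter. It is plain Java and does not use Axon's ParameterResolver API: BatchScope, BatchCounter, BatchCounterResolver, CleanHandler and ResolverDemo are all hypothetical names, the scope is passed in explicitly, and a list stands in for the database. How such a resolver would be registered with Axon is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

// Hypothetical per-batch scope; a stand-in for the unit of work, NOT an Axon class.
class BatchScope {
    final Map<String, Object> resources = new HashMap<>();
    final List<Runnable> beforeCommit = new ArrayList<>();

    void commit() {
        beforeCommit.forEach(Runnable::run);
    }
}

// The object the handler asks for as a parameter.
class BatchCounter {
    private long value;

    void increment() { value++; }

    long value() { return value; }
}

// Hypothetical resolver: hands out one BatchCounter per scope and registers
// the flush callback when the counter is first created.
class BatchCounterResolver {
    private final LongConsumer sink; // stands in for the database update

    BatchCounterResolver(LongConsumer sink) {
        this.sink = sink;
    }

    BatchCounter resolve(BatchScope scope) {
        BatchCounter counter = (BatchCounter) scope.resources.get("batchCounter");
        if (counter == null) {
            BatchCounter created = new BatchCounter();
            scope.resources.put("batchCounter", created);
            scope.beforeCommit.add(() -> sink.accept(created.value()));
            counter = created;
        }
        return counter;
    }
}

// The handler no longer knows about scopes, resources, or commit callbacks.
class CleanHandler {
    void on(String event, BatchCounter counter) {
        counter.increment();
    }
}

class ResolverDemo {
    static List<Long> process(List<List<String>> batches) {
        List<Long> stored = new ArrayList<>();
        BatchCounterResolver resolver = new BatchCounterResolver(v -> stored.add(v));
        CleanHandler handler = new CleanHandler();
        for (List<String> batch : batches) {
            BatchScope scope = new BatchScope(); // new scope for every batch
            for (String event : batch) {
                // In a real framework the infrastructure would do this lookup.
                handler.on(event, resolver.resolve(scope));
            }
            scope.commit();
        }
        return stored;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d", "e", "f")))); // prints [2, 4]
    }
}
```

The trade-off is the one mentioned above: the handler gets simpler, but the bookkeeping moves into infrastructure code that has to be written and registered separately.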

Cheers,