EventProcessor Performance Issues

Hello again,

I hope all are doing well. I have implemented a solution that triggers the creation of ‘many’ events (<10k). I have a single processing group responsible for handling all of those events, since the event handling order is important for events relating to the same aggregate. Usually these events occur on their own; the case where many are created at the same time only happens once or twice a year.

Currently, it looks like all events are created very quickly by my command handlers, but the projections are not updating fast enough. I am aiming for less than a couple of seconds to handle all events in my projections.

I found this presentation, which talks about possible solutions. All of the presented solutions seem fast enough for my use case. However, I haven’t been able to get them working because I can’t seem to find any documentation.

I think that the parallel processing solution is a good fit for me. I tried this:

configurer.registerTrackingEventProcessorConfiguration("processingGroupName", configuration ->
                TrackingEventProcessorConfiguration
                        .forParallelProcessing(4)
                        .andInitialSegmentsCount(4)
                        .andBatchSize(100));

I don’t see any difference with this configuration. I hope someone can show me the correct solution to this problem.

Thank you.

PS: We are using Axon Server EE

Please note that the initial segment count is only taken into account when the processor has not created its tokens yet. So you likely still have only one segment. From the Axon Server UI you can split the segment into 2, and then split each again to get to 4 segments. Only then will the TEP be able to run and process from 4 different threads.
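For completeness, segments can also be split programmatically instead of via the UI. A minimal sketch, assuming Axon 4.x and that `eventProcessingConfiguration` is obtained from the started Axon `Configuration` (method names from memory, so please verify them against your Axon version):

```java
// Sketch only: split the single initial segment in code rather than in the
// Axon Server UI. splitSegment(...) is asynchronous and returns a
// CompletableFuture<Boolean> indicating whether the split succeeded.
eventProcessingConfiguration
        .eventProcessor("processingGroupName", TrackingEventProcessor.class)
        .ifPresent(processor ->
                // Split segment 0 into two; repeat on each resulting segment
                // to grow from 1 to 4 segments in total.
                processor.splitSegment(0)
                         .thenAccept(success ->
                                 System.out.println("Split succeeded: " + success)));
```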

Thank you for the fast reply @Gerard. The Axon Server UI in the Cloud seems to be broken “Unexpected end of JSON” for every split and merge action. I recreated the tokens and I can verify that they are now running with four threads.

The performance is slightly better but still not what I was hoping for. As a side note, using four threads and 10 segments resulted in events not being processed. I changed it to four threads and four segments and it seems to work.

My event handlers are trivial: load an entity from the DB, call a single (inexpensive) method, and store the entity in the DB again. I assume that the database operations may be the bottleneck; the linked article talked about this too. Ideally, I would process the events for a batch of entities and perform a single saveAll after the batch is processed. Is this possible? If yes, could you please provide an example?

Context: The events belong to many different aggregates (around 1-5 per aggregate). Each aggregate is represented by an entity in the query database.

Thanks

With TEP you indeed need to be careful to have at least one thread running for each segment.

You might be able to increase performance by making the event handler aware that batching is used. This topic has an example of how to do so.


I tried to follow the example, but now it seems like not all events are processed anymore, while it is still too slow. To be honest, I haven’t noticed an improvement.

My implementation:

private Map<String, MyEntity> getBatch(UnitOfWork<?> unitOfWork) {
    return unitOfWork.getOrComputeResource("uniqueName/" + this.toString(), k -> {
        Map<String, MyEntity> listOfBatchedOperations = new HashMap<>();

        // Flush the whole batch with a single saveAll just before the unit of work commits.
        unitOfWork.onPrepareCommit(u ->
                myEntityRepository.saveAll(listOfBatchedOperations.values()));

        return listOfBatchedOperations;
    });
}
private void addToBatch(UnitOfWork<?> unitOfWork, MyEntity myEntity) {
    Map<String, MyEntity> batch = getBatch(unitOfWork);
    batch.put(myEntity.getId(), myEntity);
}
private MyEntity getMyEntityToEdit(String myEntityId, UnitOfWork<?> unitOfWork) {
    Map<String, MyEntity> batch = getBatch(unitOfWork);

    MyEntity myEntity;
    if (batch.containsKey(myEntityId)) {
        myEntity = batch.get(myEntityId);
    } else {
        myEntity = myEntityRepository.findById(myEntityId)
                .orElseThrow();
    }

    return myEntity;
}

All my event handlers are similar to this implementation:

@EventHandler
public void on(MyEvent event, UnitOfWork<?> unitOfWork) {
    MyEntity myEntity = getMyEntityToEdit(event.getMyEntityId(), unitOfWork);

    myEntity.performUpdate(...);

    addToBatch(unitOfWork, myEntity);
}

Is this correct?

Since you are using multiple threads that access the listOfBatchedOperations, you should make sure it handles concurrent access well, for example by using a ConcurrentHashMap. Otherwise, I don’t see anything wrong. You might want to add tracing to find out what is going slow.
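To illustrate the suggestion with plain JDK types (no Axon classes; `ConcurrentBatchDemo` and its 4×1000 workload are made up for the demo): a `ConcurrentHashMap` keeps the batch consistent even when several threads write to it at once, which a plain `HashMap` does not guarantee.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: swapping HashMap for ConcurrentHashMap makes the
// batch map safe for concurrent writers.
public class ConcurrentBatchDemo {

    // Fills one shared map from four threads and returns its final size.
    static int fillConcurrently() {
        Map<String, Integer> batch = new ConcurrentHashMap<>();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int offset = t * 1000;
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    batch.put("entity-" + (offset + i), offset + i);
                }
            });
            writers[t].start();
        }
        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return batch.size();
    }

    public static void main(String[] args) {
        // All 4 × 1000 distinct keys survive concurrent insertion.
        System.out.println(fillConcurrently()); // prints 4000
    }
}
```

With a plain `HashMap` the same workload could silently drop entries or corrupt the table, so the swap is cheap insurance even if contention turns out to be rare.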

Hmm, I played around with the settings a bit. It does seem to work now but is still too slow. I managed to speed things up by caching the database requests for a unit of work.

I am now running into issues with my database when I want to handle another batch of events (cascading delete) because I am opening too many database connections (with JPA).

I am thinking about changing my events to accommodate “group” events that are meant for many aggregates. These group events are handled inside my projections, which will dramatically reduce the number of events to handle. Inside custom event handlers I would issue commands to the aggregates targeted by the group event, so the command model is also updated. This eventual consistency is acceptable in my use case, but I would much rather stick with my previous approach without the group events.

Any thoughts?

I’m not sure about events affecting multiple aggregates. To me it sounds like the aggregate might not be correct. But I don’t know the domain, or the problem you want to solve.

Typically there are two kinds of events: domain events affecting a single aggregate, and non-domain events that aren’t part of an aggregate. Non-domain events can be used for integration, for example.

Let me explain my domain a bit so you get a better picture.

We are dealing with events (not Axon events but calendar events). Each calendar event is independent of the others, hence we modeled each one as its own aggregate. The user usually works on a single calendar event in isolation (i.e. editing details, …). But now we have the requirement to create/delete many similar calendar events at the same time. After that, the calendar events will be viewed in isolation again.

At the moment, I issue Axon commands to each calendar event aggregate. This results in the huge amount of events that must be processed by my projections. This process is too slow, though, because I can watch my calendar events getting created in my frontend. Ideally, they would just pop up at the same time without a big delay. My current solution can create 5000 calendar events with extensive preconfiguration (i.e., set details, …) in under 15s.

The solution with the “group” events is a workaround to make the projections more efficient. Generating the events is a matter of milliseconds but handling the events takes a lot more time.

In the background, I would still issue the commands to the single calendar event aggregates, but the projections wouldn’t listen to them. But I agree it is not very clean, and I would much prefer my current approach (or a different model if you have an idea). The reason to have so many aggregates is simply that there are so many calendar events (going up into the 10,000s). Transactional consistency is not really a concern for us; we are very OK with eventual consistency.

Maybe you can use a Saga/process manager to create/delete similar calendar events? The Saga can make sure to only send commands to the calendar events that are applicable.

Do you agree with my workaround?

I could use a saga to issue the commands to the affected aggregates in the background. The issue with the slow event processing remains for me.

I read a comment by @Steven_van_Beelen who suggested PooledStreamingProcessors here. Do they help with speeding up processing of events?

My current issue is with JPA that runs out of DB connections somehow. I only have two threads and segments configured for my processing group.

Running out of connections should be fixable by adjusting the connection pool size of Hikari.
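For reference, with Spring Boot’s default HikariCP integration this is a single property. Hikari’s default maximum is 10 connections; the value 20 below is just an illustrative example, sized to your processor thread count plus headroom:

```properties
# Maximum number of JDBC connections the pool may hold open (Hikari default: 10)
spring.datasource.hikari.maximum-pool-size=20
```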


Yeah, but why am I running into this problem in the first place? It feels very fragile to me at the moment.

Let me provide my two cents, as I’ve been roped in with an @Steven_van_Beelen :wink:

Because every TrackingEventProcessor thread you configure will:

  1. Make an active connection to your Event Store. If this is an RDBMS, that’ll eat another connection from the pool.
  2. Make an active connection to the database containing your tokens (thus, for the TokenStore).
  3. Make an active connection to a database, or several databases, to update query models / projections. How many depends on the number of Event Handling Components the Tracking Event Processor is in charge of, and on whether the event handling component in question does anything with a persisted projection. If this is a different database (or several) than the one the tokens are contained in (which isn’t recommended), that’ll be an additional connection once more.

Thus, if you’ve set the database connection pool to 5, you’ll run out quickly.

The Pooled Streaming Event Processor improves this predicament somewhat, mainly because it no longer opens a connection to the Event Store per thread, but one for the entire Event Processor. However, if you’re using Axon Server / AxonIQ Cloud, that won’t make a noticeable difference in connection pool size.

You will benefit from the fact that Axon Server actively pushes events to a PSEP, notifying it as soon as they’re present. Using a PSEP without Axon Server (currently) means it’ll wait a maximum of 500ms before it fetches new events.
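For reference, switching a processing group over to a PSEP is roughly a one-liner in the configuration. This is a sketch assuming Axon Framework 4.5+, where `registerPooledStreamingEventProcessor` was introduced; verify the exact method name against your version’s `EventProcessingConfigurer`:

```java
// Sketch: register "processingGroupName" as a Pooled Streaming Event
// Processor instead of the default Tracking Event Processor.
configurer.eventProcessing()
          .registerPooledStreamingEventProcessor("processingGroupName");
```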


In all honesty, I didn’t read the full thread above.
I am purely reacting to the database connections pointer, Daniel.
Nonetheless, I hope this provides some guidance!


Hi Steven, thank you for your insights. I switched to Pooled Streaming Event Processors and increased the connection pool size.

As far as I can tell, my performance issues are massively improved now and JPA doesn’t complain anymore.

For anyone reading the full thread: I am not using the “group” events approach but will be sticking with the (in my opinion) cleaner approach of issuing commands to the affected aggregates and reacting to the emitted events in my projections.

Thanks to all for your help!


Thanks for the feedback, Daniel!
I’m certain this will prove helpful to others, too :+1:
If you have any other questions in the future, be sure to post them on this forum!