Is Axon Cloud throttling command dispatch?

Hello,

I previously posted a thread to address the performance of my projections. Now that I fixed this problem I am facing another one regarding the same feature.

After certain events are published, I have an event handler that issues delete commands to many aggregates. I noticed that sometimes the commands don’t get dispatched. I was wondering whether this is a limitation of Axon Framework or of AxonIQ Cloud. We are currently in a shared context in developer mode.

I just got an exception that states

org.axonframework.lifecycle.ShutdownInProgressException: Cannot dispatch new commands as this bus is being shut down.

I don’t know why a shutdown would be in progress. This seems odd to me.

Update:
Another exception occurred:

java.lang.RuntimeException: Was unable to load aggregate due to timeout while waiting for events. Last sequence number received: -1

o.a.e.eventstore.AbstractEventStore: Error reading snapshot for aggregate [bf190d13-5dce-414f-9707-1a89cc3c0f75]. Reconstructing from entire event stream

I hope someone can help me :slight_smile:

I tried to horizontally scale my service to increase the amount of command handlers. This seems to remedy the cannot dispatch new commands issue.

However, my console is now spammed with errors like

org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException: Unable to extend the claim on token for processor 'processorName[5]'. It is either claimed by another process, or there is no such token.
o.a.eventhandling.pooled.WorkPackage     : Error while processing batch in Work Package [4]-[removeDeletedShiftsFromPlanningProcess]. Aborting Work Package...

This one is triggered by org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.fetchToken:

 org.hibernate.exception.LockAcquisitionException: could not extract ResultSet

and

com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction

Eventually, all events are handled successfully.

I configured my event processors like this. I copied and adapted it from another blog as a solution to my previous performance-related question. Perhaps I did something wrong here as there is no real documentation for it.

public void configure(EventProcessingConfigurer configurer, ScheduledExecutorService threadPool) {
    configurer.usingPooledStreamingEventProcessors();

    configurer.registerPooledStreamingEventProcessorConfiguration(
            "processor1",
            createProcessorConfiguration("processor_coordinator", threadPool, 10, 200)
    );

    configurer.registerPooledStreamingEventProcessorConfiguration(
            "processor2",
            createProcessorConfiguration("processor_coordinator", threadPool, 10, 200)
    );

    configurer.registerTrackingEventProcessorConfiguration("processor3", configuration ->
            TrackingEventProcessorConfiguration
                    .forSingleThreadedProcessing()
                    .andBatchSize(200));
    configurer.registerListenerInvocationErrorHandler("processor3", configuration -> PropagatingErrorHandler.instance());
}

private EventProcessingConfigurer.PooledStreamingProcessorConfiguration createProcessorConfiguration(String coordinatorName, ScheduledExecutorService threadPool, int initialSegmentCount, int batchSize) {
    return (configuration, builder) -> builder.coordinatorExecutor(
            Executors.newScheduledThreadPool(10, new AxonThreadFactory(coordinatorName)))
            .workerExecutor(threadPool)
            .batchSize(batchSize)
            .initialSegmentCount(initialSegmentCount);
}

Hi Daniel,
There is no difference between Axon Cloud and a local installation of Axon Server.

On your first question, I can’t really help without seeing some more log lines around the ShutdownInProgressException.

Are you able to reproduce it?
Is your developer context still within the 14-day data retention period? Or was it perhaps removed automatically and then recreated by you, without cleaning the database of your Axon Framework client application?

I want to help you out, but I need to gather more information.

Thanks.

I am not replying to overrule @Corrado_Musumeci’s post; I just want to add some additional info here.

ShutdownInProgressException

The ShutdownInProgressException is thrown whenever an application tries to dispatch any type of message after a shutdown signal has been sent to the application, or after you have manually invoked Configuration#shutdown.

In either case, the application instance in question has been told to shut down. Otherwise, I cannot foresee why you’d receive this exception.

Last sequence number received: -1

This message signals the application was unable to retrieve any events for a specific aggregate.
Just out of curiosity, was that exception followed by the AbstractEventStore message you’re sharing, @danstoofox?

UnableToClaimTokenException

The UnableToClaimTokenException is thrown whenever an Event Processor cannot claim or update a token. Specifically in this case, one Event Processor had the claim, processed events, and wanted to update the token’s position, but was unable to because the claim had been stolen by another thread.

In most scenarios, this signals that event handling may take too long on some of your instances.
So, can you perhaps share what your event handling components are doing, Daniel? Maybe there’s a means we can optimize that process to resolve this token-stealing-process. The introduction on Tracking Tokens explains this a little bit, by the way.
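To make the token stealing a bit more concrete, here is a minimal in-memory sketch in plain Java. It is not Axon’s TokenStore API: the class `TokenClaimSketch`, its methods, and the 10-second timeout used in `main` are assumptions made purely for illustration. The idea it models is that a claim may be taken over once its owner has not touched it within the claim timeout, after which the original owner can no longer store its progress.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of token claims. Hypothetical names, NOT Axon's TokenStore API.
class TokenClaimSketch {

    private static final class Claim {
        String owner;
        long lastTouchedMillis;
        long position;
    }

    private final Map<Integer, Claim> claims = new HashMap<>();
    private final long claimTimeoutMillis;

    TokenClaimSketch(long claimTimeoutMillis) {
        this.claimTimeoutMillis = claimTimeoutMillis;
    }

    // A segment can be claimed if it is unclaimed, already ours,
    // or if the current owner has not touched it within the timeout.
    synchronized boolean claim(int segment, String owner, long nowMillis) {
        Claim claim = claims.get(segment);
        boolean available = claim == null
                || claim.owner.equals(owner)
                || nowMillis - claim.lastTouchedMillis >= claimTimeoutMillis;
        if (!available) {
            return false;
        }
        if (claim == null) {
            claim = new Claim();
            claims.put(segment, claim);
        }
        claim.owner = owner;
        claim.lastTouchedMillis = nowMillis;
        return true;
    }

    // Storing progress also extends the claim; it fails if another owner took over.
    synchronized void storePosition(int segment, String owner, long position, long nowMillis) {
        Claim claim = claims.get(segment);
        if (claim == null || !claim.owner.equals(owner)) {
            throw new IllegalStateException("Unable to extend the claim on token for segment " + segment);
        }
        claim.position = position;
        claim.lastTouchedMillis = nowMillis;
    }

    public static void main(String[] args) {
        TokenClaimSketch store = new TokenClaimSketch(10_000);
        store.claim(0, "node-A", 0);
        // node-A is still busy with its batch at t = 15s, so node-B may take over.
        System.out.println("node-B took over: " + store.claim(0, "node-B", 15_000));
        try {
            store.storePosition(0, "node-A", 200, 16_000);
        } catch (IllegalStateException e) {
            System.out.println("node-A failed: " + e.getMessage());
        }
    }
}
```

In this model, the fix is either to make each batch finish well within the timeout (faster handlers or smaller batches) or to allow a longer timeout.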

Hello @Corrado_Musumeci and @Steven_van_Beelen,

sorry for the long delay but I was a bit busy. I was able to resolve my problem by now. I ended up completely redoing the configuration and abandoning PooledStreamingProcessors which were causing all the exceptions in my case. I think I must have configured them wrong. I also have a feeling that they are overkill in my situation as we are only dealing with around 5000 events which are rarely emitted in bursts. Usually, we only have very few events per second. I switched to TrackingEventProcessors and optimized my event handlers.

In the process, I also realized that my event processors are not the bottleneck. The command side with a SimpleCommandBus seems to struggle with handling many commands at once. I switched to the AsynchronousCommandBus now. This change alone seems to resolve all my problems.
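As a rough illustration of why this helps, here is a minimal plain-Java sketch. It is not Axon’s implementation; `DispatchSketch`, `dispatchSync`, and `dispatchAsync` are hypothetical names. A synchronous bus runs the handler on the thread that dispatches the command, so an event handler sending many commands is blocked on each one, whereas an asynchronous bus hands the work to an executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch of the two dispatch styles, NOT Axon's CommandBus API.
class DispatchSketch {

    // Synchronous style: the handler runs on the caller's thread,
    // so the caller is blocked until the command has been handled.
    static <C, R> CompletableFuture<R> dispatchSync(C command, Function<C, R> handler) {
        return CompletableFuture.completedFuture(handler.apply(command));
    }

    // Asynchronous style: the handler runs on a pool thread,
    // so the caller can continue dispatching right away.
    static <C, R> CompletableFuture<R> dispatchAsync(C command, Function<C, R> handler,
                                                     ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> handler.apply(command), executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Function<String, String> handler =
                    id -> "deleted " + id + " on " + Thread.currentThread().getName();
            System.out.println(dispatchSync("shift-1", handler).join());
            System.out.println(dispatchAsync("shift-2", handler, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```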

The pretty straightforward configuration and the new command bus feel much more stable now and I learned a lot. Thank you for responding to my thread @Corrado_Musumeci and for the additional insights from @Steven_van_Beelen.

Happy to hear you figured out the predicament, @danstoofox!
Switching the CommandBus for a variant that uses several threads is a smart move.

However, Corrado and I assumed you already used a multi-threaded variant, as you’re using AxonIQ Cloud. When using AxonIQ Cloud, the CommandBus will switch to the AxonServerCommandBus. This implementation, by nature, uses an ExecutorService to spawn threads for dispatching and handling commands.

So, a bit out of curiosity, have you moved away from AxonIQ Cloud? Is there some other piece of configuration we’re missing here? Or is it perhaps the “local segment” that you’ve switched from the SimpleCommandBus to an AsynchronousCommandBus?

We are still with AxonIQ Cloud and are currently in the process of moving to a single node instead of developer contexts for our alpha.

I am sending commands between applications through the command bus so I assume I only configured the local segment. I just created a new CommandBus bean in a configuration class. All commands are dispatched through the command gateway.

Gotcha, thanks for that, Daniel.
I believe the auto-configuration will pick it up correctly, but just to be sure, I’d recommend adding @Qualifier("localSegment") to the AsynchronousCommandBus bean creation method.

Axon’s auto-config, when constructing the AxonServerCommandBus, will look for a bean of type CommandBus with the qualifier localSegment attached to it. It’s this local segment that, by default, is a SimpleCommandBus. However, making it asynchronous means there’s an additional level of threads for handling commands.

Firstly, the AxonServerCommandBus will receive the commands, delegating them as CommandProcessingTasks to a thread pool. These tasks will invoke the localSegment, which in turn has its own thread pool. It surely means more is processed in one go, but I think you could also get by with adjusting the number of threads of the AxonServerCommandBus.

The property for this is axon.axonserver.command-threads, by the way.
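The two levels of threads described above can be pictured with plain executors. This is only a sketch under assumptions, not Axon code: `TwoLevelPoolSketch`, `route`, and the thread names are made up for illustration. One pool stands in for the threads of the AxonServerCommandBus, the other for the pool of an asynchronous local segment.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the hand-off between two thread pools, NOT Axon's API.
class TwoLevelPoolSketch {

    // The command is first picked up by the receiving pool and then
    // handed to the local segment's pool, where the handler would run.
    static String route(String command, ExecutorService receivingPool,
                        ExecutorService localSegmentPool) {
        return CompletableFuture
                .supplyAsync(() -> command + " received on "
                        + Thread.currentThread().getName(), receivingPool)
                .thenApplyAsync(received -> received + ", handled on "
                        + Thread.currentThread().getName(), localSegmentPool)
                .join();
    }

    public static void main(String[] args) {
        // Fixed thread names make it visible which pool did which step.
        ExecutorService receivingPool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "receiver"));
        ExecutorService localSegmentPool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "local-segment"));
        try {
            System.out.println(route("delete-shift", receivingPool, localSegmentPool));
        } finally {
            receivingPool.shutdown();
            localSegmentPool.shutdown();
        }
    }
}
```

With a synchronous local segment, the second hop disappears and the handler runs on the receiving thread itself, which is why tuning the number of receiving threads alone may already be enough.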