EventHandler seems to stop after a few hours

We have an application with an event handler that seems to stop functioning after a few hours of running.

The class below is the (anonymized) one that breaks. We have two of these with similar behavior:

  • Use data from the event to do a call to an external service
  • Transactional
  • Different processing groups
  • Result of the service sends a new command on the command gateway
  • Using axon 4.1.1
  • All events stored in a remote oracle database

When we inspect the Thread belonging to this processing group, it seems to be reading from the database. We are now unsure where to look for a solution and are wondering if we are doing something wrong with Axon.

What we want to accomplish is:

  • Receive data in the form of a Command
  • The aggregate applies an Event
  • This event is processed by the below class
  • This class retrieves data from an external service
  • The data from this service is sent in a new command

Is this implementation a correct solution for our problem?
Is the @Transactional annotation the correct solution for making Axon retry?
Can multiple @Transactional ProcessingGroups be the cause of this issue?
Can anyone give us some pointers on where to look to debug the cause of the problem?

Regards,

John

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

import javax.transaction.Transactional;

@Component
@Transactional
@ProcessingGroup("myProcessor")
public class EventProcessor {
    private final ExternalApi api;
    private final CommandGateway commandGateway;

    public EventProcessor(final ExternalApi api, final CommandGateway commandGateway) {
        this.api = api;
        this.commandGateway = commandGateway;
    }

    @EventHandler
    public void handleEvent(final Event event) {
        final Response response = api.call(new Request());
        commandGateway.send(new SomeCommand(event.getId(), response.data));
    }
}

Hi John,

First off, nice to chat with you again!
I trust everything is going fine, apart from the issue you’re sharing here.
So, let’s go towards trying to solve it.

Based on the information you’re giving, it sounds like the Thread actually is performing a query.
Would you be able to see what kind of query it is doing?
If you would be able to share the stack trace of that thread, that would be even nicer!

Anyhow, based on the provided information I am not really sure what the problem is at the moment.
The only thing that stands out in your snippet is the @Transactional annotation.
You are not required to add this yourself, as the framework deals with wrapping a transaction around every message for you.

It does so through the underlying UnitOfWork, which tracks a message through several phases such as prepare_commit, commit, and rollback.
That makes it the ideal place to deal with transactions, which it does through a TransactionManager.
As you’re in a Spring environment, a SpringTransactionManager is automatically wired in for you, and each UnitOfWork uses it to manage transactions.

In conclusion, dropping that annotation might help, and you can drop it regardless of whether it solves the problem.
If it doesn’t solve it, I’d suggest sharing the performed query and/or a stack trace of the blocking thread.

Let’s try to figure this out John!

Cheers,
Steven

Hello Steven,

Thank you for the reply! All is well here, except for this inconvenience :)

There was a moment where we thought we fixed it, but it seems we are stuck again.
We did learn that this problem persists over restarts. As far as I can see in the logs, the broken processor does nothing from the start. I can see nothing in the logs from its Thread.
Here’s a sample from the token_entry table:

brokenProcessor 2019-07-10T12:40:29.416817Z <org.axonframework.eventhandling.GapAwareTrackingToken>3097400<java.util.concurrent.ConcurrentSkipListMap>3096751true3096752true3096754true3096758true3096763true3096766true3096769true3096771true3096773true3096777true3096780true3096783true3096784true3096788true3096791true3096792true3096797true3096801true3096804true3096807true3096808true3096811true3096812true3096814true3096820true3096822true3096825true3096826true3096830true3096831true3096842true3096844true3096845true3096848true3096850true3096853true3096854true3096857trueSOME_MORE</java.util.concurrent.ConcurrentSkipListMap></org.axonframework.eventhandling.GapAwareTrackingToken>

workingProcessor 2019-07-10T14:34:24.43648Z <org.axonframework.eventhandling.GapAwareTrackingToken>3108693<java.util.concurrent.ConcurrentSkipListMap></java.util.concurrent.ConcurrentSkipListMap></org.axonframework.eventhandling.GapAwareTrackingToken>

workingProcessor 2019-07-10T14:34:27.168501Z <org.axonframework.eventhandling.GapAwareTrackingToken>3114859<java.util.concurrent.ConcurrentSkipListMap>3114682true3114684true3114687true3114694true3114698true3114699true3114701true3114711true3114712true3114714true3114716true3114720true3114721true3114725true3114728true3114731true3114732true3114738true3114744true3114752true3114754true3114758true3114761true3114765true3114767true3114770true3114771true3114773true3114776true3114780true3114787true3114790true3114793true3114797true3114798true3114800true3114804true3114805true3114815true3114819true3114824true3114827true3114838true3114841true3114845true3114851true3114852true3114857true3114858true</java.util.concurrent.ConcurrentSkipListMap></org.axonframework.eventhandling.GapAwareTrackingToken>

I find it weird that the brokenProcessor has a timestamp of 12:40, the moment when everything was still working. As you can see from the index, it is far behind the working processors, which all have a number around 3114859.
This fits with what we see in the service that brokenProcessor is supposed to call. The dotted lines in the image represent redeploys of the axon application.

STOPPED.png
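To put a number on how far behind the brokenProcessor is, the index and timestamp values from the token_entry rows quoted above can be compared directly. This is only a small sketch: the class and method names are hypothetical, and it works on the raw values copied from the rows rather than on Axon's token classes.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Axon: compares the raw values seen in token_entry.
final class TokenLag {

    // Number of global index positions the lagging token is behind the leading one.
    static long indexLag(long laggingIndex, long leadingIndex) {
        return leadingIndex - laggingIndex;
    }

    // Time between the last update of the lagging token and that of the leading one.
    static Duration staleness(String laggingTimestamp, String leadingTimestamp) {
        return Duration.between(Instant.parse(laggingTimestamp), Instant.parse(leadingTimestamp));
    }

    public static void main(String[] args) {
        // Values copied from the brokenProcessor row and the last workingProcessor row.
        System.out.println(indexLag(3097400L, 3114859L));
        System.out.println(staleness("2019-07-10T12:40:29.416817Z", "2019-07-10T14:34:27.168501Z").toMinutes());
    }
}
```

With the rows above, this gives a lag of 17459 index positions and a token that was last updated 113 minutes before the working one.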

We cannot remove the @Transactional annotation at this time because it causes one of our tests to fail. With the @Transactional annotation, Axon will automatically retry after 1s, 2s, 4s,…, 60s. It will keep doing this until it works, which is what we want. Without this annotation we do not get an automatic retry. Are we abusing something here we shouldn’t do? What would you recommend to get this retry behavior on our EventHandlers?
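For reference, the retry intervals we observe (1s, 2s, 4s, …, 60s) look like a doubling delay with a cap. The sketch below only models that schedule so the numbers are easy to check; it is not Axon's implementation, and the class name, constants, and methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a doubling retry delay with a cap; not taken from Axon.
final class RetryBackoff {
    static final long INITIAL_SECONDS = 1;
    static final long MAX_SECONDS = 60;

    // Delay in seconds before the given retry (attempt 0 is the first retry).
    static long delaySeconds(int attempt) {
        long delay = INITIAL_SECONDS;
        // Double once per earlier attempt, but never go past the cap.
        for (int i = 0; i < attempt && delay < MAX_SECONDS; i++) {
            delay = Math.min(delay * 2, MAX_SECONDS);
        }
        return delay;
    }

    // The delays for the first `attempts` retries, in order.
    static List<Long> schedule(int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            delays.add(delaySeconds(attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(8)); // prints [1, 2, 4, 8, 16, 32, 60, 60]
    }
}
```

Under this model the delay reaches the 60 second cap on the seventh retry and stays there.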

We are also unsure if this is caused by high load. We tried a small load test on our machines where we fed the system 2500 cases in 5 minutes, which due to some slow service calls took about half an hour to process. In the end everything worked and all cases completed to the end, including the brokenProcessor.

`

"EventProcessor[brokenProcessor]-0" #59
Thread State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at oracle.net.ns.Packet.receive(Packet.java:311)
at oracle.net.ns.DataPacket.receive(DataPacket.java:105)
at oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:305)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:249)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:171)
at oracle.net.ns.NetInputStream.read(NetInputStream.java:89)
at oracle.jdbc.driver.T4CSocketInputStreamWrapper.readNextPacket(T4CSocketInputStreamWrapper.java:123)
at oracle.jdbc.driver.T4CSocketInputStreamWrapper.read(T4CSocketInputStreamWrapper.java:79)
at oracle.jdbc.driver.T4CMAREngineStream.unmarshalUB1(T4CMAREngineStream.java:426)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:390)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:776)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:897)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1034)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3867)

- locked <0x0000000015091697> (a oracle.jdbc.driver.T4CConnection)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1502)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java:-1)
    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.extract(ResultSetReturnImpl.java:60)
    at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.getResultSet(AbstractLoadPlanBasedLoader.java:419)
    at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeQueryStatement(AbstractLoadPlanBasedLoader.java:191)
    at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:121)
    at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:86)
    at org.hibernate.loader.entity.plan.AbstractLoadPlanBasedEntityLoader.load(AbstractLoadPlanBasedEntityLoader.java:188)
    at org.hibernate.persister.entity.AbstractEntityPersister.load(AbstractEntityPersister.java:4273)
    at org.hibernate.event.internal.DefaultLoadEventListener.loadFromDatasource(DefaultLoadEventListener.java:511)
    at org.hibernate.event.internal.DefaultLoadEventListener.doLoad(DefaultLoadEventListener.java:481)
    at org.hibernate.event.internal.DefaultLoadEventListener.load(DefaultLoadEventListener.java:222)
    at org.hibernate.event.internal.DefaultLoadEventListener.lockAndLoad(DefaultLoadEventListener.java:406)
    at org.hibernate.event.internal.DefaultLoadEventListener.doOnLoad(DefaultLoadEventListener.java:127)
    at org.hibernate.event.internal.DefaultLoadEventListener.onLoad(DefaultLoadEventListener.java:92)
    at org.hibernate.internal.SessionImpl.fireLoad(SessionImpl.java:1257)
    at org.hibernate.internal.SessionImpl.access$2000(SessionImpl.java:208)
    at org.hibernate.internal.SessionImpl$IdentifierLoadAccessImpl.doLoad(SessionImpl.java:2874)
    at org.hibernate.internal.SessionImpl$IdentifierLoadAccessImpl.load(SessionImpl.java:2855)
    at org.hibernate.internal.SessionImpl.find(SessionImpl.java:3490)
    at org.hibernate.internal.SessionImpl.find(SessionImpl.java:3464)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:308)
    at com.sun.proxy.$Proxy173.find(Unknown Source)
    at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.loadToken(JpaTokenStore.java:216)
    at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.fetchToken(JpaTokenStore.java:167)
    at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.lambda$run$1(TrackingEventProcessor.java:1053)
    at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher$$Lambda$1256/0x0000000100f23440.run(Unknown Source)
    at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:47)
    at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1052)
    at java.lang.Thread.run(Unknown Source)

`

So… Five minutes after posting this, the brokenProcessor started again. The token_entry table now shows a current timestamp.

Hi John,

Alright, from here on it sounds like the quite complicated (private) Event Producer and Event Consumer optimization logic in the framework isn’t playing nicely with your setup.
There is a simple way around this optimization logic, which is to toggle it off.
My hunch is that if you turn it off, you’ll no longer encounter this issue where (I think) your consumption threads are waiting for one another.

If you set an application property named “optimize-event-consumption” to “false”, then it should be turned off correctly for you.
If you prefer to set this optimization in code, you can do this by creating your own EmbeddedEventStore bean through its Builder and using the Builder#optimizeEventConsumption(boolean) method.
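A minimal sketch of the in-code variant could look as follows. It assumes a Spring Boot setup in which an EventStorageEngine bean is already available for injection; the configuration class name is made up for illustration, and any other Builder settings you rely on (message monitors, for instance) would have to be added as well.

```java
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the Builder calls matter here.
@Configuration
public class EventStoreConfiguration {

    // Builds the event store on top of the existing storage engine,
    // with the event consumption optimization switched off.
    @Bean
    public EmbeddedEventStore eventStore(EventStorageEngine storageEngine) {
        return EmbeddedEventStore.builder()
                                 .storageEngine(storageEngine)
                                 .optimizeEventConsumption(false)
                                 .build();
    }
}
```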

Would you mind trying whether switching this property resolves the problem at hand?

Hope this will resolve it!

Cheers,
Steven

Hi Steven,

As far as I can see we are not doing anything weird, except for maybe sending Commands from an EventHandler and running multiple instances on the cloud.

Since your reply we haven’t had any downtime and we would like to see first how long this will last. We are also going to see if we can reproduce this with some kind of load test.
When our brokenProcessor breaks again, we are going to deploy the version with your suggested fix to see if this solves the problem.

Something we noticed while debugging is that the commandGateway.send in our case was not an asynchronous call and ran in the same Thread, even though it returns a CompletableFuture. Is this expected when using the default CommandBus?

In any case, thanks a lot for the assistance! We’re going to let it run over the weekend to see how it behaves and get back to you when we’ve tried your suggestion!

Hi John,

Alright, keep us posted if the issue turns up again!

“Something we noticed while debugging is that the commandGateway.send in our case was not an asynchronous call and ran in the same Thread, even though it returns a CompletableFuture. Is this expected when using the default CommandBus?”

I am assuming you have not configured your own CommandBus. Is that correct?

By default, you’ll receive the SimpleCommandBus which indeed will run in the same thread.

The API does allow for asynchronous dispatching though, which you can enforce by configuring an AsynchronousCommandBus instead of the SimpleCommandBus.
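To illustrate why a method returning a CompletableFuture says nothing about which thread does the work, here is a plain Java sketch without any Axon classes. The names are hypothetical stand-ins: one method does its work on the calling thread and hands back an already completed future, the other hands the work to an executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-ins for a same-thread and an executor-backed dispatcher; not Axon code.
final class DispatchDemo {

    // Runs on the calling thread and returns a future that is already complete.
    static CompletableFuture<String> sendOnCallerThread() {
        return CompletableFuture.completedFuture(Thread.currentThread().getName());
    }

    // Hands the work to the given executor; the result carries the worker thread's name.
    static CompletableFuture<String> sendOnExecutor(Executor executor) {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "command-worker"));
        try {
            System.out.println("caller thread:   " + Thread.currentThread().getName());
            System.out.println("same-thread run: " + sendOnCallerThread().join());
            System.out.println("executor run:    " + sendOnExecutor(executor).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

The first call reports the caller's own thread name, the second reports the worker thread, yet both have the same return type.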

Hope this helps!

Cheers,
Steven

Hello Steven,

We did not configure anything on the command gateway, thanks for explaining.

The event processors in our application quit on us again this morning, so we tried setting the optimize-event-consumption setting to false. Both on my local machine and on our servers we noticed the difference. My local machine maxed out CPU usage, which it didn’t do under the same load before.

We also saw that the other event processors that we thought were working are far behind in processing the events. Below is a screenshot where you can see the moment the new version went live.

STARTED.png

The yellow line is the event processor we thought was up to date. The green one died on us and was at 0. The blue line represents Events caused by new aggregates. The unit is Operations per Minute.

So now we are waiting for a backlog of approximately 30k to be processed, but as far as we can see our application now performs a lot better!