Tracking event processor event handling responsability

Hi

When putting event handlers in different packages multiple tracking event processors are instantiated as expected. Each has their own GapAwareTrackingToken.

We have a use case where one part of the commands is generated by an external system reading large csv files.
The other part are user generated commands through an API layer.

It is ok for the processing of the csv files into the aggregates and finally read model to take some time. We currently use Kafka as middleware to stream the csv lines into the command bus.
However we want to make sure that the user commands are processed faster.

When separating an event handler that was only processing the events caused by human generated commands in another package it was correctly handling the events, but it seemed rather slow.
The user command event was set at around index 4000, while both tokens were incrementing at about the same speed. Only after the user event processor reached 4000 the command result was inserted in the view model as expected.

Maybe I am wrong to assume this but I expected that the tracking event processor which is only interested in one or a few events would be able to process the events faster than one that is handling more of them. However the token index increased at roughly the same pace.

Best regards,

Michael

Stacktrace below showing the lock exception on the tokenentry. I was not expecting that since each processor should only put a lock on its row. Performance when only using a single tracking event processor was 5-6 times faster than using four, most likely due to the locks.:

29-May-2017 13:08:46.588 WARN [o.a.e.TrackingEventProcessor] Error occurred. Starting retry mode.
org.springframework.dao.CannotAcquireLockException: could not execute statement; SQL [n/a]; nested exception is org.hibernate.exception.LockAcquisitionException: could not execute statement
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:269)
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:244)
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:521)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730)
at org.axonframework.spring.messaging.unitofwork.SpringTransactionManager.commitTransaction(SpringTransactionManager.java:80)
at org.axonframework.spring.messaging.unitofwork.SpringTransactionManager$1.commit(SpringTransactionManager.java:63)
at org.axonframework.messaging.interceptors.TransactionManagingInterceptor.lambda$handle$0(TransactionManagingInterceptor.java:48)
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:275)
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:219)
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$start$3(TrackingEventProcessor.java:186)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.hibernate.exception.LockAcquisitionException: could not execute statement
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:123)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:207)
at org.hibernate.engine.jdbc.batch.internal.NonBatchingBatch.addToBatch(NonBatchingBatch.java:45)
at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3082)
at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:2961)
at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3341)
at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:145)
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:582)
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:456)
at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:337)
at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:39)
at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1282)
at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:465)
at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:2963)
at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2339)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:485)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:147)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$100(JdbcResourceLocalTransactionCoordinatorImpl.java:38)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:231)
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:65)
at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:61)
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517)
… 22 common frames omitted
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 60) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1635)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:426)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:372)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:6276)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:1793)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:184)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:159)
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:315)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:204)
… 42 common frames omitted
29-May-2017 13:08:46.588 WARN [o.a.e.TrackingEventProcessor] Releasing claim on token and preparing for retry in 1s
29-May-2017 13:08:47.389 WARN [o.h.e.j.s.SqlExceptionHelper] SQL Error: 1205, SQLState: 40001
29-May-2017 13:08:47.389 ERROR [o.h.e.j.s.SqlExceptionHelper] Transaction (Process ID 56) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
29-May-2017 13:08:47.390 WARN [o.a.e.TrackingEventProcessor] Unexpected exception while attemting to retrieve token and open stream. Retrying in 5 seconds.
javax.persistence.PersistenceException: org.hibernate.exception.LockAcquisitionException: could not load an entity: [org.axonframework.eventhandling.tokenstore.jpa.TokenEntry#component[processorName,segment]{segment=0, processorName=com..read.eventhandlers.run}]
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1692)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1619)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1106)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1046)
at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:298)
at com.sun.proxy.$Proxy149.find(Unknown Source)
at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.loadOrCreateToken(JpaTokenStore.java:133)
at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.fetchToken(JpaTokenStore.java:100)
at org.axonframework.eventhandling.TrackingEventProcessor.ensureEventStreamOpened(TrackingEventProcessor.java:290)
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:217)
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$start$3(TrackingEventProcessor.java:186)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.hibernate.exception.LockAcquisitionException: could not load an entity: [org.axonframework.eventhandling.tokenstore.jpa.TokenEntry#component[processorName,segment]{segment=0, processorName=com.
.read.eventhandlers.run}]
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:123)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.loader.entity.plan.AbstractLoadPlanBasedEntityLoader.load(AbstractLoadPlanBasedEntityLoader.java:177)
at org.hibernate.persister.entity.AbstractEntityPersister.load(AbstractEntityPersister.java:3967)
at org.hibernate.event.internal.DefaultLoadEventListener.loadFromDatasource(DefaultLoadEventListener.java:508)
at org.hibernate.event.internal.DefaultLoadEventListener.doLoad(DefaultLoadEventListener.java:478)
at org.hibernate.event.internal.DefaultLoadEventListener.load(DefaultLoadEventListener.java:219)
at org.hibernate.event.internal.DefaultLoadEventListener.lockAndLoad(DefaultLoadEventListener.java:403)
at org.hibernate.event.internal.DefaultLoadEventListener.doOnLoad(DefaultLoadEventListener.java:124)
at org.hibernate.event.internal.DefaultLoadEventListener.onLoad(DefaultLoadEventListener.java:89)
at org.hibernate.internal.SessionImpl.fireLoad(SessionImpl.java:1129)
at org.hibernate.internal.SessionImpl.access$2600(SessionImpl.java:164)
at org.hibernate.internal.SessionImpl$IdentifierLoadAccessImpl.load(SessionImpl.java:2689)
at org.hibernate.internal.SessionImpl.get(SessionImpl.java:1082)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1069)
… 16 common frames omitted
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 56) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:4965)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1786)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1039)
at org.hibernate.loader.plan.exec.process.internal.ResultSetProcessorImpl.extractResults(ResultSetProcessorImpl.java:119)
at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:122)
at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:86)
at org.hibernate.loader.entity.plan.AbstractLoadPlanBasedEntityLoader.load(AbstractLoadPlanBasedEntityLoader.java:167)
… 28 common frames omitted
29-May-2017 13:08:47.789 WARN [o.h.e.j.s.SqlExceptionHelper] SQL Error: 1205, SQLState: 40001
29-May-2017 13:08:47.789 ERROR [o.h.e.j.s.SqlExceptionHelper] Transaction (Process ID 59) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
29-May-2017 13:08:47.795 WARN [o.a.e.TrackingEventProcessor] Releasing claim on token and preparing for retry in 2s
29-May-2017 13:08:48.293 WARN [o.h.e.j.s.SqlExceptionHelper] SQL Error: 1205, SQLState: 40001
29-May-2017 13:08:48.294 ERROR [o.h.e.j.s.SqlExceptionHelper] Transaction (Process ID 56) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
29-May-2017 13:08:48.301 WARN [o.a.e.TrackingEventProcessor] Error occurred. Starting retry mode.
javax.persistence.PersistenceException: org.hibernate.exception.LockAcquisitionException: could not load an entity: [org.axonframework.eventhandling.tokenstore.jpa.TokenEntry#component[processorName,segment]{segment=0, processorName=com..read.eventhandlers.run}]
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1692)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1619)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1106)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1046)
at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:298)
at com.sun.proxy.$Proxy149.find(Unknown Source)
at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.loadOrCreateToken(JpaTokenStore.java:133)
at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.storeToken(JpaTokenStore.java:77)
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$null$1(TrackingEventProcessor.java:179)
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:275)
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:219)
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$start$3(TrackingEventProcessor.java:186)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.hibernate.exception.LockAcquisitionException: could not load an entity: [org.axonframework.eventhandling.tokenstore.jpa.TokenEntry#component[processorName,segment]{segment=0, processorName=com.
.read.eventhandlers.run}]
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:123)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.loader.entity.plan.AbstractLoadPlanBasedEntityLoader.load(AbstractLoadPlanBasedEntityLoader.java:177)
at org.hibernate.persister.entity.AbstractEntityPersister.load(AbstractEntityPersister.java:3967)
at org.hibernate.event.internal.DefaultLoadEventListener.loadFromDatasource(DefaultLoadEventListener.java:508)
at org.hibernate.event.internal.DefaultLoadEventListener.doLoad(DefaultLoadEventListener.java:478)
at org.hibernate.event.internal.DefaultLoadEventListener.load(DefaultLoadEventListener.java:219)
at org.hibernate.event.internal.DefaultLoadEventListener.lockAndLoad(DefaultLoadEventListener.java:403)
at org.hibernate.event.internal.DefaultLoadEventListener.doOnLoad(DefaultLoadEventListener.java:124)
at org.hibernate.event.internal.DefaultLoadEventListener.onLoad(DefaultLoadEventListener.java:89)
at org.hibernate.internal.SessionImpl.fireLoad(SessionImpl.java:1129)
at org.hibernate.internal.SessionImpl.access$2600(SessionImpl.java:164)
at org.hibernate.internal.SessionImpl$IdentifierLoadAccessImpl.load(SessionImpl.java:2689)
at org.hibernate.internal.SessionImpl.get(SessionImpl.java:1082)
at org.hibernate.jpa.spi.AbstractEntityManagerImpl.find(AbstractEntityManagerImpl.java:1069)
… 26 common frames omitted
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 56) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:4965)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1786)
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1039)
at org.hibernate.loader.plan.exec.process.internal.ResultSetProcessorImpl.extractResults(ResultSetProcessorImpl.java:119)
at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:122)
at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:86)
at org.hibernate.loader.entity.plan.AbstractLoadPlanBasedEntityLoader.load(AbstractLoadPlanBasedEntityLoader.java:167)
… 38 common frames omitted
29-May-2017 13:08:48.302 WARN [o.a.e.TrackingEventProcessor] Releasing claim on token and preparing for retry in 1s

Hi,

certain databases, and I’m pretty sure MS SQL does the same thing, default to page-level locking. It’s likely that all rows are in the same page, as there’s not that much data in each row.

If you configure the table to use row-level locking instead, you should see a significant increase in processing speed.

Cheers,

Allard