Hi all,
It’s been awhile but we’re finally getting the hang of Axon. We recently ran into the following trace stack though:
`
org.axonframework.modelling.command.ConcurrencyException: An event for aggregate [101124] at sequence [0] was already inserted
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:123) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:281) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:98) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:63) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:218) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.AbstractEventBus.lambda$null$8(AbstractEventBus.java:152) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155) ~[axon-messaging-4.0.jar:4.0]
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899) ~[na:1.8.0_162]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:136) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:258) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:181) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:661) [axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:771) [axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor$CountingRunnable.run(TrackingEventProcessor.java:588) [axon-messaging-4.0.jar:4.0]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: javax.persistence.PersistenceException: org.hibernate.exception.ConstraintViolationException: could not execute statement
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:149) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:157) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:164) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1443) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1423) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_162]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_162]
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:305) ~[spring-orm-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at com.sun.proxy.$Proxy123.flush(Unknown Source) ~[na:na]
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:278) ~[axon-eventsourcing-4.0.jar:4.0]
… 19 common frames omitted
Caused by: org.hibernate.exception.ConstraintViolationException: could not execute statement
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:112) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:111) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:97) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:178) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3032) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3547) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:89) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:600) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:474) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:337) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:39) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1437) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
… 26 common frames omitted
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint “uk8s1f994p4la2ipb13me2xqm1w”
Detail: Key (aggregate_identifier, sequence_number)=(101124, 0) already exists.
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:120) ~[postgresql-42.2.5.jar:42.2.5]
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61) ~[HikariCP-2.7.9.jar:na]
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java) ~[HikariCP-2.7.9.jar:na]
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:175) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
… 34 common frames omitted
`
This lead us down an investigation where we realized the problem is likely caused by a Kafka rebalance where the consumer offset (of the kafka partition) is smaller than the offset of the last message the client has already processed. This is why we believe we are getting the reprocessing of the above messages (aka processing twice).
How do we reconcile this situation within the AxonFramework? What if any other best practices should we pursue with Kafka as our message bus.