Kafka Rebalance and Axon Message Processing

Hi all,
It's been a while, but we're finally getting the hang of Axon. Recently, though, we ran into the following stack trace:

```
org.axonframework.modelling.command.ConcurrencyException: An event for aggregate [101124] at sequence [0] was already inserted
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:123) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:281) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:98) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:63) ~[axon-eventsourcing-4.0.jar:4.0]
at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:218) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.AbstractEventBus.lambda$null$8(AbstractEventBus.java:152) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155) ~[axon-messaging-4.0.jar:4.0]
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899) ~[na:1.8.0_162]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:136) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:258) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:181) ~[axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:661) [axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:771) [axon-messaging-4.0.jar:4.0]
at org.axonframework.eventhandling.TrackingEventProcessor$CountingRunnable.run(TrackingEventProcessor.java:588) [axon-messaging-4.0.jar:4.0]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: javax.persistence.PersistenceException: org.hibernate.exception.ConstraintViolationException: could not execute statement
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:149) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:157) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:164) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1443) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1423) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_162]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_162]
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:305) ~[spring-orm-5.0.9.RELEASE.jar:5.0.9.RELEASE]
at com.sun.proxy.$Proxy123.flush(Unknown Source) ~[na:na]
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:278) ~[axon-eventsourcing-4.0.jar:4.0]
... 19 common frames omitted
Caused by: org.hibernate.exception.ConstraintViolationException: could not execute statement
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:112) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:111) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:97) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:178) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3032) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3547) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:89) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:600) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:474) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:337) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:39) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1437) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
... 26 common frames omitted
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "uk8s1f994p4la2ipb13me2xqm1w"
Detail: Key (aggregate_identifier, sequence_number)=(101124, 0) already exists.
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:120) ~[postgresql-42.2.5.jar:42.2.5]
at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61) ~[HikariCP-2.7.9.jar:na]
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java) ~[HikariCP-2.7.9.jar:na]
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:175) ~[hibernate-core-5.2.17.Final.jar:5.2.17.Final]
... 34 common frames omitted
```

This led us to investigate, and we realized the problem is likely caused by a Kafka rebalance in which the committed consumer offset for the Kafka partition is lower than the offset of the last message the client had already processed. We believe this is why the messages above are being reprocessed (that is, processed twice).
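The redelivery described above is the normal at-least-once behaviour of a consumer group, and it can be reproduced without Kafka at all. The following is a simplified simulation, not Kafka client code: the class `RebalanceRedeliverySimulation`, the method `simulate`, and its parameters are hypothetical, and it assumes a consumer that commits its offset only every `commitInterval` records. When the partition is reassigned before the latest progress is committed, the new owner resumes from the last committed offset and handles some records a second time.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Kafka-free simulation of at-least-once delivery across a rebalance.
 * Consumer A commits only every `commitInterval` records; when it loses the
 * partition, consumer B resumes from the last committed offset, so records that
 * A processed after its last commit are handled again.
 */
public class RebalanceRedeliverySimulation {

    /**
     * @param partitionSize  number of records in the partition
     * @param commitInterval A commits after every N processed records
     * @param rebalanceAfter A loses the partition after processing this many records
     * @return the offsets handled by A and then B, in order
     */
    static List<Integer> simulate(int partitionSize, int commitInterval, int rebalanceAfter) {
        List<Integer> handled = new ArrayList<>();
        int committedOffset = 0; // "next offset to read", as stored for the group

        // Consumer A: applies each record's side effect, commits periodically.
        for (int offset = 0; offset < rebalanceAfter; offset++) {
            handled.add(offset);
            if ((offset + 1) % commitInterval == 0) {
                committedOffset = offset + 1;
            }
        }

        // Rebalance: A is revoked before committing its latest progress.
        // Consumer B starts from the committed offset, not where A stopped.
        for (int offset = committedOffset; offset < partitionSize; offset++) {
            handled.add(offset);
        }
        return handled;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]: offsets 4 and 5 are handled twice
        System.out.println(simulate(10, 4, 6));
    }
}
```

Committing more often narrows the window of duplicates but does not close it, since a consumer can always be revoked between processing a record and committing its offset. That is why handlers downstream of Kafka generally need to tolerate redelivered messages.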

How do we reconcile this situation within Axon Framework? What other best practices, if any, should we follow when using Kafka as our message bus?

I wanted to give this a few days in the hope of getting more eyes on the question from the community, but the problem still persists. At the moment, the only workaround is to wipe the local data of any service that gets stuck in this state. We'd very much like to understand how Axon handles messages that have already been consumed. Do we need to write our own consumer handler?
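One general answer to the last question is to make the handler idempotent, so that a redelivered message is recognized and skipped instead of creating the aggregate a second time. The following is a minimal, framework-free sketch of that idea; `IdempotentHandlerSketch`, its nested `Message` class, and the message-ID scheme are hypothetical and not part of Axon's API. It assumes every message carries a unique identifier. In a real service, the record of processed IDs would have to be persisted durably and in the same transaction as the side effect, which the in-memory collections here do not do.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical idempotent consumer: remembers the IDs of messages it has handled
 * and ignores redeliveries. In-memory only, for illustrating the control flow.
 */
public class IdempotentHandlerSketch {

    /** Hypothetical message shape: a unique message ID plus the aggregate it creates. */
    static final class Message {
        final String messageId;
        final String aggregateId;

        Message(String messageId, String aggregateId) {
            this.messageId = messageId;
            this.aggregateId = aggregateId;
        }
    }

    private final Set<String> processedMessageIds = new HashSet<>();
    private final Map<String, Integer> creationCounts = new HashMap<>(); // aggregateId -> times created

    /** Returns true if the message was applied, false if it was a duplicate and skipped. */
    boolean handle(Message message) {
        if (!processedMessageIds.add(message.messageId)) {
            return false; // seen before (e.g. redelivered after a rebalance): skip the side effect
        }
        creationCounts.merge(message.aggregateId, 1, Integer::sum);
        return true;
    }

    int creationCount(String aggregateId) {
        return creationCounts.getOrDefault(aggregateId, 0);
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        Message m = new Message("msg-1", "101124");
        System.out.println(handler.handle(m));               // true
        System.out.println(handler.handle(m));               // false: redelivered duplicate
        System.out.println(handler.creationCount("101124")); // 1
    }
}
```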

Hi Michael,

I can't tell for certain from the stack trace you've shared: are you handling a stream of events from Kafka and then persisting them into an Axon Event Store?

Regardless, the constraint is there for a reason, of course.
When you load an Aggregate, you need to be sure you receive all of the events for its Aggregate Identifier, in the right order.
Thus, the unique constraint on the aggregate-id/sequence-number pair is of the utmost importance for Event Sourcing an Aggregate correctly.
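To make the role of that constraint concrete, here is a minimal in-memory model of an event table keyed on `(aggregate_identifier, sequence_number)`, the same pair named in the PostgreSQL error in the original post. It is an illustrative sketch, not Axon's `JpaEventStorageEngine`; `EventStoreConstraintSketch` and `DuplicateSequenceException` are hypothetical names (in the real trace, Axon surfaces this case as a `ConcurrencyException`). A second append at an existing sequence number is rejected, and loading returns one aggregate's events in sequence order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory event table with a unique (aggregateId, sequenceNumber) key.
 * Shows why a reprocessed "create" event at sequence 0 must fail, and how
 * Event Sourcing relies on reading one aggregate's events in sequence order.
 */
public class EventStoreConstraintSketch {

    static final class DuplicateSequenceException extends RuntimeException {
        DuplicateSequenceException(String aggregateId, long sequenceNumber) {
            super("Key (aggregate_identifier, sequence_number)=(" + aggregateId + ", "
                    + sequenceNumber + ") already exists.");
        }
    }

    // aggregateId -> (sequenceNumber -> payload); TreeMap keeps each stream sorted by sequence
    private final Map<String, TreeMap<Long, String>> events = new HashMap<>();

    void append(String aggregateId, long sequenceNumber, String payload) {
        TreeMap<Long, String> stream = events.computeIfAbsent(aggregateId, id -> new TreeMap<>());
        // putIfAbsent returns the existing value when the key is taken: that is the unique-key violation
        if (stream.putIfAbsent(sequenceNumber, payload) != null) {
            throw new DuplicateSequenceException(aggregateId, sequenceNumber);
        }
    }

    /** Replays all events of one aggregate in ascending sequence order. */
    List<String> load(String aggregateId) {
        return new ArrayList<>(events.getOrDefault(aggregateId, new TreeMap<>()).values());
    }

    public static void main(String[] args) {
        EventStoreConstraintSketch store = new EventStoreConstraintSketch();
        store.append("101124", 0, "Created");
        store.append("101124", 1, "Renamed");
        try {
            store.append("101124", 0, "Created"); // a reprocessed message tries sequence 0 again
        } catch (DuplicateSequenceException e) {
            // prints: Key (aggregate_identifier, sequence_number)=(101124, 0) already exists.
            System.out.println(e.getMessage());
        }
        System.out.println(store.load("101124")); // [Created, Renamed]
    }
}
```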

Cheers,
Steven

Hi Michael,

I just stumbled upon your post while searching for something related to an issue I found, before opening a new discussion.

Could your error be caused by what I described in this issue: https://github.com/AxonFramework/extension-kafka/issues/25 ?

Regards,
Andre