Duplicate key value violation

We are using DisruptorCommandBus defined as

`
DisruptorCommandBus commandBus =
        DisruptorCommandBus.builder()
                .transactionManager(new SpringTransactionManager(transactionManager))
                .messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus.class, "commandBus"))
                .invokerThreadCount(1)
                .publisherThreadCount(2)
                .dispatchInterceptors(Collections.singletonList(new BeanValidationInterceptor<>()))
                .build();
commandBus.registerHandlerInterceptor(
        new CorrelationDataInterceptor<>(axonConfiguration.correlationDataProviders()));

`

with this repo declaration

`
@Bean
@Qualifier("paymentAggregateRepository")
public Repository<PaymentAggregate> getPaymentAggregateRepo(ParameterResolverFactory parameterResolverFactory) {
    return disruptorCommandBus.createRepository(
            eventStore, new GenericAggregateFactory<>(PaymentAggregate.class), parameterResolverFactory);
}

`

and this TM

    @ConditionalOnMissingBean(type = "JpaTransactionManager")
    @Qualifier("transactionManager")
    JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory);
        return transactionManager;
    }

When we execute commands with the application running on two VMs, we occasionally receive a ConcurrencyException like the one below. Do you know how to solve this issue? I have checked earlier, similar threads, where one of the suggestions was to disable open session in view by putting this in application.properties:

spring.jpa.open-in-view=false

However, this did not help.

Request processing failed; nested exception is org.axonframework.modelling.command.ConcurrencyException: An event for aggregate [593ae98c-1c25-46cd-a7ee-28b8bb88af24] at sequence [5] was already inserted] with root cause"
stack_trace: "org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "uk8s1f994p4la2ipb13me2xqm1w"
Detail: Key (aggregate_identifier, sequence_number)=(593ae98c-1c25-46cd-a7ee-28b8bb88af24, 5) already exists.
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143)
    at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:120)
    at sun.reflect.GeneratedMethodAccessor145.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.tomcat.jdbc.pool.StatementFacade$StatementProxy.invoke(StatementFacade.java:114)
    at com.sun.proxy.$Proxy230.executeUpdate(Unknown Source)
    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:175)
    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3176)
    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3690)
    at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:90)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:478)
    at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:356)
    at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:39)
    at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1454)
    at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1440)
    at sun.reflect.GeneratedMethodAccessor384.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:308)
    at com.sun.proxy.$Proxy168.flush(Unknown Source)
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.lambda$appendEvents$6(JpaEventStorageEngine.java:283)
    at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:47)
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:279)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:94)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:63)
    at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:218)
    at org.axonframework.eventhandling.AbstractEventBus.lambda$null$8(AbstractEventBus.java:152)
    at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71)
    at org.axonframework.disruptor.commandhandling.DisruptorUnitOfWork.notifyHandlers(DisruptorUnitOfWork.java:92)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
    at org.axonframework.disruptor.commandhandling.EventPublisher.performCommit(EventPublisher.java:157)
    at org.axonframework.disruptor.commandhandling.EventPublisher.processPublication(EventPublisher.java:119)
    at org.axonframework.disruptor.commandhandling.EventPublisher.onEvent(EventPublisher.java:79)
    at org.axonframework.disruptor.commandhandling.EventPublisher.onEvent(EventPublisher.java:41)
    at com.lmax.disruptor.BatchEventProcessor.processEvents(BatchEventProcessor.java:168)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:125)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Hi,

How many nodes do you use? If you are using more than one node, do they share the same datastore to store the aggregate?

Kind regards,

Simon

Hi Simon,

We are using multiple nodes (VMs), and they share the same event store to store the aggregates. The number of nodes is between 2 and 10.

Br Thorbjörn

Hi Thorbjörn, Simon,

When it comes to a distributed setup, we typically recommend that you configure a distributed command bus to delegate a command to a single destination at all times.
This idea of consistently routing a command to the same node is in line with the fact that you can only have a single command handler for a given command.
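
The routing idea can be illustrated with a minimal sketch. This is not how Axon's distributed command bus implementations route commands internally; `AggregateRouter`, `nodeFor`, and the pod names are hypothetical names chosen for this example. It only shows the principle that, as long as the set of nodes does not change, the same aggregate identifier always maps to the same node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of single-destination routing; not Axon's implementation.
final class AggregateRouter {
    private final List<String> nodes;

    AggregateRouter(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    // Maps an aggregate identifier to one node. The result is stable for a
    // given identifier as long as the node list stays the same.
    String nodeFor(String aggregateIdentifier) {
        int index = Math.floorMod(aggregateIdentifier.hashCode(), nodes.size());
        return nodes.get(index);
    }

    public static void main(String[] args) {
        AggregateRouter router =
                new AggregateRouter(Arrays.asList("pod-a", "pod-b", "pod-c", "pod-d"));
        String aggregateId = "593ae98c-1c25-46cd-a7ee-28b8bb88af24";
        // Two commands for the same aggregate end up on the same node.
        System.out.println(router.nodeFor(aggregateId).equals(router.nodeFor(aggregateId)));
    }
}
```

A plain modulo like this reshuffles many aggregates whenever a node joins or leaves, so treat it purely as a picture of the principle.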

By using a distributed command bus solution, you are ensured that the same node will handle a given command for an aggregate at all times.
As such, no two (or more) nodes can concurrently handle a command for the same aggregate.
It's this scenario, two nodes handling commands for the same aggregate concurrently, that causes the ConcurrencyException in your setup.
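
To make the failure mode concrete, here is a small simulation, assuming two nodes that both loaded the aggregate when its last event had sequence number 4. `SharedEventStore` is a hypothetical in-memory stand-in for the shared event table and its unique constraint on (aggregate_identifier, sequence_number); it is not Axon's `JpaEventStorageEngine`, and `IllegalStateException` merely stands in for the `ConcurrencyException`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a shared event store table; not Axon code.
final class SharedEventStore {
    // Mimics the unique constraint on (aggregate_identifier, sequence_number).
    private final Set<String> uniqueKeys = new HashSet<>();

    synchronized void append(String aggregateIdentifier, long sequenceNumber) {
        String key = aggregateIdentifier + "/" + sequenceNumber;
        if (!uniqueKeys.add(key)) {
            // Stand-in for the ConcurrencyException seen in the real setup.
            throw new IllegalStateException("An event for aggregate [" + aggregateIdentifier
                    + "] at sequence [" + sequenceNumber + "] was already inserted");
        }
    }

    public static void main(String[] args) {
        SharedEventStore store = new SharedEventStore();
        String aggregateId = "593ae98c-1c25-46cd-a7ee-28b8bb88af24";
        // Both nodes loaded the aggregate at the same point in its history.
        long lastSeenByNodeA = 4;
        long lastSeenByNodeB = 4;

        store.append(aggregateId, lastSeenByNodeA + 1); // node A appends first
        try {
            store.append(aggregateId, lastSeenByNodeB + 1); // node B reuses sequence 5
        } catch (IllegalStateException e) {
            System.out.println("Node B rejected: " + e.getMessage());
        }
    }
}
```

Neither node did anything wrong locally; the conflict only exists because both were allowed to handle commands for the same aggregate at the same time.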

As for choosing a distributed command bus, you have three options.

  1. Axon Server
  2. JGroups DistributedCommandBus implementation
  3. Spring Cloud DistributedCommandBus implementation

The simplest setup would be Axon Server, as this ensures you are not required to configure a JGroups or Spring Cloud solution.

Hope this helps!

Cheers,
Steven

Hi Steven,

Thanks for the reply. Regarding your options, we’re using Spring Boot v2.1.5 and Axon v4.1.2. The application runs on a GKE cluster in GCP with 4 pods or more. Is your recommendation the same with this setup?

Br Thorbjörn

Hi Thorbjörn,

With any distributed setup, I would recommend using a distributed command bus mechanism.
Without that in place, the single-destination requirement for commands cannot be guaranteed.
Thus, without a distributed command bus solution, you will have to cover issues like this yourself, even though the framework already aims to solve them for you through the distributed bus.

So in short, yes I would! :)

As said, setting up Axon Server would make the configuration terribly easy for you.
Spring Cloud and JGroups would require some configuration.

Hope this clarifies things Thorbjörn!

Cheers,
Steven