DistributedCommandBus with Spring Cloud Eureka

Hi Team,

I am trying to distribute the command bus using the spring-cloud extension. Here is my setup.

I am using the following Maven BOMs, including the Axon Framework BOM:

mavenBom 'org.axonframework:axon-bom:4.9.2'
mavenBom 'com.google.cloud:spring-cloud-gcp-dependencies:5.0.0'
mavenBom 'org.springframework.cloud:spring-cloud-starter-parent:2023.0.0'

I am using Google AlloyDB as the event store, with the default SimpleCommandBus as the local segment of a distributed command bus, which I enabled by adding the following dependency:

implementation 'org.axonframework.extensions.springcloud:axon-springcloud-spring-boot-starter'

I am also configuring the command bus with the default properties given in the example here:

eureka.client.fetch-registry=true
axon.distributed.load-factor=500
axon.distributed.spring-cloud.mode=rest
axon.distributed.enabled=true
axon.distributed.spring-cloud.fallback-to-http-get=true
eureka.client.service-url.defaultZone=http://eureka-service:8761/eureka/
axon.distributed.spring-cloud.enable-ignore-listing=true
axon.distributed.spring-cloud.enable-accept-all-commands=true
spring.jpa.open-in-view=false

I have deployed the service on GKE with 4 pods. My Eureka server runs in a separate pod in the same GKE cluster, successfully registers all 4 service instances, and everything works except for one issue. When I ingest 1000 commands that update a single aggregate, around 90% of them are routed correctly to one of the 4 pods, but about 10% are routed to a different pod, and I get the following error and stack trace:

java.util.concurrent.CompletionException: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [525929398] at sequence [601] was already inserted
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.thenApply(Unknown Source)
	at com.example.whseinventory.inbound.PurchaseOrderCreatedEventSubscriber.lambda$messageReceiver$2(PurchaseOrderCreatedEventSubscriber.java:41)
	at java.base/java.lang.Iterable.forEach(Unknown Source)
	at com.example.whseinventory.inbound.PurchaseOrderCreatedEventSubscriber.messageReceiver(PurchaseOrderCreatedEventSubscriber.java:30)
	at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
	at org.springframework.integration.handler.support.IntegrationInvocableHandlerMethod.doInvoke(IntegrationInvocableHandlerMethod.java:45)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1086)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:569)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:482)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:360)
	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114)
	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
	at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
	at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
	at io.micrometer.observation.Observation.observe(Observation.java:499)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
	at com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:146)
	at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:172)
	at com.google.cloud.pubsub.v1.MessageDispatcher$3.run(MessageDispatcher.java:502)
	at com.google.cloud.pubsub.v1.SequentialExecutorService$AutoExecutor$1.run(SequentialExecutorService.java:134)
	at com.google.cloud.pubsub.v1.SequentialExecutorService$SequentialExecutor$1.run(SequentialExecutorService.java:112)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [525929398] at sequence [601] was already inserted
	at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:127)
	at org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine.lambda$null$4(JdbcEventStorageEngine.java:413)
	at org.axonframework.common.jdbc.JdbcUtils.executeBatch(JdbcUtils.java:170)
	at org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine.lambda$appendEvents$5(JdbcEventStorageEngine.java:410)
	at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:47)
	at org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine.appendEvents(JdbcEventStorageEngine.java:409)
	at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:105)
	at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:66)
	at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:256)
	at org.axonframework.eventhandling.AbstractEventBus.lambda$null$11(AbstractEventBus.java:170)
	at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:72)
	at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.notifyHandlers(DefaultUnitOfWork.java:109)
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:236)
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:87)
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:75)
	at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.executeWithResult(DefaultUnitOfWork.java:95)
	at org.axonframework.commandhandling.SimpleCommandBus.lambda$handle$2(SimpleCommandBus.java:200)
	at org.axonframework.tracing.Span.runSupplier(Span.java:163)
	at org.axonframework.commandhandling.SimpleCommandBus.handle(SimpleCommandBus.java:191)
	at org.axonframework.commandhandling.SimpleCommandBus.doDispatch(SimpleCommandBus.java:165)
	at org.axonframework.commandhandling.SimpleCommandBus.lambda$dispatch$1(SimpleCommandBus.java:131)
	at org.axonframework.tracing.Span.run(Span.java:101)
	at org.axonframework.commandhandling.SimpleCommandBus.dispatch(SimpleCommandBus.java:125)
	at org.axonframework.extensions.springcloud.commandhandling.SpringHttpCommandBusConnector.send(SpringHttpCommandBusConnector.java:154)
	at org.axonframework.commandhandling.distributed.DistributedCommandBus.dispatch(DistributedCommandBus.java:170)
	at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76)
	at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83)
	at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:138)
	at com.example.whseinventory.inbound.PurchaseOrderCreatedEventSubscriber.lambda$messageReceiver$2(PurchaseOrderCreatedEventSubscriber.java:40)
	... 44 common frames omitted
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO DomainEventEntry (eventIdentifier, aggregateIdentifier, sequenceNumber, type, timeStamp, payloadType, payloadRevision, payload, metaData) VALUES ()) was aborted: ERROR: duplicate key value violates unique constraint "domainevententry_aggregateidentifier_sequencenumber_key"
  Detail: Key (aggregateidentifier, sequencenumber)=(525929398, 601) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:573)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1677)
	at com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127)
	at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java)
	at org.axonframework.common.jdbc.JdbcUtils.executeBatch(JdbcUtils.java:168)
	... 70 common frames omitted
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "domainevententry_aggregateidentifier_sequencenumber_key"
  Detail: Key (aggregateidentifier, sequencenumber)=(525929398, 601) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2712)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2400)
	... 77 common frames omitted
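For context on the error itself: the JDBC event store relies on the unique key on (aggregateIdentifier, sequenceNumber) to detect concurrent modifications. As far as I understand, within a single JVM Axon's repository locks the aggregate, so commands for one aggregate are serialized; the conflict appears when two pods load the same aggregate before either commits and both try to append the same next sequence number. Below is a minimal in-memory sketch of that race, assuming a simplified table; EventTable and simulateTwoPods are hypothetical names for illustration, not Axon APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DuplicateSequenceSketch {

    // Hypothetical in-memory stand-in for the DomainEventEntry table.
    // It only models the unique key on (aggregateIdentifier, sequenceNumber).
    static final class EventTable {
        private final Set<String> keys = new HashSet<>();
        private final Map<String, Long> lastSequence = new HashMap<>();

        // Before handling a command, the pod sources the aggregate and learns its latest sequence.
        long lastSequenceOf(String aggregateId) {
            return lastSequence.getOrDefault(aggregateId, -1L);
        }

        // On commit, the pod inserts the next event; a duplicate key is rejected.
        void append(String aggregateId, long sequence) {
            if (!keys.add(aggregateId + ":" + sequence)) {
                throw new IllegalStateException("An event for aggregate [" + aggregateId
                        + "] at sequence [" + sequence + "] was already inserted");
            }
            lastSequence.put(aggregateId, Math.max(lastSequenceOf(aggregateId), sequence));
        }
    }

    // Two pods load the same aggregate before either commits, then both append.
    // Returns the error seen by the second pod, or null if there was no conflict.
    static String simulateTwoPods(String aggregateId, long existingEvents) {
        EventTable table = new EventTable();
        for (long seq = 0; seq < existingEvents; seq++) {
            table.append(aggregateId, seq);
        }
        long seenByPodA = table.lastSequenceOf(aggregateId);
        long seenByPodB = table.lastSequenceOf(aggregateId); // pod A has not committed yet
        table.append(aggregateId, seenByPodA + 1);
        try {
            table.append(aggregateId, seenByPodB + 1);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateTwoPods("525929398", 601));
    }
}
```

With 601 existing events (sequences 0 to 600), both pods compute 601 as the next sequence, and the second insert fails with the same message as in the trace above.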

In addition, I intermittently see many of the following INFO messages on arbitrary pods:

Failed to receive the capabilities from ServiceInstance [PURCHASE ORDERS] under host [*] and port [8080]. Will temporarily set this instance to deny all incoming messages

I am using the default routing strategy, and my commands are annotated with @TargetAggregateIdentifier.
Can someone suggest how to make sure that all commands for a single aggregate ID are routed to the same pod? What configuration am I missing?
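As far as I understand it, the default annotation-based routing strategy takes the @TargetAggregateIdentifier value as the routing key, and the DistributedCommandBus maps that key onto a consistent hash of the members it knows about, where each member gets load-factor segments. Every pod builds this hash from its own view of the cluster, so if one pod marks another as "deny all" (as in the log message above), the pods stop agreeing on which pod owns a given aggregate. The sketch below is a simplified model loosely based on that idea, not Axon's ConsistentHash class; the pod names, the MD5-based hash, and the " #i" segment naming are assumptions, and a denied member is approximated by leaving it out of the ring.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ConsistentHashSketch {

    // Position on the ring: the first 8 bytes of the MD5 digest of the value.
    static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Every member that accepts commands gets `loadFactor` segments on the ring.
    static TreeMap<Long, String> buildRing(List<String> members, int loadFactor) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String member : members) {
            for (int i = 0; i < loadFactor; i++) {
                ring.put(hash(member + " #" + i), member);
            }
        }
        return ring;
    }

    // A routing key goes to the owner of the first segment at or after its hash, wrapping around.
    static String route(TreeMap<Long, String> ring, String routingKey) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(routingKey));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Counts how many of `keyCount` routing keys two pods would send to different members.
    static int countDisagreements(List<String> viewA, List<String> viewB, int loadFactor, int keyCount) {
        TreeMap<Long, String> ringA = buildRing(viewA, loadFactor);
        TreeMap<Long, String> ringB = buildRing(viewB, loadFactor);
        int disagreements = 0;
        for (int k = 0; k < keyCount; k++) {
            String aggregateId = "aggregate-" + k;
            if (!route(ringA, aggregateId).equals(route(ringB, aggregateId))) {
                disagreements++;
            }
        }
        return disagreements;
    }

    public static void main(String[] args) {
        List<String> allPods = List.of("pod-1", "pod-2", "pod-3", "pod-4");
        // Hypothetical: pod-2 failed to fetch pod-4's capabilities and treats it as "deny all".
        List<String> viewOfPod2 = List.of("pod-1", "pod-2", "pod-3");
        System.out.println("Same view:      " + countDisagreements(allPods, allPods, 500, 1000));
        System.out.println("Diverged views: " + countDisagreements(allPods, viewOfPod2, 500, 1000));
    }
}
```

Identical views always agree (0 disagreements), while diverged views disagree exactly on the keys the missing pod owned. That would fit the symptom of most commands for an aggregate landing on one pod and a share landing elsewhere, and it suggests looking at why the capability fetches fail.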

I found a workaround based on a hint from an earlier thread on this forum (Sequence number duplication - #3 by Franco_Torriani). I added the config option eureka.instance.preferIpAddress=true together with the recommended configuration, and it now works for me across multiple pods. Here is my full configuration, which works with multiple pods using the spring-cloud extension as the distributed command bus:

eureka.client.fetch-registry=true
# Defines the load factor used for this segment. Defaults to 100
axon.distributed.load-factor=500
# Defines the CapabilityDiscoveryMode used. Defaults to REST
axon.distributed.spring-cloud.mode=rest
# In REST mode, member capabilities are retrieved from the "/member-capabilities" endpoint by default (not overridden here)
# Enables the distributed command bus
axon.distributed.enabled=true
axon.distributed.spring-cloud.fallback-to-http-get=true
eureka.client.service-url.defaultZone=http://my-eureka-service-host:8761/eureka/
axon.distributed.spring-cloud.enable-ignore-listing=true
# Defines whether the CapabilityDiscoveryMode should be decorated to accept all types of commands
axon.distributed.spring-cloud.enable-accept-all-commands=true
# Never remove this: it is crucial for keeping networking between nodes working (Eureka registers the pod IP instead of the hostname)
eureka.instance.preferIpAddress=true