Hi Team,
I am trying to distribute the command bus using the spring-cloud extension. Here is my setup.
I am using the Axon Framework Maven BOM, along with the following BOM versions:
mavenBom 'org.axonframework:axon-bom:4.9.2'
mavenBom 'com.google.cloud:spring-cloud-gcp-dependencies:5.0.0'
mavenBom 'org.springframework.cloud:spring-cloud-starter-parent:2023.0.0'
I am using Google AlloyDB as the event store and the default SimpleCommandBus implementation together with the distributed command bus, which I added through the following dependency:
implementation 'org.axonframework.extensions.springcloud:axon-springcloud-spring-boot-starter'
I am also configuring the command bus using the default properties given in the example here:
eureka.client.fetch-registry=true
axon.distributed.load-factor=500
axon.distributed.spring-cloud.mode=rest
axon.distributed.enabled=true
axon.distributed.spring-cloud.fallback-to-http-get=true
eureka.client.service-url.defaultZone=http://eureka-service:8761/eureka/
axon.distributed.spring-cloud.enable-ignore-listing=true
axon.distributed.spring-cloud.enable-accept-all-commands=true
spring.jpa.open-in-view=false
I have deployed the service on GKE with 4 pods. My Eureka server runs in a separate pod in the same GKE cluster and successfully registers all 4 service instances. Everything is processed fine except for one issue: when I ingest 1000 commands that update a single aggregate, around 90% of them are routed to one pod out of the 4, as expected, but around 10% are routed to another pod, and I get the following error and stack trace:
java.util.concurrent.CompletionException: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [525929398] at sequence [601] was already inserted
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.thenApply(Unknown Source)
at com.example.whseinventory.inbound.PurchaseOrderCreatedEventSubscriber.lambda$messageReceiver$2(PurchaseOrderCreatedEventSubscriber.java:41)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at com.example.whseinventory.inbound.PurchaseOrderCreatedEventSubscriber.messageReceiver(PurchaseOrderCreatedEventSubscriber.java:30)
at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
at org.springframework.integration.handler.support.IntegrationInvocableHandlerMethod.doInvoke(IntegrationInvocableHandlerMethod.java:45)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1086)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:569)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:482)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:360)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.observe(Observation.java:499)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:146)
at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:172)
at com.google.cloud.pubsub.v1.MessageDispatcher$3.run(MessageDispatcher.java:502)
at com.google.cloud.pubsub.v1.SequentialExecutorService$AutoExecutor$1.run(SequentialExecutorService.java:134)
at com.google.cloud.pubsub.v1.SequentialExecutorService$SequentialExecutor$1.run(SequentialExecutorService.java:112)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [525929398] at sequence [601] was already inserted
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:127)
at org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine.lambda$null$4(JdbcEventStorageEngine.java:413)
at org.axonframework.common.jdbc.JdbcUtils.executeBatch(JdbcUtils.java:170)
at org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine.lambda$appendEvents$5(JdbcEventStorageEngine.java:410)
at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:47)
at org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine.appendEvents(JdbcEventStorageEngine.java:409)
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:105)
at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:66)
at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:256)
at org.axonframework.eventhandling.AbstractEventBus.lambda$null$11(AbstractEventBus.java:170)
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:72)
at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.notifyHandlers(DefaultUnitOfWork.java:109)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:236)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:87)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:75)
at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.executeWithResult(DefaultUnitOfWork.java:95)
at org.axonframework.commandhandling.SimpleCommandBus.lambda$handle$2(SimpleCommandBus.java:200)
at org.axonframework.tracing.Span.runSupplier(Span.java:163)
at org.axonframework.commandhandling.SimpleCommandBus.handle(SimpleCommandBus.java:191)
at org.axonframework.commandhandling.SimpleCommandBus.doDispatch(SimpleCommandBus.java:165)
at org.axonframework.commandhandling.SimpleCommandBus.lambda$dispatch$1(SimpleCommandBus.java:131)
at org.axonframework.tracing.Span.run(Span.java:101)
at org.axonframework.commandhandling.SimpleCommandBus.dispatch(SimpleCommandBus.java:125)
at org.axonframework.extensions.springcloud.commandhandling.SpringHttpCommandBusConnector.send(SpringHttpCommandBusConnector.java:154)
at org.axonframework.commandhandling.distributed.DistributedCommandBus.dispatch(DistributedCommandBus.java:170)
at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76)
at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83)
at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:138)
at com.example.whseinventory.inbound.PurchaseOrderCreatedEventSubscriber.lambda$messageReceiver$2(PurchaseOrderCreatedEventSubscriber.java:40)
... 44 common frames omitted
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO DomainEventEntry (eventIdentifier, aggregateIdentifier, sequenceNumber, type, timeStamp, payloadType, payloadRevision, payload, metaData) VALUES ()) was aborted: ERROR: duplicate key value violates unique constraint "domainevententry_aggregateidentifier_sequencenumber_key"
Detail: Key (aggregateidentifier, sequencenumber)=(525929398, 601) already exists. Call getNextException to see other errors in the batch.
at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:573)
at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1677)
at com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java)
at org.axonframework.common.jdbc.JdbcUtils.executeBatch(JdbcUtils.java:168)
... 70 common frames omitted
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "domainevententry_aggregateidentifier_sequencenumber_key"
Detail: Key (aggregateidentifier, sequencenumber)=(525929398, 601) already exists.
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2712)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2400)
... 77 common frames omitted
In addition to this, I intermittently get many of the following INFO messages on arbitrary pods:
Failed to receive the capabilities from ServiceInstance [PURCHASE ORDERS] under host [*] and port [8080]. Will temporarily set this instance to deny all incoming messages
I am using the default routing strategy, and my commands are annotated with @TargetAggregateIdentifier.
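To make clear what behaviour I expect, here is a minimal, self-contained sketch of routing by aggregate identifier over a hash ring. It is not Axon's actual implementation: the class RoutingSketch, its methods, the pod names, and the use of MD5 are all made up for illustration, and the ring size of 500 only mirrors my load-factor setting. My assumption is that, as long as every pod sees the same set of members, the same aggregate ID should always resolve to the same pod.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names come from Axon.
public class RoutingSketch {

    // Builds a hash ring in which every node occupies `loadFactor` slots.
    static SortedMap<String, String> buildRing(List<String> nodes, int loadFactor) {
        SortedMap<String, String> ring = new TreeMap<>();
        for (String node : nodes) {
            for (int i = 0; i < loadFactor; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
        return ring;
    }

    // Picks the node owning the first slot at or after the key's hash,
    // wrapping around to the start of the ring when there is none.
    // Expects a non-empty ring.
    static String route(SortedMap<String, String> ring, String routingKey) {
        SortedMap<String, String> tail = ring.tailMap(hash(routingKey));
        String slot = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(slot);
    }

    // Hex-encoded MD5 digest, used here purely as a stable hash function.
    static String hash(String value) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Pod names are placeholders for my 4 service instances.
        List<String> pods = List.of("pod-a", "pod-b", "pod-c", "pod-d");
        SortedMap<String, String> ring = buildRing(pods, 500);
        // The routing key is the value of the @TargetAggregateIdentifier field.
        System.out.println("Aggregate 525929398 routes to " + route(ring, "525929398"));
    }
}
```

In this sketch the target depends only on the routing key and on the set of members in the ring, so I would expect a command for the same aggregate to end up on a different pod only if the sending pods disagree about the membership.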
Can someone please suggest how to make sure that all commands updating a single aggregate ID are routed to a single pod? What configuration am I missing?