Hi there,
I have two microservices (MS): the first creates a user profile, and the second manages that user's wallet. When a user profile is created, the profile management MS publishes an event requesting wallet creation. In the wallet management MS, an event handler sends a CreateWalletSuccessCommand, which should publish a CreateWalletSuccessEvent. That event is meant to be handled by a SagaEventHandler in the profile management MS. However, when I try to send the command from the wallet management MS, I get the following exception:
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [1] at sequence [0] could not be persisted
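To make the intended flow easier to follow, here is a minimal plain-Java sketch of the message chain I am aiming for. It does not use Axon at all: `FlowSketch`, `dispatch` and `trace` are hypothetical names for illustration, the message names are the ones from my code, and the single `id` parameter is a simplification (in my real code the success command carries the wallet id rather than the user id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the command/event buses of both services.
class FlowSketch {

    // One handler per message name, invoked synchronously.
    private final Map<String, Consumer<Long>> handlers = new HashMap<>();
    // Every dispatched message name, in order.
    private final List<String> trace = new ArrayList<>();

    FlowSketch() {
        // Profile MS saga: a new profile triggers wallet creation.
        handlers.put("CreateUserProfileEvent", id -> dispatch("CreateWalletCommand", id));
        // Wallet MS aggregate: the command results in an event.
        handlers.put("CreateWalletCommand", id -> dispatch("CreateWalletEvent", id));
        // Wallet MS event handler: saves the wallet, then reports success.
        handlers.put("CreateWalletEvent", id -> dispatch("CreateWalletSuccessCommand", id));
        // Wallet MS aggregate: the success command results in the success event.
        handlers.put("CreateWalletSuccessCommand", id -> dispatch("CreateWalletSuccessEvent", id));
        // Profile MS saga: the success event ends the saga.
        handlers.put("CreateWalletSuccessEvent", id -> dispatch("CreateUserProfileSuccessCommand", id));
    }

    // Records the message and hands it to its handler, if one is registered.
    void dispatch(String messageName, long id) {
        trace.add(messageName);
        Consumer<Long> handler = handlers.get(messageName);
        if (handler != null) {
            handler.accept(id);
        }
    }

    List<String> trace() {
        return trace;
    }

    public static void main(String[] args) {
        FlowSketch flow = new FlowSketch();
        flow.dispatch("CreateUserProfileEvent", 42L);
        flow.trace().forEach(System.out::println);
    }
}
```

Running `main` prints the six message names in the order I expect them to occur. The real setup fails at the step where the CreateWalletSuccessCommand is handled.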
This is the saga, UserProfileSaga.java, in the profile management MS:
```java
public class UserProfileSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    private Long userId;
    private String name;
    private String surname;
    private String address;
    private LocalDate birthday;
    private String email;

    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(CreateUserProfileEvent createUserProfileEvent) {
        this.userId = createUserProfileEvent.getUserId();
        this.name = createUserProfileEvent.getName();
        this.surname = createUserProfileEvent.getSurname();
        this.address = createUserProfileEvent.getAddress();
        this.birthday = createUserProfileEvent.getBirthday();
        this.email = createUserProfileEvent.getEmail();
        System.out.println("Inside CreateUserProfileEvent SagaEventHandler");
        commandGateway.send(new CreateWalletCommand(createUserProfileEvent.getUserId()));
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(CreateWalletSuccessEvent createWalletSuccessEvent) {
        System.out.println("Inside CreateWalletSuccessEvent SagaEventHandler");
        commandGateway.send(new CreateUserProfileSuccessCommand(createWalletSuccessEvent.getUserId()));
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(CreateWalletFailedEvent createWalletFailedEvent) {
        System.out.println("Inside CreateWalletFailedEvent SagaEventHandler");
        commandGateway.send(new CreateUserProfileFailedCommand(createWalletFailedEvent.getUserId()));
    }
}
```
UserEventHandler.java in the wallet management MS:
```java
@ProcessingGroup("amqpEvents")
@Component
public class UserEventHandler {

    @Autowired
    private WalletService walletService;

    @Autowired
    private transient CommandGateway commandGateway;

    @EventHandler
    public void onCreateWalletEvent(CreateWalletEvent event) {
        System.out.println("--- ####################### onCreateWalletEvent ####################### ---");
        WalletDTO walletDTO = new WalletDTO();
        walletDTO.setAmount(0d);
        walletDTO.setUserid(event.getUserId());
        WalletDTO wallet = walletService.save(walletDTO);
        System.out.println("WalletDTO: " + wallet.toString());
        wallet.setId(1L);
        System.out.println("########## WALLET ID ############## " + wallet.getId());
        commandGateway.send(new CreateWalletSuccessCommand(wallet.getId()));
    }
}
```
WalletAggregate.java in the wallet management MS:
```java
@Aggregate
public class WalletAggregate {

    @AggregateIdentifier
    private Long walletId;
    private Double amount;
    private Long userId;

    @CommandHandler
    public WalletAggregate(CreateWalletCommand command) {
        System.out.println("--- Command Handler Start : WalletCreatedCommand ---");
        AggregateLifecycle.apply(new CreateWalletEvent(command.getWalletId(), command.getAmount(), command.getUserId()));
        System.out.println("--- Command Handler End : WalletCreatedCommand ---");
    }

    @EventSourcingHandler
    public void handle(CreateWalletEvent event) {
        System.out.println("--- Event Sourcing Handler Start : WalletCreatedEvent ---");
        this.walletId = event.getWalletId();
        this.amount = event.getAmount();
        this.userId = event.getUserId();
        System.out.println("--- Event Sourcing Handler End : WalletCreatedEvent ---");
    }

    @CommandHandler
    public WalletAggregate(CreateWalletSuccessCommand createWalletSuccessCommand) {
        System.out.println("################# Inside CreateWalletSuccessCommand handler #######################");
        AggregateLifecycle.apply(new CreateWalletSuccessEvent(createWalletSuccessCommand.getWalletId()));
    }

    @EventSourcingHandler
    public void handle(CreateWalletSuccessEvent event) {
        System.out.println("Inside CreateWalletSuccessEvent");
        this.walletId = event.getWalletId();
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    @Override
    public String toString() {
        return "WalletAggregate [amount=" + amount + ", userId=" + userId + "]";
    }
}
```
Executing this code gives me the following stack trace:
```
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1279/1700888746 for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1280/259027639 for phase COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1281/1300180978 for phase AFTER_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1282/1643205257 for phase CLEANUP
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.commandhandling.model.LockingRepository$$Lambda$1283/103427965 for phase CLEANUP
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.commandhandling.model.AbstractRepository$$Lambda$1289/246646516 for phase ROLLBACK
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.commandhandling.model.AbstractRepository$$Lambda$1291/250995139 for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.unitofwork.AbstractUnitOfWork : Committing Unit Of Work
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1279/1700888746 for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1280/259027639 for phase COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1281/1300180978 for phase AFTER_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1282/1643205257 for phase CLEANUP
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase COMMIT
2018-08-07 17:48:58.804 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1292/1702196973 for phase COMMIT
2018-08-07 17:48:58.804 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.messaging.unitofwork.AbstractUnitOfWork$$Lambda$1293/2003032209 for phase AFTER_COMMIT
2018-08-07 17:48:58.804 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.messaging.unitofwork.AbstractUnitOfWork$$Lambda$1294/1497591203 for phase ROLLBACK
2018-08-07 17:48:58.805 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.unitofwork.AbstractUnitOfWork : Committing Unit Of Work
2018-08-07 17:48:58.805 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase PREPARE_COMMIT
2018-08-07 17:48:58.805 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase ROLLBACK
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase ROLLBACK
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLEANUP
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLEANUP
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLOSED
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLOSED
2018-08-07 17:48:58.806 WARN 16460 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [1] at sequence [0] could not be persisted
    at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:112)
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:223)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:85)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
    at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:210)
    at org.axonframework.eventhandling.AbstractEventBus.lambda$null$4(AbstractEventBus.java:145)
    at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
    at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
    at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
    at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:142)
    at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:32)
    at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:135)
    at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.lambda$onMessage$1(SpringAMQPMessageSource.java:90)
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
    at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.onMessage(SpringAMQPMessageSource.java:90)
    at com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(AxonConfiguration.java:67)
    at sun.reflect.GeneratedMethodAccessor319.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182)
    ... 10 common frames omitted
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:289)
    at com.sun.proxy.$Proxy268.persist(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:218)
    ... 37 common frames omitted
```
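As far as I can tell from the innermost "Caused by", the event store's persist call runs on the Rabbit listener thread without an active transaction. The plain-Java sketch below only illustrates that condition as I understand it. `EventStoreStub`, `inTransaction` and `tryPersist` are hypothetical names, not Axon, Spring or JPA API, and the real cause may well be different.

```java
import java.util.ArrayList;
import java.util.List;

class TransactionSketch {

    // Hypothetical stand-in for a JPA-backed event store: persist() requires an active transaction.
    static class EventStoreStub {
        private boolean transactionActive = false;
        private final List<String> stored = new ArrayList<>();

        void persist(String event) {
            if (!transactionActive) {
                throw new IllegalStateException("no active transaction for persist");
            }
            stored.add(event);
        }

        // Runs the work with a transaction marked as open, roughly what a transaction manager would do.
        void inTransaction(Runnable work) {
            transactionActive = true;
            try {
                work.run();
            } finally {
                transactionActive = false;
            }
        }

        List<String> stored() {
            return stored;
        }
    }

    // Returns true if the event was stored, false if persist() was rejected.
    static boolean tryPersist(EventStoreStub store, boolean withTransaction) {
        try {
            if (withTransaction) {
                store.inTransaction(() -> store.persist("CreateWalletSuccessEvent"));
            } else {
                store.persist("CreateWalletSuccessEvent");
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        EventStoreStub store = new EventStoreStub();
        System.out.println(tryPersist(store, false)); // rejected: no transaction is open
        System.out.println(tryPersist(store, true));  // stored: transaction is open
    }
}
```

If this reading is right, the open question is where the transaction should be started for events that arrive through the AMQP message source.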
What am I missing here? Also, is this the correct way to handle such a transaction failure if I need to publish a WalletCreationFailedEvent, which the saga in the profile management MS will listen for in order to roll back the user creation?
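To make the second question concrete, this is roughly the compensation flow I have in mind, reduced to plain Java. It is only a sketch under assumptions: `CompensationSketch`, `createWallet` and `onWalletOutcome` are hypothetical names, messages are represented by their names as strings, and the failure event is called CreateWalletFailedEvent here to match the saga above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class CompensationSketch {

    // Wallet MS side: try to store the wallet and report the outcome as an event name.
    static String createWallet(long userId, LongConsumer walletStore) {
        try {
            walletStore.accept(userId);
            return "CreateWalletSuccessEvent";
        } catch (RuntimeException e) {
            return "CreateWalletFailedEvent";
        }
    }

    // Profile MS saga side: map the outcome event to the command that is sent next.
    static String onWalletOutcome(String event) {
        switch (event) {
            case "CreateWalletSuccessEvent":
                return "CreateUserProfileSuccessCommand";
            case "CreateWalletFailedEvent":
                // Compensation: ask the profile side to undo the user creation.
                return "CreateUserProfileFailedCommand";
            default:
                throw new IllegalArgumentException("Unexpected event: " + event);
        }
    }

    public static void main(String[] args) {
        List<Long> wallets = new ArrayList<>();

        // Happy path: the store accepts the wallet.
        String ok = createWallet(1L, id -> wallets.add(id));
        System.out.println(ok + " -> " + onWalletOutcome(ok));

        // Failure path: the store throws, so the failure event is reported instead.
        String failed = createWallet(2L, id -> {
            throw new IllegalStateException("wallet store unavailable");
        });
        System.out.println(failed + " -> " + onWalletOutcome(failed));
    }
}
```

In the real services, the two methods would live in different microservices and the outcome would travel as an event over AMQP rather than as a return value.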
Thanks and Regards,
Malay M