Handling events published by Saga causing failure

Hi there,

I have two microservices (MS): the first creates a user profile and the second manages that user’s wallet. When the user profile is created, I publish an event from the profile management MS that says “create wallet”. Inside the wallet management MS, I have an event handler that sends a CreateWalletSuccessCommand, which should publish a CreateWalletSuccessEvent. This event should then be handled by a SagaEventHandler inside the profile management MS. However, when I try to send the command from the wallet management MS, I run into this exception:

Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [1] at sequence [0] could not be persisted

This is the saga UserProfileSaga.java in profile management MS

`
public class UserProfileSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    private Long userId;
    private String name;
    private String surname;
    private String address;
    private LocalDate birthday;
    private String email;

    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(CreateUserProfileEvent createUserProfileEvent) {
        this.userId = createUserProfileEvent.getUserId();
        this.name = createUserProfileEvent.getName();
        this.surname = createUserProfileEvent.getSurname();
        this.address = createUserProfileEvent.getAddress();
        this.birthday = createUserProfileEvent.getBirthday();
        this.email = createUserProfileEvent.getEmail();
        System.out.println("Inside CreateUserProfileEvent SagaEventHandler");

        commandGateway.send(new CreateWalletCommand(createUserProfileEvent.getUserId()));
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(CreateWalletSuccessEvent createWalletSuccessEvent) {
        System.out.println("Inside CreateWalletSuccessEvent SagaEventHandler");
        commandGateway.send(new CreateUserProfileSuccessCommand(createWalletSuccessEvent.getUserId()));
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(CreateWalletFailedEvent createWalletFailedEvent) {
        System.out.println("Inside CreateWalletFailedEvent SagaEventHandler");
        commandGateway.send(new CreateUserProfileFailedCommand(createWalletFailedEvent.getUserId()));
    }

}
`

UserEventHandler.java in wallet management MS

`
@ProcessingGroup("amqpEvents")
@Component
public class UserEventHandler {

    @Autowired
    private WalletService walletService;

    @Autowired
    private transient CommandGateway commandGateway;

    @EventHandler
    public void onCreateWalletEvent(CreateWalletEvent event) {
        System.out.println("--- ####################### onCreateWalletEvent ####################### ---");
        WalletDTO walletDTO = new WalletDTO();
        walletDTO.setAmount(0d);
        walletDTO.setUserid(event.getUserId());
        WalletDTO wallet = walletService.save(walletDTO);
        System.out.println("WalletDTO: " + wallet.toString());
        wallet.setId(1L);
        System.out.println("########## WALLET ID ############## " + wallet.getId());
        commandGateway.send(new CreateWalletSuccessCommand(wallet.getId()));
    }
}
`

WalletAggregate.java in wallet management MS

`
@Aggregate
public class WalletAggregate {

    @AggregateIdentifier
    private Long walletId;
    private Double amount;
    private Long userId;

    @CommandHandler
    public WalletAggregate(CreateWalletCommand command) {
        System.out.println("--- Command Handler Start : WalletCreatedCommand ---");
        AggregateLifecycle.apply(new CreateWalletEvent(command.getWalletId(), command.getAmount(), command.getUserId()));
        System.out.println("--- Command Handler End : WalletCreatedCommand ---");
    }

    @EventSourcingHandler
    public void handle(CreateWalletEvent event) {
        System.out.println("--- Event Sourcing Handler Start : WalletCreatedEvent ---");
        this.walletId = event.getWalletId();
        this.amount = event.getAmount();
        this.userId = event.getUserId();
        System.out.println("--- Event Sourcing Handler End : WalletCreatedEvent ---");
    }

    @CommandHandler
    public WalletAggregate(CreateWalletSuccessCommand createWalletSuccessCommand) {
        System.out.println("################# Inside CreateWalletSuccessCommand handler #######################");
        AggregateLifecycle.apply(new CreateWalletSuccessEvent(createWalletSuccessCommand.getWalletId()));
    }

    @EventSourcingHandler
    public void handle(CreateWalletSuccessEvent event) {
        System.out.println("Inside CreateWalletSuccessEvent");
        this.walletId = event.getWalletId();
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    @Override
    public String toString() {
        return "WalletAggregate [amount=" + amount + ", userId=" + userId + "]";
    }

}
`

Executing this code gives me following stack trace :

`
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1279/1700888746 for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1280/259027639 for phase COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1281/1300180978 for phase AFTER_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1282/1643205257 for phase CLEANUP
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.commandhandling.model.LockingRepository$$Lambda$1283/103427965 for phase CLEANUP
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.commandhandling.model.AbstractRepository$$Lambda$1289/246646516 for phase ROLLBACK
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.commandhandling.model.AbstractRepository$$Lambda$1291/250995139 for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.unitofwork.AbstractUnitOfWork : Committing Unit Of Work
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1279/1700888746 for phase PREPARE_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1280/259027639 for phase COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1281/1300180978 for phase AFTER_COMMIT
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1282/1643205257 for phase CLEANUP
2018-08-07 17:48:58.803 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase COMMIT
2018-08-07 17:48:58.804 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.eventhandling.AbstractEventBus$$Lambda$1292/1702196973 for phase COMMIT
2018-08-07 17:48:58.804 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.messaging.unitofwork.AbstractUnitOfWork$$Lambda$1293/2003032209 for phase AFTER_COMMIT
2018-08-07 17:48:58.804 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Adding handler org.axonframework.messaging.unitofwork.AbstractUnitOfWork$$Lambda$1294/1497591203 for phase ROLLBACK
2018-08-07 17:48:58.805 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.unitofwork.AbstractUnitOfWork : Committing Unit Of Work
2018-08-07 17:48:58.805 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase PREPARE_COMMIT
2018-08-07 17:48:58.805 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase ROLLBACK
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase ROLLBACK
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLEANUP
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLEANUP
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLOSED
2018-08-07 17:48:58.806 DEBUG 16460 --- [cTaskExecutor-1] o.a.m.u.MessageProcessingContext : Notifying handlers for phase CLOSED
2018-08-07 17:48:58.806 WARN 16460 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [1] at sequence [0] could not be persisted
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:112)
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:223)
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:85)
at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:210)
at org.axonframework.eventhandling.AbstractEventBus.lambda$null$4(AbstractEventBus.java:145)
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:142)
at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:32)
at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:135)
at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.lambda$onMessage$1(SpringAMQPMessageSource.java:90)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.onMessage(SpringAMQPMessageSource.java:90)
at com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(AxonConfiguration.java:67)
at sun.reflect.GeneratedMethodAccessor319.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182)
... 10 common frames omitted
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:289)
at com.sun.proxy.$Proxy268.persist(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:218)
... 37 common frames omitted
`

What am I missing here? Also, is this the correct way to handle such a transaction failure if I need to publish a WalletCreationFailedEvent, which the saga in the profile management MS will listen to in order to roll back the user creation?

Thanks and Regards,
Malay M

Hi,

the problem is that there is no running transaction:
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call

Do you have a transaction manager configured in your application? If you define a MessageListenerContainer(Factory), make sure it has a TransactionManager configured. Otherwise, putting @Transactional on the method annotated with @RabbitListener should also work, although I’m not sure whether messages of failed transactions are then NACKed/requeued.
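A minimal sketch of the second option, assuming Axon 3.x with the AMQP module and Spring AMQP on the classpath. It mirrors the anonymous SpringAMQPMessageSource subclass visible in the stack trace (AxonConfiguration$1.onMessage). The queue name `wallet.events` and the bean method name are placeholders, not taken from the original configuration, and @Transactional only takes effect if a transaction manager bean is available to Spring.

```java
import com.rabbitmq.client.Channel;
import org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.serialization.Serializer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.Transactional;

@Configuration
public class AxonConfiguration {

    // Bean method name is hypothetical; the Serializer is assumed to be
    // provided by Axon's Spring Boot autoconfiguration.
    @Bean
    public SpringAMQPMessageSource walletQueueMessageSource(Serializer serializer) {
        return new SpringAMQPMessageSource(serializer) {

            // "wallet.events" is a placeholder queue name.
            @RabbitListener(queues = "wallet.events")
            // Opens the transaction that the JPA event store needs when an
            // event handler on this thread sends a command that applies events.
            @Transactional
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                super.onMessage(message, channel);
            }
        };
    }
}
```

With a SubscribingEventProcessor, handlers run on the Rabbit listener thread, so the transaction opened here is the one the event store's persist call participates in.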

Are you configuring any Axon components “manually”, or letting Spring Boot Autoconfiguration do that?

Kind regards,

Allard

Hey Allard,

Thanks for the response; putting @Transactional on the @RabbitListener method worked. However, CreateWalletSuccessEvent/CreateWalletFailedEvent is not picked up by the @SagaEventHandler that should end the saga. It ends up being handled by a regular @EventHandler instead, so my saga never ends. Is this the right approach to handling such success/failure outcomes, or am I misinterpreting the usage of Saga here?

Thanks and Regards,
Malay M

Hi Malay,

To successfully end a Saga instance, you must use either the SagaLifecycle#end() function or the @EndSaga annotation.

Both of these should be triggered from a @SagaEventHandler, as otherwise Axon does not know which Saga instance’s lifecycle should be ended.
Thus, your observation that an @EventHandler annotated method is called, but not the @SagaEventHandler, is indeed an issue in this scenario.

One reason why the @SagaEventHandler isn’t handling a given event might be that the value of its associationProperty, a mandatory field of the @SagaEventHandler, does not match the association of any existing Saga instance.

Did you correctly associate the Saga with a specific identifier in the event, for example by using the SagaLifecycle#associateWith() function?
If not, that might be the culprit.
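The matching rule can be illustrated with a plain-Java model. This is not Axon code: `SagaAssociationModel` and its methods are hypothetical names that only mimic how saga instances are resolved by comparing an association key with the string form of a value. In the handler posted earlier, the success command is built from wallet.getId(), so one plausible reading is that the event's userId property ends up carrying the wallet id. The sketch shows that such an event reaches no saga and therefore ends none.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of saga lookup by association value. */
public class SagaAssociationModel {

    /** Active saga ids, indexed by "key=value" association. */
    private final Map<String, Set<String>> sagaIdsByAssociation = new HashMap<>();

    /** Models a @StartSaga handler: the new saga is associated with key=value. */
    public void startSaga(String sagaId, String key, Object value) {
        associate(sagaId, key, value);
    }

    /** Models an extra association added from inside a saga. */
    public void associate(String sagaId, String key, Object value) {
        sagaIdsByAssociation
                .computeIfAbsent(key + "=" + value, k -> new HashSet<>())
                .add(sagaId);
    }

    /**
     * Models delivery of an event to an @EndSaga handler. Only sagas whose
     * association equals associationProperty=propertyValue are invoked and
     * ended; the ids of those sagas are returned.
     */
    public List<String> deliverEndEvent(String associationProperty, Object propertyValue) {
        Set<String> matched = sagaIdsByAssociation.getOrDefault(
                associationProperty + "=" + propertyValue, Collections.emptySet());
        List<String> ended = new ArrayList<>(matched);
        // An ended saga loses all of its associations.
        for (Set<String> ids : sagaIdsByAssociation.values()) {
            ids.removeAll(ended);
        }
        return ended;
    }

    /** A saga is active as long as at least one association points to it. */
    public boolean isActive(String sagaId) {
        return sagaIdsByAssociation.values().stream().anyMatch(ids -> ids.contains(sagaId));
    }

    public static void main(String[] args) {
        SagaAssociationModel store = new SagaAssociationModel();
        store.startSaga("saga-1", "userId", 42L);

        // The event carries the wallet id (1) where the saga expects the user id (42).
        System.out.println(store.deliverEndEvent("userId", 1L));  // prints []
        System.out.println(store.isActive("saga-1"));             // prints true

        // The event carries the matching user id: the saga is found and ended.
        System.out.println(store.deliverEndEvent("userId", 42L)); // prints [saga-1]
        System.out.println(store.isActive("saga-1"));             // prints false
    }
}
```

In the model, as in the scenario described above, a non-matching event is not an error: it is simply not routed to any saga, which is why the only visible symptom is a saga that never ends.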

As you’re receiving your events from an AMQP queue, another problem might arise if you haven’t correctly set the ‘message-source’ of that Saga’s Event Processor.

This wholly depends on how you’ve configured your Sagas though.
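As a rough sketch of what setting the message source could look like, assuming Axon 3.x with Spring and an existing SpringAMQPMessageSource bean in the profile management MS: the class name `SagaProcessorConfig` is hypothetical, and the bean name follows what is assumed here to be the naming convention of the Spring integration (saga class name with a lower-case first letter plus "Configuration"). Verify both against the Axon version in use.

```java
import org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.axonframework.config.SagaConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// UserProfileSaga is assumed to live in the same package as this class.
@Configuration
public class SagaProcessorConfig {

    // Assumption: the bean name is what links this configuration to the saga.
    // If the convention does not apply, the saga may need to name the bean
    // explicitly, e.g. @Saga(configurationBean = "userProfileSagaConfiguration").
    @Bean
    public SagaConfiguration<UserProfileSaga> userProfileSagaConfiguration(
            SpringAMQPMessageSource amqpMessageSource) {
        // Subscribe the saga's event processor to the AMQP source instead of
        // the local event bus, so events published by the wallet MS reach it.
        return SagaConfiguration.subscribingSagaManager(
                UserProfileSaga.class, configuration -> amqpMessageSource);
    }
}
```

Without something along these lines, the saga's processor would read from the local event bus by default and never see events that only arrive over AMQP.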

Let’s try to figure this out together :)

Cheers,
Steven

Hi Steven,

You are right, that is what was happening. It was actually a combination of issues: first, the SagaConfiguration was improper, and second, the associationProperty was not matching. One of our previous discussions on this thread cleared up a lot of things and helped us develop a Saga without any more hiccups.

Thanks and Cheers :)
Malay