Sagas using RabbitMQ in Axon 4.1

Hi,
I’m trying to implement a saga using Axon 4.1. I have three microservices, developed as Spring Boot applications: orders, stocks and payments.
I’m using Consul for distributing the command bus and RabbitMQ for distributing the event bus.
My order service implements a Saga as follows:

@Saga
public class ProcessPurchaseSaga implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(ProcessPurchaseSaga.class);

    private transient CommandGateway commandGateway;

    private ProcessPurchaseStartedEvent purchase;

    /**
     * This is called
     */ 
    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void handle(ProcessPurchaseStartedEvent event) {
        purchase = event;

        SagaLifecycle.associateWith("accountNumber", purchase.accountNumber);

        // TODO: Fetch the unit price from stocks service
        final double unitPrice = 20;

        // Trigger the payment (could we call the REST API instead?)
        commandGateway.send(
            new DebitMoneyCommand(
                purchase.accountNumber,
                purchase.quantity * unitPrice,
                "USD"
            )
        );
    }

    /**
     * Never called!
     */
    @EndSaga
    @SagaEventHandler(associationProperty = "id", keyName = "accountNumber")
    public void handle(MoneyDebitedEvent event) {
        logger.debug("Money {} paid for {}", event.debitAmount, event.id);
        commandGateway.send(
            new CreateInvoiceCommand(
                UUID.randomUUID().toString(),
                new java.util.Date(),
                purchase.accountNumber,
                purchase.itemNumber,
                purchase.quantity,
                event.debitAmount
            )
        );
    }

    @Autowired
    public void setCommandGateway(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }
}

Each microservice has the same configuration for AMQP:

@Configuration
@Profile("rabbitmq")
public class AMQPConfig {
    @Bean
    public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {
        return DefaultAMQPMessageConverter.builder().serializer(serializer).build();
    }

    @Bean
    public SpringAMQPMessageSource myQueueMessageSource(AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "#{eventBusQueue.name}")
            @Override
            public void onMessage(Message message, Channel channel) {

Hi,

Your design is correct. The only thing missing is the configuration that tells Axon to use “myQueueMessageSource” as the source of events for your Saga. By default, Axon uses tracking processors, which read events from the Event Store.

You can override that by adding this to your AMQPConfig (or any other configuration class):

@Autowired
public void config(EventProcessingConfigurer config, SpringAMQPMessageSource myQueueMessageSource) {
    config.registerSubscribingEventProcessor("ProcessPurchaseSagaProcessor", c -> myQueueMessageSource);
}

Note that the “ProcessPurchaseSagaProcessor” name is derived from the Saga’s class name. You can override this by adding an @ProcessingGroup annotation to your Saga. In that case, the name you provide in the annotation must match the name passed to the “registerSubscribingEventProcessor” method.
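To make the naming rule concrete, here is a minimal plain-Java sketch. It assumes the default name is simply the Saga's simple class name followed by "Processor", which matches the name used in the snippet above. The helper `defaultSagaProcessorName` and the empty stand-in class are hypothetical and are not part of Axon's API.

```java
public class ProcessorNames {

    // Stand-in for the real saga class, only so this sketch compiles on its own.
    public static class ProcessPurchaseSaga {
    }

    // Hypothetical helper (not an Axon method): builds the default processor
    // name under the assumption "simple class name + Processor".
    public static String defaultSagaProcessorName(Class<?> sagaType) {
        return sagaType.getSimpleName() + "Processor";
    }

    public static void main(String[] args) {
        // prints "ProcessPurchaseSagaProcessor"
        System.out.println(defaultSagaProcessorName(ProcessPurchaseSaga.class));
    }
}
```

If the Saga carries an @ProcessingGroup annotation, the string passed to registerSubscribingEventProcessor would be the annotation's value instead of this derived name.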

Hope this helps.

Thanks, your snippet did resolve the problem. I just needed to add an @Transactional annotation to the rabbit listener to avoid the error below:

`
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.example.saga.axon.orderservice.config.AMQPConfig$1.onMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)' threw exception

Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call

`

Also, I forgot to mention a few more properties that I have (taken from https://docs.axoniq.io/reference-guide/extensions/spring-amqp#reading-events-from-an-amqp-queue):

axon:
  amqp:
    # Forwards Events to AMQP Channel
    exchange: eventBusExchange

  eventhandling:
    processors:
      name:
        source: myQueueMessageSource

which were already in place before integrating your suggestion. I’m a bit confused here: what’s the difference between the eventhandling block and the Java config?

Thanks

Hi Danilo,

The eventhandling block in the properties is effectively the same as providing configuration through the Java API. However, if you provide both, the Java API takes precedence. The API is also more powerful: it exposes many more settings than the properties file does. Beyond that, it is mostly a matter of preference.

By the way, the @Transactional is indeed necessary, for now. We want to make the TransactionManager configurable on the SubscribingEventProcessor as well, as it can’t always rely on the messages being delivered on a Thread that already has an active transaction. Here is the issue to track this feature: https://github.com/AxonFramework/AxonFramework/issues/1074
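The reason behind the exception can be illustrated with a simplified plain-Java model. It does not use Spring, JPA or Axon: the `ThreadLocal` flag is only a hypothetical stand-in for a thread-bound transaction, `persist` stands in for the saga store's persist call, and `inTransaction` plays the role that @Transactional plays around the listener method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ThreadBoundTransactionSketch {

    // Hypothetical stand-in for a transaction that is bound to the current thread.
    private static final ThreadLocal<Boolean> ACTIVE_TX = ThreadLocal.withInitial(() -> false);

    // Stand-in for a persist call: refuses to work without a transaction on this thread.
    public static String persist(String entity) {
        if (!ACTIVE_TX.get()) {
            throw new IllegalStateException("no transaction on this thread");
        }
        return "persisted " + entity;
    }

    // Plays the role of @Transactional: opens a transaction around the work, then closes it.
    public static <T> T inTransaction(Supplier<T> work) {
        ACTIVE_TX.set(true);
        try {
            return work.get();
        } finally {
            ACTIVE_TX.set(false);
        }
    }

    // Simulates message delivery on a separate listener thread that starts without a transaction.
    public static String deliverOnListenerThread(Supplier<String> handler) {
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return handler.get();
                } catch (IllegalStateException e) {
                    return "failed: " + e.getMessage();
                }
            }, listenerThread).join();
        } finally {
            listenerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "failed: no transaction on this thread"
        System.out.println(deliverOnListenerThread(() -> persist("saga")));
        // prints "persisted saga"
        System.out.println(deliverOnListenerThread(() -> inTransaction(() -> persist("saga"))));
    }
}
```

In the real application the same wrapping is achieved by putting @Transactional on the overridden onMessage method, so that the transaction is opened on the listener thread before the processor handles the message.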

Cheers,