Saga with AMQP - event handler invocation lost when messages arrive in quick succession

Hello,

I am using a saga that issues two commands to separate microservices. Both take about the same time, so the event responses arrive in the AMQP queue at around the same time. My handler method:

@Bean
public SpringAMQPMessageSource orderEvents(Serializer serializer) {
   return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {

      @RabbitListener(queues = "OrderQueue")
      @Override
      public void onMessage(Message message, Channel channel) throws Exception {
         //necessary because message is not delivered to saga otherwise
         Thread.sleep(1000);
         super.onMessage(message, channel);
      }
   };
}

The messages are delivered through AMQP and the onMessage method is invoked, but the processing in DefaultAMQPMessageConverter seems to prevent both events from being delivered to the saga (in some rare cases the latter one is delivered).

The event processor registration:

//the manual registration of saga event subscription because of processing events from AMQP queue
@Autowired
@SuppressWarnings("unchecked")
public void createSaga(SagaStore sagaStore, SpringAMQPMessageSource springAMQPMessageSource, ResourceInjector resourceInjector, ParameterResolverFactory parameterResolverFactory, TransactionManager transactionManager)
{
   String simpleName = OrderManagementSaga.class.getSimpleName();

   AnnotatedSagaRepository sagaRepository = new AnnotatedSagaRepository<>(OrderManagementSaga.class, sagaStore, resourceInjector, parameterResolverFactory);
   AnnotatedSagaManager<OrderManagementSaga> sagaManager = new AnnotatedSagaManager<>(OrderManagementSaga.class, sagaRepository, parameterResolverFactory);

   this.subscribingEventProcessor = new SubscribingEventProcessor(simpleName + "Processor", sagaManager, springAMQPMessageSource);
   this.subscribingEventProcessor.registerInterceptor(new TransactionManagingInterceptor<>(transactionManager));
   this.subscribingEventProcessor.start();
}

Note that this happens only when the Thread.sleep delay is removed from onMessage.

Thanks!

Attaching the source code [1]. To reproduce the problem you need to remove the timeout from https://github.com/xstefank/axon-service/blob/master/order-service/src/main/java/org/learn/axonframework/orderservice/OrderServiceApplication.java#L90 and then follow the README instructions.

[1] https://github.com/xstefank/axon-service

Regards,
Martin

Hi Martin,

to get a better understanding of the problem, which are the two events that are published to the Saga (almost) simultaneously?

Allard

Hello Allard,

thanks for the fast response.

The events are ShipmentPreparedEvent and InvoicePreparedEvent. https://github.com/xstefank/axon-service/blob/master/service-model/src/main/java/org/learn/axonframework/coreapi/OrderAPI.kt

Regards,
Martin

Hi Martin,

I did some quick checks in the code, and can’t see any reason why these messages shouldn’t be delivered. That is, at least when the messages are delivered to the same node. Do you perhaps have multiple nodes (or consumers) reading messages from the queue?

If I have a bit more time on my hands, I will try to run your application to see what’s happening.

Allard

Hello Allard,

thanks for looking into this. There should be only one service instance, assuming you run it with the provided docker-compose file. Please note that the events are delivered to the AMQP handler; I checked that with log messages.

Martin

Hi Martin,

I managed to find the source of the problem. The good news is: it’s not an issue in Axon. The bad news is: there’s a configuration error in your project and something that could be improved in Axon as well.

Basically, you had two simultaneous configurations for the OrderManagementSaga. One configuration would read from the queue, while the other would process events directly in the publishing thread. Because of the way you set up the AMQP reading configuration, it would create a new SagaRepository. There are synchronization mechanisms in the SagaRepository, but they only work when multiple actions hit the same instance. If they hit different instances, which was the case here, these mechanisms are circumvented. By adding the 1 second delay, you gave the other SagaRepository enough time to persist its changes, so that the AMQP one could read its instances from the database.
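
To illustrate the effect, here is a minimal plain-Java sketch. It is not Axon code: SharedStore and Repository are hypothetical stand-ins, and the idea that a repository keeps a new saga in memory until it commits is a simplifying assumption based on the explanation above. It only shows why a second repository instance cannot see a saga that the first one has not yet persisted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class TwoRepositoriesSketch {

    /** Hypothetical stand-in for the shared database that holds persisted saga state. */
    static final class SharedStore {
        private final Map<String, String> rows = new HashMap<>();

        synchronized void save(String orderId, String state) {
            rows.put(orderId, state);
        }

        synchronized Optional<String> load(String orderId) {
            return Optional.ofNullable(rows.get(orderId));
        }
    }

    /**
     * Hypothetical stand-in for a saga repository. Its lock (the synchronized
     * methods) and its in-memory state belong to this one instance only.
     */
    static final class Repository {
        private final SharedStore store;
        private final Map<String, String> uncommitted = new HashMap<>();

        Repository(SharedStore store) {
            this.store = store;
        }

        synchronized void create(String orderId, String state) {
            uncommitted.put(orderId, state);
        }

        synchronized void commit() {
            uncommitted.forEach(store::save);
            uncommitted.clear();
        }

        synchronized Optional<String> find(String orderId) {
            String pending = uncommitted.get(orderId);
            return pending != null ? Optional.of(pending) : store.load(orderId);
        }
    }

    /** Creates a saga through one repository and looks it up before it is committed. */
    static boolean foundBeforeCommit(boolean sameInstance) {
        SharedStore store = new SharedStore();
        Repository publisherSide = new Repository(store);
        Repository amqpSide = sameInstance ? publisherSide : new Repository(store);

        publisherSide.create("order-1", "STARTED");
        return amqpSide.find("order-1").isPresent();
    }

    /** Same as above with two instances, but the lookup happens after the commit. */
    static boolean foundAfterCommit() {
        SharedStore store = new SharedStore();
        Repository publisherSide = new Repository(store);
        Repository amqpSide = new Repository(store);

        publisherSide.create("order-1", "STARTED");
        publisherSide.commit(); // in the real project, the 1 second sleep bought time for this step
        return amqpSide.find("order-1").isPresent();
    }

    public static void main(String[] args) {
        System.out.println("same instance, before commit: " + foundBeforeCommit(true));
        System.out.println("two instances, before commit: " + foundBeforeCommit(false));
        System.out.println("two instances, after commit:  " + foundAfterCommit());
    }
}
```

With a single shared instance the lookup succeeds immediately. With two instances it only succeeds once the first one has committed, which matches the behaviour you saw with and without the sleep.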

I have changed your configuration to contain the following:

@Bean
public SagaConfiguration<OrderManagementSaga> orderManagementSagaConfiguration(SpringAMQPMessageSource springAMQPMessageSource,
                                                            PlatformTransactionManager txManager) {
   return SagaConfiguration.subscribingSagaManager(OrderManagementSaga.class, c -> springAMQPMessageSource)
                     .configureTransactionManager(c -> new SpringTransactionManager(txManager));
}

Then, you also need to make sure the OrderEvents exchange is bound to the OrderQueue, otherwise your Saga doesn’t receive events.

@Bean
public Exchange orderExchange() {
   return ExchangeBuilder.fanoutExchange("OrderEvents").durable(true).build();
}

@Bean
public Queue orderQueue() {
   return QueueBuilder.durable("OrderQueue").build();
}

@Bean
public Binding orderBinding() {
   return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("*").noargs();
}

@Autowired
public void config(AmqpAdmin amqpAdmin) {
   amqpAdmin.declareQueue(orderQueue());
   amqpAdmin.declareExchange(orderExchange());
   amqpAdmin.declareBinding(orderBinding());
}

And lastly, the SpringAMQPMessageSource needed an @Transactional annotation to ensure a transaction is started.

@Bean
public SpringAMQPMessageSource orderEvents(Serializer serializer) {
   return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {

      @Transactional
      @RabbitListener(queues = "OrderQueue")
      @Override
      public void onMessage(Message message, Channel channel) throws Exception {
         //no artificial delay needed anymore; @Transactional ensures a transaction is started
         super.onMessage(message, channel);
      }
   };
}

This is something that Axon should have done out of the box, actually. I’ll fix that in 3.2.1.

After making these changes, the Saga worked like a charm, even without the timed wait.

Cheers,

Allard

Hello Allard,

thanks a lot! I’ve included the changes and can confirm it works :)

Martin

Hi Martin,

That’s good to hear.

This message routing configuration can be a hard one to tackle. That’s exactly why we’re working on AxonHub (currently in public beta). We’ve recently given a webinar about it (see https://www.youtube.com/watch?v=2FjecW_Mm6I).

Cheers,

Allard