Hello,
I am using saga that issues two commands to separate microservices. Both take about the same time so the event response arrive in the AMQP queue around the same time. My handler method
@Bean
public SpringAMQPMessageSource orderEvents(Serializer serializer) {
return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
@RabbitListener(queues = "OrderQueue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//necessary because message is not delivered to saga otherwise
Thread.sleep(1000);
super.onMessage(message, channel);
}
};
}
The messages are delivered through AMQP - the method onMessage is invoked but the processing in DefaultAMQPMessageConverter prohibits the event delivery of both events to the saga (in some rare cases the latter is delivered).
The event processor registration:
//the manual registration of saga event subscription because of processing events from AMQP queue
@Autowired
@SuppressWarnings("unchecked")
public void createSaga(SagaStore sagaStore, SpringAMQPMessageSource springAMQPMessageSource, ResourceInjector resourceInjector, ParameterResolverFactory parameterResolverFactory, TransactionManager transactionManager)
{
String simpleName = OrderManagementSaga.class.getSimpleName();
AnnotatedSagaRepository sagaRepository = new AnnotatedSagaRepository<>(OrderManagementSaga.class, sagaStore, resourceInjector, parameterResolverFactory);
AnnotatedSagaManager<OrderManagementSaga> sagaManager = new AnnotatedSagaManager<>(OrderManagementSaga.class, sagaRepository, parameterResolverFactory);
this.subscribingEventProcessor = new SubscribingEventProcessor(simpleName + "Processor", sagaManager, springAMQPMessageSource);
this.subscribingEventProcessor.registerInterceptor(new TransactionManagingInterceptor<>(transactionManager));
this.subscribingEventProcessor.start();
}
Note that this happens only without timeout present in the onMessage.
Thanks!