Configuring multiple subscribing managers in Saga

Hi,

Is it possible to configure multiple subscribing managers for the same saga? For example, I have a MoneyTransferSaga. To start the saga I listen on the regular event bus, but to end it I have to listen on RabbitMQ. For now I can only listen to one of the two, not both, because I can only configure the subscribing manager to listen to either the event bus or RabbitMQ.

Thanks,
Malay

Hi Malay,

If you create an event listener with RabbitMQ as its source, you can republish all incoming events on the eventBus.
Then you only need one source for the sagas. That's the way we implemented this.
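
To make the idea concrete, here is a minimal sketch of the forwarding pattern in plain Java, without any Axon or Spring classes. Everything in it is hypothetical and only for illustration: SketchEventSource stands in for both the local event bus and the RabbitMQ-backed source, SketchTransferSaga stands in for MoneyTransferSaga, and the event names are made up. It is not how Axon wires things internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for any subscribable source
// (the local event bus or a RabbitMQ-backed message source).
class SketchEventSource {
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(Object event) {
        // Iterate over a copy so subscribers can be added while publishing.
        for (Consumer<Object> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(event);
        }
    }
}

// Hypothetical saga: started by one event, ended by another.
class SketchTransferSaga {
    final List<String> log = new ArrayList<>();
    boolean active;

    void on(Object event) {
        if ("TransferRequested".equals(event)) {
            active = true;
            log.add("started");
        } else if ("TransferCompleted".equals(event) && active) {
            active = false;
            log.add("ended");
        }
    }
}

class ForwardingSketch {
    static SketchTransferSaga run() {
        SketchEventSource localBus = new SketchEventSource();
        SketchEventSource rabbitSource = new SketchEventSource();
        SketchTransferSaga saga = new SketchTransferSaga();

        // The saga subscribes to exactly one source: the local bus.
        localBus.subscribe(saga::on);
        // The forwarder republishes everything from the second source on the local bus.
        rabbitSource.subscribe(localBus::publish);

        localBus.publish("TransferRequested");     // start event, published locally
        rabbitSource.publish("TransferCompleted"); // end event, arrives via the queue
        return saga;
    }

    public static void main(String[] args) {
        System.out.println(run().log); // prints [started, ended]
    }
}
```

The saga only ever knows about one source; the forwarder is the only piece that knows about the queue.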

Kind regards,
Koen

Hi Koen,

Could you point me to some documentation for creating an event listener with RabbitMQ as its source? Also, the events that one of my microservices publishes will go to a separate queue while it is listening to another queue, so the problem now becomes: how do I publish to two different queues?

Thanks,
Malay

Hi Malay,

You can find some documentation for reading from RabbitMQ here: https://docs.axonframework.org/part-iii-infrastructure-components/event-processing (see the paragraph "Reading Events from an AMQP Queue").

  1. Provide a bean to listen on a RabbitMQ queue:
@Bean
public SpringAMQPMessageSource warehouseOutSapIntegratorSource(AMQPMessageConverter messageConverter, EventUpcasterChain upcasterChain)
{
    return new BackwardCompatibleMessageSource(messageConverter, upcasterChain, new XStreamSerializer())
    {
        @Transactional
        @RabbitListener(id = WHS_OUT_ENDPOINT_ID, queues = QUEUE_WAREHOUSE_OUT_SAP_INTEGRATOR, errorHandler = "stopOnExceptionWhsOutErrorHandler")
        @Override
        public void onMessage(Message message, Channel channel)
        {
            super.onMessage(message, channel);
        }
    };
}

  2. Create an event listener like this:
@ProcessingGroup("com.brutex.warehouse.integrator.sap.processor.IncomingWhsRabbitEventProcessor")
@Component
public class IncomingWhsRabbitEventProcessor
{
    private static final Logger LOGGER = Logger.getLogger(IncomingWhsRabbitEventProcessor.class);

    @Autowired
    private EventBus eventBus;

    @EventHandler
    public void onMessage(WhsEvent event)
    {
        // Logger.getLogger(Class) style loggers do not expand "{}" placeholders, so concatenate instead.
        LOGGER.info("Received event " + event);
        //TODO: also read metadata etc .. into new event
        eventBus.publish(GenericEventMessage.asEventMessage(event));
    }
}


  3. In the application properties, set the source of this handler to Rabbit:

Hi Koen,

Actually, my last question was a misunderstanding on my side, so the second question no longer applies. Thanks for the link and the code; they helped.

Cheers,
Malay

Be aware that there is no upcasting for events coming from Rabbit.
Axon assumes that these events can be deserialized with the class definitions it has on its classpath.

Hi Malay and Koen,

What Koen suggests would very likely solve your problem for now, Malay.

We hear the request to merge different event streams into one (which is essentially what you're asking: merging your EventStore stream and the AMQP event stream) quite often.

So we have it slated for implementation at some point in the future.

I cannot give you a time frame for when this will be introduced, though.

So for now I’d follow Koen’s suggestion on this.

That’s my 2 cents.

Cheers,
Steven

Thank you very much! This solution works for me :).

On Monday, 13 August 2018 at 16:59:26 UTC+7, Koen Verwimp wrote: