Ending Saga on Multiple Events

Hello Team,

The following is my domain model

Run HealthCheck Reports

  • Run Report 1
  • Run Report 2
  • Run Report 3 …

I have a use case where my saga executes a command to run all these reports, and I have to maintain the state of each report. A report runner worker runs the reports and generates a "report completed" notification for each one, which my saga has to consume.

Once all reports are done (Failed / Completed) I have to generate an event for the ending of process.

Can you tell me how I should handle this in a Saga?

Hi Ajinkya Virkar,

I’m a newbie myself in CQRS/Axon, but I would do something like this in my Saga:

// One flag per final event; the Saga ends once all three have arrived.
boolean firstFinalEventArrived = false;
boolean secondFinalEventArrived = false;
boolean lastFinalEventArrived = false;

@SagaEventHandler(associationProperty = "id")
public void on(FirstFinalEvent event) {
    firstFinalEventArrived = true;
    if (firstFinalEventArrived && secondFinalEventArrived && lastFinalEventArrived) {
        SagaLifecycle.end(); // end the Saga
    }
}

@SagaEventHandler(associationProperty = "id")
public void on(SecondFinalEvent event) {
    secondFinalEventArrived = true;
    if (firstFinalEventArrived && secondFinalEventArrived && lastFinalEventArrived) {
        SagaLifecycle.end(); // end the Saga
    }
}

@SagaEventHandler(associationProperty = "id")
public void on(LastFinalEvent event) {
    lastFinalEventArrived = true;
    if (firstFinalEventArrived && secondFinalEventArrived && lastFinalEventArrived) {
        SagaLifecycle.end(); // end the Saga
    }
}

Guys,
Please correct me if I’m wrong or if there is a better way…

It might be nice to have some kind of Condition annotation covering several events, but each saga’s logic is unique, so it is probably not worth implementing such a thing.

Thanks,
Evgeny Kochnev

To be clear, the Saga is ended by calling SagaLifecycle.end().
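
The three boolean flags above work for a fixed set of events. For a variable number of reports, the same idea can be sketched as a map from report id to status. This is a minimal plain-Java sketch, not Axon code: the class name ReportTracker, the Status enum and the report ids are hypothetical. In a Saga, record(...) would presumably be called from a @SagaEventHandler method, and SagaLifecycle.end() would be called once it returns true.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: tracks the outcome of each report the saga started.
class ReportTracker {
    enum Status { PENDING, COMPLETED, FAILED }

    private final Map<String, Status> reports = new LinkedHashMap<>();

    // Every report starts out as PENDING.
    ReportTracker(List<String> reportIds) {
        for (String id : reportIds) {
            reports.put(id, Status.PENDING);
        }
    }

    // Records a terminal outcome and returns true once no report is pending.
    boolean record(String reportId, Status outcome) {
        if (outcome == Status.PENDING) {
            throw new IllegalArgumentException("outcome must be COMPLETED or FAILED");
        }
        if (!reports.containsKey(reportId)) {
            throw new IllegalArgumentException("unknown report: " + reportId);
        }
        reports.put(reportId, outcome);
        return allFinished();
    }

    // A report counts as finished whether it completed or failed.
    boolean allFinished() {
        return !reports.containsValue(Status.PENDING);
    }
}
```

A failed report counts as finished here, which matches the requirement that the process ends once all reports are either Failed or Completed.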

Sure, I can go with this approach. The point where I am currently stuck is this: my listener is able to listen to the events published by the external microservice, but is there a way for the saga to read those events directly from the event bus (in my case RabbitMQ)?

Hi Ajinkya,

in Axon, the Event Processor is responsible for reading events from a specific source, and invoking the correct Event Handlers. The latter are the @EventHandler methods that contain your business logic.

By default, Event Processors read from the Event Bus, as in 99% of the cases, that’s where events are published. It is, however, very well possible that you want to read from another source. In that case, just instruct the processor to use another source.
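
The relationship between a processor, its source and its handlers can be modelled in a few lines of plain Java. This is only an illustrative sketch, not Axon's API: the names ProcessorSourceSketch, MessageSource, InMemorySource and Processor are all hypothetical. It shows the consequence of the paragraph above: a handler only ever sees events from the source its processor was told to read from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: a processor subscribes to exactly one source
// and forwards every event from that source to its handlers.
class ProcessorSourceSketch {

    // Anything a processor can read from, e.g. the event bus or a queue.
    interface MessageSource {
        void subscribe(Consumer<String> processor);
    }

    static class InMemorySource implements MessageSource {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        @Override
        public void subscribe(Consumer<String> processor) {
            subscribers.add(processor);
        }

        // Delivers the event to every subscribed processor.
        void publish(String event) {
            subscribers.forEach(s -> s.accept(event));
        }
    }

    static class Processor {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        // The source is chosen once, when the processor is configured.
        Processor(MessageSource source) {
            source.subscribe(this::dispatch);
        }

        void register(Consumer<String> handler) {
            handlers.add(handler);
        }

        private void dispatch(String event) {
            handlers.forEach(h -> h.accept(event));
        }
    }
}
```

In this model, a handler registered on a processor that reads from the "event bus" source receives nothing that is published only to the "queue" source, until the processor is constructed with the queue as its source instead.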

Actually, yesterday, I coordinated a hands-on lab that had an exercise exactly for this purpose. You can check it out here: https://github.com/AxonIQ/jfall-axon-labs. The “chat-scaling-out” module is the lab you’d want to be looking at, and on the solution branch, you can see how it’s done. The “RoomParticipantsProjection” handler is configured to read events from RabbitMQ.

Hope this helps.

Allard

Hello Allard and Evgeniy,

I think I understand now how I can manage my business case. What I was missing was publishing the event on the bus. Once I started doing that, the Saga was able to successfully handle these events from external sources.

Hi Ajinkya,

I am facing the same issue. The event that I publish on RabbitMQ is visible in my consumer service and is consumed by an @EventHandler, but the @SagaEventHandler does not receive it. Could you tell me exactly how you solved this issue?

Thanks,
Malay M

Hi,

so you have two SpringAMQPMessageSources (either in the same application, or in different applications) listening to the same queue?

Cheers,

Allard

In different applications and listening to different queues.

Cheers,
Malay

Did you define a SagaConfiguration instance (either using correct naming convention, or with the name defined in the @Saga(configurationBean=…) annotation)? On that SagaConfiguration bean, you need to specify the source.
Note that the axon.processors… style configuration does not work for Sagas, yet.

Hope this helps.
Cheers,

Allard

Hi Allard,

Thanks for taking the time to help us out.

This is the SagaConfiguration bean we have inside our configuration file. My Saga is named UserProfileSaga, hence the method name userProfileSagaConfiguration, as per the naming convention.

`
@Bean
public SagaConfiguration userProfileSagaConfiguration(Serializer serializer) {
return SagaConfiguration.subscribingSagaManager(UserProfileSaga.class);
// return SagaConfiguration.subscribingSagaManager(UserProfileSaga.class, c-> messageSource(serializer));
}

@Bean
public SpringAMQPMessageSource messageSource(Serializer serializer) {
System.out.println("\n\n— On Message Call —\n\n");
return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {

@Transactional
@RabbitListener(queues = "testdemo")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("\n\n"+message.getMessageProperties()+"\n\n");
System.out.println("\n\nchannel == " + channel+"\n\n");
System.out.println(new String(message.getBody()));
System.out.println(channel);
super.onMessage(message, channel);
}
};
}
`

Thanks,
Malay

There is a problem with your SpringAMQPMessageSource definition.
Have a look at https://docs.axonframework.org/part-iii-infrastructure-components/event-processing#reading-events-from-an-amqp-queue for an example.

Cheers,

Allard

I tried configuring it as described in the link you shared. The event is still not handled by the @SagaEventHandler, although it is handled by the @EventHandler. Below is my configuration class:

`
@Configuration
public class AxonConfiguration {

private final static Logger logger = LoggerFactory.getLogger(AxonConfiguration.class);

@Value("${axon.amqp.exchange}")
private String exchange;

@Bean
public Exchange exchange() {
logger.info(exchange + " AMQP Exchange Registering ");
return ExchangeBuilder.fanoutExchange(exchange).build();
}

@Bean
public Queue queue() {
return QueueBuilder.durable(exchange).build();
}

@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}

@Autowired
public void configure(AmqpAdmin amqpAdmin) {
amqpAdmin.declareExchange(exchange());
amqpAdmin.declareQueue(queue());
amqpAdmin.declareBinding(binding());
}

@Bean
public SagaConfiguration userProfileSagaConfiguration(Serializer serializer) {
return SagaConfiguration.subscribingSagaManager(UserProfileSaga.class);
// return
// SagaConfiguration.subscribingSagaManager(UserProfileSaga.class, c ->
// testdemo(serializer));
}

@Bean
public SpringAMQPMessageSource testdemo(Serializer serializer) {
return new SpringAMQPMessageSource(serializer) {
@Transactional
@RabbitListener(queues = "testdemo")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("\n\n" + message.getMessageProperties() + "\n\n");
System.out.println("\n\nchannel == " + channel + "\n\n");
System.out.println(new String(message.getBody()));
System.out.println(channel);
super.onMessage(message, channel);
}
};
}

// public void configure(EventHandlingConfiguration ehConfig, SpringAMQPMessageSource testdemo) {
// ehConfig.registerSubscribingEventProcessor("myProcessor", c -> testdemo);
// }
}
`

Thanks,
Malay

Did you uncomment the
// SagaConfiguration.subscribingSagaManager(UserProfileSaga.class, c ->
// testdemo(serializer)

lines before trying? You do have to register the SpringAMQPMessageSource as the source from which your Saga must read events.

Cheers,

Allard

Yes, I did uncomment those lines. It did not work, so I reverted to the previous line without c -> testdemo(serializer).
But I did not uncomment

// public void configure(EventHandlingConfiguration ehConfig, SpringAMQPMessageSource testdemo) {
// ehConfig.registerSubscribingEventProcessor("myProcessor", c -> testdemo);
// }

Cheers,
Malay

Just to be sure, instead of calling the bean’s creator method, just have it injected as a parameter. Normally, Spring will intercept these calls (even when coming from within the same class), but it may very well be that there are limitations, still.

Just do:

@Bean
public SagaConfiguration userProfileSagaConfiguration(Serializer serializer, SpringAMQPMessageSource source) {
return SagaConfiguration.subscribingSagaManager(UserProfileSaga.class, c -> source);
}

Cheers,

Allard

Tried doing that but it did not make much of a difference there.

Thanks,
Malay

It looks like you have two @RabbitListener annotations listening to the same queue. I only saw one of the annotations, but since you’re defining only a single queue, I make this assumption.
It may very well be that Spring creates a MessageListenerContainer per @RabbitListener annotation, and that the two components are now competing for messages. I recommend removing one of them. If you want to log messages for debugging purposes, do so from within the method you override in the SpringAMQPMessageSource.
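
Why two listeners on one queue would be a problem can be shown with a simplified plain-Java model. This is not RabbitMQ or Spring AMQP code: the class CompetingConsumersDemo and the strict round-robin delivery are assumptions made for illustration. The point is only that each message on a queue goes to one consumer, so a second listener takes messages away from the first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of several consumers attached to a single queue.
class CompetingConsumersDemo {

    // Hands each queued message to exactly one consumer, in round-robin order.
    // Returns, per consumer, the list of messages that consumer received.
    static List<List<String>> deliver(List<String> messages, int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        while (!queue.isEmpty()) {
            // A message is removed from the queue when it is delivered,
            // so no other consumer can see it afterwards.
            received.get(next).add(queue.poll());
            next = (next + 1) % consumerCount;
        }
        return received;
    }
}
```

With a single consumer, that consumer receives every message. With two consumers, each receives only part of the stream, which from the point of view of either one looks like events going missing.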

Cheers,

Allard

I looked into the code for another @RabbitListener but there is only one @RabbitListener.

Thanks
Malay

Hi Allard,

Does @StartSaga create a new transaction? If it does, is it possible that the @Transactional annotation on the @RabbitListener method ends up creating a separate transaction, which would explain why the @SagaEventHandler does not receive this event while the @EventHandler does?

Regards,
Malay