Saga with AMQP?

Hello,

I would like to use a Saga as an orchestrator between multiple bounded contexts, each running in a separate JVM.

Looking at the source code, I understand that a SubscribingEventProcessor is created with a name like “SagaName” + “Manager”.
It looks like this event listener is always bound to the default EventBus?

Is it possible to have this linked to an AMQP source?
The saga needs to listen to events coming from different JVMs.

If not, do you see any other option for running a Saga across different JVMs?

Kind regards,
Koen

Hi Koen,

Unfortunately, the automatic configuration based on the @Saga annotation doesn’t allow you to configure this. You will have to declare a Saga Manager yourself. Define a TrackingEventProcessor or SubscribingEventProcessor, and pass an AnnotatedSagaManager as a constructor parameter.
The processor also takes the event source as a constructor argument, so you can configure it there.

In upcoming versions, we will improve on the (automatic) configuration in combination with Spring Boot.

Cheers,

Allard

Hi Allard,

Thank you for your reply!
Is this what you mean?

@Autowired
public void createSaga(SagaStore sagaStore, SpringAMQPMessageSource springAMQPMessageSource, ResourceInjector resourceInjector, ParameterResolverFactory parameterResolverFactory)
{
    String simpleName = IncomingQCSaga.class.getSimpleName();

    // Repository and manager for the Saga, built by hand instead of via the @Saga auto-configuration
    AnnotatedSagaRepository<IncomingQCSaga> sagaRepository = new AnnotatedSagaRepository<>(IncomingQCSaga.class, sagaStore, resourceInjector, parameterResolverFactory);
    AnnotatedSagaManager<IncomingQCSaga> sagaManager = new AnnotatedSagaManager<>(IncomingQCSaga.class, sagaRepository, parameterResolverFactory);

    // Subscribe the Saga manager to the AMQP message source rather than the default EventBus
    this.subscribingEventProcessor = new SubscribingEventProcessor(simpleName + "Processor", sagaManager, springAMQPMessageSource);
    this.subscribingEventProcessor.start();
}

@PreDestroy
public void destroy()
{
    this.subscribingEventProcessor.shutDown();
}

Yep, that’s pretty much it.

Hello,

After adding the code snippet above, I got some TransactionRequiredExceptions (No EntityManager with actual transaction available for current thread - cannot reliably process ‘persist’ call).

Could you help me figure out what the problem could be?

Thanks,
Szabolcs

I managed to solve this problem by starting a transaction manually (not sure if it is the right solution).

Now my Saga receives every event from its own module twice (one of the two copies comes from the AMQP message source). It works fine with events from other modules.

You need to register a TransactionManagingInterceptor (with a SpringTransactionManager) on your processor. As the events are processed in another thread, the handling thread does not have access to the transaction. You will have to start a new one.
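
To illustrate why the handling thread has no transaction, here is a minimal plain-Java sketch. It does not use Axon or Spring: ThreadBoundTransactionDemo, inTransaction and handleOnWorker are hypothetical stand-ins, and the ThreadLocal only assumes that the transaction is tracked per thread. It shows that a transaction opened by the caller is not visible on a worker thread, while wrapping the handler itself (the role the interceptor plays) makes one available.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in classes; none of this is Axon or Spring API.
class ThreadBoundTransactionDemo {

    // Assumption: the active transaction is tracked per thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    static boolean transactionActive() {
        return CURRENT_TX.get() != null;
    }

    // Plays the role of the interceptor: opens a "transaction" on the
    // thread that runs the handler and clears it afterwards.
    static <T> T inTransaction(Supplier<T> handler) {
        CURRENT_TX.set("tx");
        try {
            return handler.get();
        } finally {
            CURRENT_TX.remove();
        }
    }

    // Runs a handler on a separate worker thread and reports whether
    // that handler saw an active transaction.
    static boolean handleOnWorker(boolean wrapInTransaction) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<Boolean> task;
            if (wrapInTransaction) {
                task = () -> inTransaction(ThreadBoundTransactionDemo::transactionActive);
            } else {
                task = ThreadBoundTransactionDemo::transactionActive;
            }
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    // The caller opens a transaction, but the handler runs on another thread.
    static boolean callerTransactionVisibleOnWorker() {
        return inTransaction(() -> handleOnWorker(false));
    }
}
```

In this sketch, callerTransactionVisibleOnWorker() returns false, whereas handleOnWorker(true) returns true because the transaction is started on the handling thread itself.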

Do you still have the @Saga annotation on your class? If so, you probably have the default Axon components as well as your custom component sending events to the Saga instances.
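
A minimal plain-Java sketch of how that double delivery can arise. Source and DuplicateDeliveryDemo are hypothetical stand-ins, not Axon classes, and the sketch assumes that local events are forwarded to AMQP and consumed again by the same module.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in classes; none of this is Axon API.
class DuplicateDeliveryDemo {

    // Simplified message source that pushes each event to all subscribers.
    static class Source {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String event) {
            subscribers.forEach(s -> s.accept(event));
        }
    }

    // Returns the events the Saga receives, in order of delivery.
    static List<String> deliveries(boolean keepDefaultProcessor) {
        Source localBus = new Source();
        Source amqp = new Source();

        // Assumption: local events are forwarded to AMQP, and this module
        // also consumes them from there.
        localBus.subscribe(amqp::publish);

        List<String> sagaInbox = new ArrayList<>();
        // Custom processor: Saga subscribed to the AMQP source.
        amqp.subscribe(sagaInbox::add);
        if (keepDefaultProcessor) {
            // Default processor (still present while @Saga is on the class):
            // Saga also subscribed to the local bus.
            localBus.subscribe(sagaInbox::add);
        }

        localBus.publish("local-event");  // event from the same module
        amqp.publish("remote-event");     // event from another module
        return sagaInbox;
    }
}
```

With both processors present, the local event is delivered twice and the remote event once; with only the custom processor, each event is delivered once.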

Cheers,

Allard

Cool, thanks for the quick help.
After removing the @Saga annotation and registering the TransactionManagingInterceptor everything works fine.

Regards,
Szabolcs

Hi,
I am facing the same issue with RabbitMQ and a Saga. Could you please share the code where you create the Saga Manager and get it working?
Thanks,
Saibal

Hi Allard,

I don’t really understand how to inject ResourceInjector and ParameterResolverFactory automatically in the example above.

I found the part of the SagaConfiguration source that instantiates the SubscribingEventProcessor, and I understand that the ResourceInjector and ParameterResolverFactory come directly from AxonConfiguration.

@SuppressWarnings("unchecked")
private SagaConfiguration(Class<S> sagaType) {
    String managerName = sagaType.getSimpleName() + "Manager";
    String processorName = sagaType.getSimpleName() + "Processor";
    String repositoryName = sagaType.getSimpleName() + "Repository";
    sagaStore = new Component<>(() -> config, "sagaStore", c -> c.getComponent(SagaStore.class, InMemorySagaStore::new));
    sagaRepository = new Component<>(() -> config, repositoryName,
                                     c -> new AnnotatedSagaRepository<>(sagaType, sagaStore.get(), c.resourceInjector(),
                                                                        c.parameterResolverFactory()));
    sagaManager = new Component<>(() -> config, managerName, c -> new AnnotatedSagaManager<>(sagaType, sagaRepository.get(),
                                                                                             c.parameterResolverFactory()));
    processor = new Component<>(() -> config, processorName,
                                c -> new SubscribingEventProcessor(managerName, sagaManager.get(), c.eventBus()));
}

Is it correct to inject these two objects by autowiring AxonConfiguration directly, like below?

Hi Lei,

Yes, that’s correct.
Thinking about this, these components should probably be marked as “primary”, so that you would be able to autowire the ParameterResolverFactory directly. It might even work now, but injecting AxonConfiguration (which you can also pass as a method parameter instead of using an @Autowired field) will give you the correct instances.

Cheers,

Allard