SagaEventHandler will not subscribe to SpringAMQPMessageSource

Hi All,

In my application, I’m trying to make a Saga to handle a message from AMQP.
But unfortunately, I found that it works fine if I annotate a @ProcessGroup(“order”) on a normal bean containing methods with @EventHandler, but if I cut the same annotation to a Saga, it’s not working.
I’ve already define the mapping in the application.property.

axon.amqp.exchange=Axon.EventBus
axon.eventhandling.processors.order.source=queueMessageSource
@Configuration
public class AMQPConfiguration {

    private static final Logger LOGGER = getLogger(AMQPConfiguration.class);

    @Value("${axon.amqp.exchange}")
    private String exchangeName;

    @Bean
    public Queue queue(){
        return new Queue("orderqueue", true);
    }

    @Bean
    public Exchange exchange(){
        return ExchangeBuilder.fanoutExchange(exchangeName).durable(true).build();
    }

    @Bean
    public Binding queueBinding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("").noargs();
    }

    @Bean
    public SpringAMQPMessageSource queueMessageSource(Serializer serializer){
        return new SpringAMQPMessageSource(serializer){
            @RabbitListener(queues = "orderqueue")
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                LOGGER.debug("Message received: "+message.toString());
                super.onMessage(message, channel);
            }
        };
    }
}

The Saga class is not subscribing to SpringAMQPMessageSource. So the message will be ignored.

Is Axon not supporting saga to accept amqp message in this way, or I’m not doing the right way?
What’s the proper way if I want Saga to handle the message from mq?

Anyone got idea about this?

在 2017年4月13日星期四 UTC+8下午7:08:25,Edison Xu写道:

Hi Edison,

The @ProcessingGroup works on Event Handlers, but not on Sagas. Up until 3.0.3, the Configuration API has a limitation that you cannot configure a different source of events than the Event Bus. This is fixed in 3.0.4, where you can create a SagaConfiguration bean (named as + Configuration) that defines the configuration for that specific Saga.

I expect to be able to release 3.0.4 today.

Cheers,

Allard

Hi Allard,

Thanks for the replay.

Please correct me if I’m wrong. The concept Saga is used to handle the complex process, even in the distributed system. While in micro services architecture, the pieces of the process will be distributed into different micro services.
So Saga will have to be able to listen the events from different nodes. A mq can be used as the bridge between those nodes & saga.

Let’s say we have two services, order service and product service. When booking a order, we need to reserve the product.
And for the load balance & HA consideration, we make two instances of both the services.
We implement a Saga in the OrderService to handle the book and reservation process, using distribute command bus to send the reservation command to ProductService, and a Rabbit mq to pass the events.

When reservation of product is successful, a ReservationOkEvent is sent back to both instances of the OrderService.
Since only the node which sent out the command previously has the instance of Saga, to make sure it will receive the events, we configure the queue as Fanout mode.
So both nodes will get the events.

Here comes my questions:

  1. Can axon guarantee only one Saga would handle the message correctly? (Would the Saga on the other node create a new instance to handle the event?)
  2. Once we configured the distribution of Events, will the Saga read local events from the queue as well, or it will be notified by local event machnism?

Looking forward your answer.

Thanks,

Edison

在 2017年4月14日星期五 UTC+8下午7:50:53,Allard Buijze写道:

Allard,

Tried in v3.0.4 to create a xxxSagaConfiguration, but failed with exception:

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name ‘orderController’: Unsatisfied dependency expressed through field ‘commandGateway’; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘org.axonframework.spring.config.AxonConfiguration’: Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named ‘orderSagaConfiguration’ is expected to be of type ‘org.axonframework.config.ModuleConfiguration’ but was actually of type ‘com.edi.learn.cloud.command.config.OrderSagaConfiguration$$EnhancerBySpringCGLIB$$7eddc99e’

Is anything wrong with my step or is it not available in 3.0.4?

在 2017年4月14日星期五 UTC+8下午7:50:53,Allard Buijze写道:

Hi Edison,

I am new to Axon, i hope my answer helps (Axon experts, please correct me
if i'm wrong):

The exception you've got is related to the return type of
*orderSagaConfiguration* in your JavaConfig.

It is expected to be a *org.axonframework.config.**ModuleConfiguration, *the
closest implementation is the *org.axonframework.config.SagaConfiguration*
class.

Cheers,

Mehdi

Mehdi,

Yes, you’re right. And that’s exactly what happened.
But what I’m trying to do is to verify the new parts mentioned by Allard “This is fixed in 3.0.4, where you can create a SagaConfiguration bean (named as + Configuration) that defines the configuration for that specific Saga”.

Regards,

在 2017年4月18日星期二 UTC+8下午5:57:08,Mehdi Elaoufir写道:

Hi Edison,

The saga configuration bean (in your case orderSagaConfiguration) has to be of the correct type, namely SagaConfiguration. As Mehdi indicated, your application currently fails to start because the bean is not of the expected type.

I see you have an OrderSagaConfiguration class. Note that the default bean name of a configuration class is the simple class name with first lowercase character. In your case, that’s also orderSagaConfiguration.

That may conflict with a configuration such as:

@Bean
public SagaConfiguration orderSagaConfiguration() {
return SagaConfiguration.trackingSagaManager(OrderSaga.class);
}

Both beans are called “orderSagaConfiguration” and Spring will overwrite one with another. You can resolve this by using an explicit name on the @Configuration annotation on OrderSagaConfiguration.

Cheers,

Allard

Hi Allard,

Thanks for your reply. I followed your description to update my configure as below

@Bean
public SagaConfiguration<OrderSaga> orderSagaConfiguration(Serializer serializer){
    Function<org.axonframework.config.Configuration, SubscribableMessageSource<EventMessage<?>>> func = c-> queueMessageSource(serializer);
    SagaConfiguration<OrderSaga> sagaConfiguration = SagaConfiguration.subscribingSagaManager(OrderSaga.class, func);
    return sagaConfiguration;
}

but somehow, it reports “javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process ‘persist’ call” Exception.
It’s strange because in the codes

private SagaConfiguration(Class<S> sagaType, Function<Configuration, SubscribableMessageSource<EventMessage<?>>> messageSourceBuilder) {
    String managerName = sagaType.getSimpleName() + "Manager";
    String processorName = sagaType.getSimpleName() + "Processor";
    String repositoryName = sagaType.getSimpleName() + "Repository";
    sagaStore = new Component<>(() -> config, "sagaStore", c -> c.getComponent(SagaStore.class, InMemorySagaStore::new));
    sagaRepository = new Component<>(() -> config, repositoryName,
                                     c -> new AnnotatedSagaRepository<>(sagaType, sagaStore.get(), c.resourceInjector(),
                                                                        c.parameterResolverFactory()));
    sagaManager = new Component<>(() -> config, managerName, c -> new AnnotatedSagaManager<>(sagaType, sagaRepository.get(),
                                                                                             c.parameterResolverFactory()));
    processor = new Component<>(() -> config, processorName,
                                c -> {
                                    SubscribingEventProcessor processor = new SubscribingEventProcessor(managerName, sagaManager.get(),
                                                                                                        messageSourceBuilder.apply(c));
                                    processor.registerInterceptor(new CorrelationDataInterceptor<>(c.correlationDataProviders()));
    return processor;
    });
}

a InMemorySagaStore is created, but actually Axon uses JpaSagaStore instead, and the entity manager is lost. (I use Jpa to store the domain events)

I tried to define a axonTransactionManager in my configure class, but still not working.
So I added an interceptor to make it work.

@Bean
public SagaConfiguration<OrderSaga> orderSagaConfiguration(Serializer serializer){
    Function<org.axonframework.config.Configuration, SubscribableMessageSource<EventMessage<?>>> func = c-> queueMessageSource(serializer);
    SagaConfiguration<OrderSaga> sagaConfiguration = SagaConfiguration.subscribingSagaManager(OrderSaga.class, func);
    Function<org.axonframework.config.Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder =
            c->transactionManagingInterceptor();
    sagaConfiguration.registerHandlerInterceptor(interceptorBuilder);
    return sagaConfiguration;
}

@Bean
public TransactionManagingInterceptor transactionManagingInterceptor(){
    return new TransactionManagingInterceptor(new SpringTransactionManager(transactionManager));
}

Although it’s working now, I’m still a little bit worried about why Jpatransaction manager is lost, and how to correct it.

Thank you,

Regards,

Edison

在 2017年4月19日星期三 UTC+8下午9:21:03,Allard Buijze写道:

Hi Edison,

the code in the SagaConfiguration class just specifies that an InMemorySagaStore should be used if no store was explicitly configured. The Sprint Autoconfiguration will configure a JpaSagaStore for you if it detects JPA on the classpath.

It seems that you are reading some messages from an external source. The subscribing event processor doesn’t start any transaction by default, because it is invoked in the thread that published the event to the processor. It is assumed that a transaction will be running for that component already. Obviously, you can still configure a Transaction Manager, just the way you did it.
Consider configuring a transaction manager in your external message source, instead.

A small recommendation from my side. The code is much more readable if you don’t assign functions to variables. For example:

return SagaConfiguration.subscribingSagaManager(OrderSaga.class, c-> queueMessageSource(serializer))
                        .sagaConfiguration.registerHandlerInterceptor(c->transactionManagingInterceptor());

This reads a lot easier, in my opinion.

Cheers,

Allard

Hi Allard,

Thanks for the quick reply and clarification.
As you mentioned, I added a @Transactional annotation on the onMessage method, it works as well.


@Bean
public SpringAMQPMessageSource queueMessageSource(Serializer serializer){
    return new SpringAMQPMessageSource(serializer){
        @RabbitListener(queues = "orderqueue")
        @Override

        @Transactional

        public void onMessage(Message message, Channel channel) throws Exception {
            LOGGER.debug("Message received: "+message.toString());
            super.onMessage(message, channel);
        }
    };
}

And for the function to variables, I copied from the declaration of the method itself for convenience. I’ll definitely alter it to the propery lambda style.
Thank your for the recommendation. :slight_smile:

Sorry for bordering you, but I still have one more question to confirm.
Let’s say I have two instance of the same Saga codes, both receiving a event to trigger the subsequent scenario in the Saga(listening to a Fanout queue).
One will continue the scenario correctly because it can find the saga instance with the associate key, and for the other, will it create a new saga to continue the work?
Not sure about whether the result will be impact by the types of saga repository.

Thanks,

Edison

在 2017年4月20日星期四 UTC+8下午5:36:40,Allard Buijze写道:

Hi Edison,

good to hear you got it working.

Not sure what you mean with “will it create a new saga to continue the work”. If you have 2 saga instances, then they will both process the event, if they both have a matching association value.
If you have 1 existing Saga and the event handler method is (also) annotated with @StartSaga, it will not create a new instance, unless @StartSaga(forceNew =true) is specified.

Hope this helps.
Cheers,

Allard