[Axon 4] Multiple subscribing event processors for single saga


We’re currently migrating our microservices from Axon 3.1 to the newest version (4.4.6).

With version 3.1 we had set up our sagas to read events from both the event bus and an AMQP message source, simply by registering a subscribing saga manager with the AMQP message source:

	public SagaConfiguration<SomeSaga> someSagaConfig(SpringAmqpMessageSource externalEventsMessageSource) {
		// Axon 3: subscribe the saga manager for SomeSaga to the AMQP message source
		return SagaConfiguration.subscribingSagaManager(SomeSaga.class, c -> externalEventsMessageSource);
	}

Reading from the event bus was provided by the framework out of the box.

However, with v4.4.6 we are not able to register two subscribing event processors for one saga. We can either read events from the event bus (with no additional configuration) or from the AMQP message source, by registering an event processor like this:

	public void someSagaConfiguration(EventProcessingConfigurer config, SpringAmqpMessageSource externalEventsMessageSource) {
		// Axon 4: the saga's processor now subscribes to the AMQP message source only
		config.registerSubscribingEventProcessor("SomeSagaProcessor", c -> externalEventsMessageSource);
	}

But after hours of debugging I can see no way to register both of them.

I found this post, which describes the same issue, but the Axon version discussed there was 3.3.

Can someone confirm that what we’re trying to achieve is not doable in the newest Axon version?

Thanks in advance,

BTW: we would like to switch to tracking processors, but currently we’re not able to do this easily due to various limitations.

Using the EventProcessingConfigurer#registerSubscribingEventProcessor method would always override the previous invocation of that method for the same processing group. Thus the EventProcessingConfigurer indeed does not allow you an easy means to register the Saga’s SubscribingEventProcessor to several sources.

Furthermore, I would assume this to be the same behaviour in Axon 3. Any given SubscribingEventProcessor can only be subscribed to a single SubscribableMessageSource. I would assume this means that your current application has two SubscribingEventProcessor instances for the same Saga. This is an assumption, however, and it doesn’t change the situation you’re in now.

If you really, really need to stick to the SubscribingEventProcessor, you could customize your configuration such that you build two SubscribingEventProcessor instances for the same Saga type yourself. One of them would read from the EventBus and the other from your SpringAmqpMessageSource. Or, you could set up a custom SubscribingEventProcessor which can have several SubscribableMessageSource instances.

The lock placed on a Saga should save you from concurrency issues. It will not save you, however, when you run several instances of your application. To allow further parallelization of your application, you simply have to use the TrackingEventProcessor. I noticed your “btw”, but I do think it would be an easier approach to the problem at hand.

Granted, this would still not allow your Saga to read directly from the SpringAmqpMessageSource, since that’s a SubscribableMessageSource and a TrackingEventProcessor requires a StreamableMessageSource. Whether the SpringAmqpMessageSource should be a direct ingestion point for the Saga is debatable anyhow, as that source tends to be used to read events from different bounded contexts. Directly pushing events from another context into your own is not recommended. Using a form of context mapping (like an anti-corruption layer) to translate the incoming messages into a local type would be the way to go for this.

However, I digress. I hope your options are clearer @Tomek_Spulak. If you really need to stick with the old default of the SubscribingEventProcessor, you would either need to create a custom implementation or create two instances of the processor. Moving to the TrackingEventProcessor would be more future proof, but requires rethinking what role the SpringAmqpMessageSource plays in your application.

Thank you very much for your response @Steven_van_Beelen. Now I have a clear situation… and some coding to be done :slight_smile:

Awesome, glad to have been of help @Tomek_Spulak.
Oh, and I forgot to mention: it’s great that you’re making the move from Axon 3 to Axon 4 :slight_smile:

To be honest, we did not plan this upgrade for the near future, but we had to do it earlier because a projection with a subscribing event processor was missing events on shutdown. This issue hit us several times in production :frowning: and we needed to reprioritize.
Nevertheless, I think keeping the libs/frameworks we use up to date is the only right way.

Aah, so you aim to first benefit from the graceful shutdown and start-up process which was introduced in Axon. Makes sense…it does save you from that window of opportunity in which the SubscribingEventProcessor might miss some events during shutdown.

I am guessing I don’t have to tell you, but I will regardless since other people might also read this thread. Using the TrackingEventProcessor can be regarded as an additional safeguard in that process, simply because it keeps track of its progress in a token. In addition, it gives you the capability to move back in time if something in the event handling process went wrong.

Granted, I’d first migrate to Axon 4. Once that’s finished, moving to the TEP might be a feasible next step for added safety.

Hi @Steven_van_Beelen, I have some questions regarding the same issue.

If I understand it correctly, we cannot assign two sources to a saga’s event processor, but we can assign two processors to a single saga. Can you explain this a little bit more? It would be great if you could give me an example of how to do this.

Looking at the implementation of SpringAMQPMessageSource.onMessage, it takes the message and passes it to all event processors. Can you explain why it cannot just publish the message on the EventBus?

If I understand it correctly, we cannot assign two sources to a saga’s event processor, but we can assign two processors to a single saga. Can you explain this a little bit more? It would be great if you could give me an example of how to do this.

You would, in custom code, need to retrieve the SagaManager (the concrete implementation would be the AnnotatedSagaManager). Note that the SagaManager is nothing more than a specific type of Event Handler to Axon. Furthermore, you would construct two SubscribingEventProcessor (SEP) instances, which each receive the same SagaManager.

Looking at the implementation of SpringAMQPMessageSource.onMessage, it takes the message and passes it to all event processors. Can you explain why it cannot just publish the message on the EventBus?

The EventBus is, in essence, a type of SubscribableMessageSource. The SpringAMQPMessageSource is likewise a type of SubscribableMessageSource. Passing messages from one source to another potentially causes undesired mixing of events coming from distinct contexts. From a consumer’s perspective (your Saga) this might be fine, but from the bus’ perspective, you would want these to be strictly segregated.

Having said all this, I would like to reiterate that I recommend using a TrackingEventProcessor for any Saga. It uses a TrackingToken to record where it was in the event stream, so if your system halts mid-event handling, it will simply start at that same event on startup. If you are using a subscribable message source, you lose this flexibility entirely, which means your complex business transaction (as defined in the Saga) could end up in an erroneous state you cannot pull it out of. On top of that, the Saga is dealt with in its own transaction, since the TrackingEventProcessor works in separate threads.
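As a toy illustration of the token idea described above, here is a minimal sketch of a processor that only advances its position after an event was handled. This is not Axon code: the class and method names are made up for the example, and the "token" is a plain in-memory counter.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a processor that remembers its position in an event stream.
// The names are illustrative only and are not Axon classes.
class PositionTrackingProcessor {
    private long token = 0;                        // index of the next event to handle
    private final List<String> handled = new ArrayList<>();

    // Handles events from the stored position onward and stops at the first failure.
    // Passing null for failOn means no event fails.
    void process(List<String> stream, String failOn) {
        while (token < stream.size()) {
            String event = stream.get((int) token);
            if (event.equals(failOn)) {
                return;                            // token is not advanced, so this event is retried later
            }
            handled.add(event);
            token++;                               // advance only after successful handling
        }
    }

    // Moves the position back, which is what makes a replay possible.
    void resetTo(long position) {
        token = position;
    }

    long token() {
        return token;
    }

    List<String> handled() {
        return handled;
    }
}
```

In Axon the position lives in a TrackingToken kept in a TokenStore rather than in a field, so it survives a restart; the sketch only mirrors the resume and reset behaviour.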

Nonetheless, I hope my explanation will help you further @klekowskim. Let us know whether you work out a solution using 2 SEPs. :slight_smile:

Can you give an abstract example of how to do this?

I know how to create a new subscribing event processor:

	public void configure(EventProcessingConfigurer config, SpringAmqpMessageSource externalEventsMessageSource) {
		// Registers a subscribing processor named "SomeName" that reads from the AMQP message source
		config.registerSubscribingEventProcessor("SomeName", c -> externalEventsMessageSource);
	}

But how do I tell the saga to use this event processor together with the standard one (the event bus)? Do I need to configure it manually somehow and not use the @Saga annotation?

To be completely honest with you, I am not in the practice of typing out solutions word for word for anybody. In addition, I truly hope that you take into account the warning I’ve shared: I think this is a bad idea. Please use the TrackingEventProcessor, it will save you a lot of headaches in the future. At any rate, I am not going to give a coded solution here.

So, let me give a textual approach off the top of my head.
You will need to construct two SubscribingEventProcessor instances manually, each with its own source. The JavaDoc on the Builder clearly states which fields are hard requirements and which aren’t, so definitely take a look there too. Thus, completely circumvent the EventProcessingConfigurer here, as this component assumes a single processor per component.

Furthermore, you would indeed not rely on the Spring stereotype @Saga, and thus go for the manual approach through the SagaConfigurer. Again, the JavaDoc clearly states what is mandatory here and how to cope with such a component. Once you are done with the SagaConfigurer, you will need to invoke its initialize(Configuration) method yourself.

With the SagaConfiguration (the result of initializing the SagaConfigurer) in hand, you can provide the AbstractSagaManager as the EventHandlerInvoker to the SubscribingEventProcessor builder of both instances. With that in place, you should be able to start both.
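The textual approach above could be sketched roughly as follows. This is an uncompiled sketch written from memory of the Axon 4 API, not a verified solution: `SomeSaga` is the saga class from the question, the processor names are hypothetical, and it assumes that `SagaConfiguration#manager()` hands back the saga manager as described in the post and that the Axon `Configuration` is fully available when the constructor runs. Check the JavaDoc of `SubscribingEventProcessor.Builder` and `SagaConfigurer` before relying on any of it.

```java
import org.axonframework.config.Configuration;
import org.axonframework.config.SagaConfiguration;
import org.axonframework.config.SagaConfigurer;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAmqpMessageSource;

// Sketch only: one saga manager shared by two manually built subscribing processors.
public class SomeSagaProcessors {

    private final SubscribingEventProcessor localProcessor;
    private final SubscribingEventProcessor externalProcessor;

    public SomeSagaProcessors(Configuration configuration,
                              SpringAmqpMessageSource externalEventsMessageSource) {
        // Configure the saga by hand instead of relying on the @Saga stereotype.
        SagaConfiguration<SomeSaga> sagaConfiguration =
                SagaConfigurer.forType(SomeSaga.class).initialize(configuration);

        // Processor 1 reads from the local event bus.
        this.localProcessor = SubscribingEventProcessor.builder()
                .name("SomeSagaProcessor-local")       // hypothetical name
                .eventHandlerInvoker(sagaConfiguration.manager())
                .messageSource(configuration.eventBus())
                .build();

        // Processor 2 reads from the AMQP message source and shares the same saga manager.
        this.externalProcessor = SubscribingEventProcessor.builder()
                .name("SomeSagaProcessor-external")    // hypothetical name
                .eventHandlerInvoker(sagaConfiguration.manager())
                .messageSource(externalEventsMessageSource)
                .build();
    }

    // Both processors have to be started and stopped by hand,
    // because the EventProcessingConfigurer does not know about them.
    public void start() {
        localProcessor.start();
        externalProcessor.start();
    }

    public void shutDown() {
        externalProcessor.shutDown();
        localProcessor.shutDown();
    }
}
```

The sketch leaves out everything the builder treats as optional (error handling, monitoring and so on), and it does not remove any of the drawbacks of subscribing processors mentioned in this thread.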

Again, I highly recommend going for a TrackingEventProcessor instead. I am pretty confident I am missing out on some pointers in my above description, and to be honest with you, I intend to keep it that way. Using the TrackingEventProcessor is just much more of a safe solution when it comes to combining message streams.

By the way, I just noticed I forgot to mention one thing.

The cleanest solution to combine several sources for a SubscribingEventProcessor, would be to define a different type of SubscribableMessageSource. For the TrackingEventProcessor (and the upcoming PooledStreamingEventProcessor) we already have such a thing, called the MultiStreamableMessageSource.

Having a MultiSubscribableMessageSource would, in my opinion, be the nicer option. Granted, this takes development time, either from you or from us. If you really need to combine several subscribing sources for a SubscribingEventProcessor used by a Saga (which, I’d like to reiterate, I would not recommend), it is the message source which should give you both.
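To make the idea of such a combined source concrete, here is a minimal, framework-free sketch. No MultiSubscribableMessageSource exists in Axon according to this thread, so everything below is hypothetical: `SubscribableSource` is a stand-in that only mirrors the general "subscribe a consumer of message batches" shape, and `SimpleSource` exists purely to exercise the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-in for a subscribable source; NOT Axon's interface, only a similar shape.
interface SubscribableSource<M> {
    // Returns a handle that cancels the subscription when run.
    Runnable subscribe(Consumer<List<M>> processor);
}

// Minimal in-memory source used to exercise the combined source.
class SimpleSource<M> implements SubscribableSource<M> {
    private final List<Consumer<List<M>>> processors = new CopyOnWriteArrayList<>();

    @Override
    public Runnable subscribe(Consumer<List<M>> processor) {
        processors.add(processor);
        return () -> processors.remove(processor);
    }

    void publish(List<M> messages) {
        processors.forEach(p -> p.accept(messages));
    }
}

// Hypothetical combined source: subscribing once subscribes to every delegate.
class MultiSubscribableSource<M> implements SubscribableSource<M> {
    private final List<SubscribableSource<M>> delegates;

    MultiSubscribableSource(List<SubscribableSource<M>> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public Runnable subscribe(Consumer<List<M>> processor) {
        List<Runnable> cancellations = new ArrayList<>();
        for (SubscribableSource<M> delegate : delegates) {
            cancellations.add(delegate.subscribe(processor));
        }
        // Cancelling the combined subscription cancels every delegate subscription.
        return () -> cancellations.forEach(Runnable::run);
    }
}
```

A single processor subscribed to the combined source then receives messages from every delegate. The delegates may deliver on different threads, so the consumer still has to cope with concurrent invocations, which is where the Saga lock mentioned earlier comes in.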

Thank you very much for the answers! :slight_smile:

I think moving from Subscribing to Tracking processors is a little bit hard for us, because of some custom solutions and maybe because of wrong approaches somewhere (e.g. we depend on transactions when we process many events in many aggregates). But maybe we will try to switch and see what happens.

Regarding the issue, we got rid of SpringAmqpMessageSource completely. Instead we have a generic AMQP event handler that copies messages to the event bus. This is just a first step; next we want to handle each of them separately and in a specific way (an anti-corruption layer).
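For readers wondering what that next step could look like, here is a small framework-free sketch of an anti-corruption layer that translates external messages into local event types before they reach the event bus. All names in it (`ExternalMessage`, `PaymentConfirmed`, `AntiCorruptionLayer`, the message type string) are invented for illustration and do not come from the poster's code base.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical external message as it might arrive from another bounded context.
final class ExternalMessage {
    final String type;
    final Map<String, String> payload;

    ExternalMessage(String type, Map<String, String> payload) {
        this.type = type;
        this.payload = payload;
    }
}

// Hypothetical local event owned by this context.
final class PaymentConfirmed {
    final String orderId;

    PaymentConfirmed(String orderId) {
        this.orderId = orderId;
    }
}

// Translates known external message types into local events and ignores the rest.
class AntiCorruptionLayer {
    private final Map<String, Function<ExternalMessage, Object>> translators = new HashMap<>();

    void register(String externalType, Function<ExternalMessage, Object> translator) {
        translators.put(externalType, translator);
    }

    // Returns the local event, or an empty Optional for unknown message types.
    Optional<Object> translate(ExternalMessage message) {
        return Optional.ofNullable(translators.get(message.type))
                       .map(translator -> translator.apply(message));
    }
}
```

The generic AMQP handler would call `translate` for each incoming message and publish only the resulting local event on the event bus, so sagas and projections never see the foreign message format.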
