How can we use a different event processor for each instance of a Saga ?

Hello,

We are trying to use a Saga as a way to synchronize two contexts. An event in context A means we have to replay all events and send commands and/or build projections in context B.
For this to work we would need a new Saga instance for each synchronization task (a @StartSaga @SagaEventHandler(…) annotated method should do that), but we also need each instance to get its own token entry.

I read this part of the documentation which makes me think there should be a way to do it, but I just can’t find it : https://docs.axoniq.io/reference-guide/configuring-infrastructure-components/event-processing/event-processors#sagas

Thank you for your help

Hi Raphael,

by default, Axon will give you a different processor for each Saga Type (i.e. class name). A processor per instance is not possible in Axon, as that would require the creation and deletion of processors at runtime, which Axon isn’t designed for.
Instead, you can configure the number of threads per processor, which increases the parallelism for your Sagas. You can also do the opposite: combine multiple Saga types into the same Processor, to have a single thread (or certain number of threads) process Sagas of multiple types.

Hope this clarifies things for you.
Cheers,

Hi Allard,

Thank you for the clarification. Does that mean that all Saga instances always share the same token position ?

How would one handle “delayed” synchronization of events between contexts then ? In our case we need to replay events applied from several aggregates, after a specific event took place.
The only way we managed to do it is to use the eventStore.openStream(null) method, test payload type and then filter to retrieve only events we’re interested in :


// Inside a @StartSata @SagaEventHandler

BlockingStream<TrackedEventMessage<?>> blockingStream = eventStore.openStream(null);

    while (blockingStream.hasNextAvailable(60, TimeUnit.SECONDS)) {

        TrackedEventMessage<?> eventMessage = blockingStream.nextAvailable();

        if (eventMessage.getPayload() instanceof SomeEvent) {

The easier alternative to your process would be:

@EventHandler

public void handle(SomeEvent event) {
if (!event.getIdentifier().equals(identifierWereInterestedIn)) {
return;
}
// handling event

}

@EventHandler

public void handle(AnotherEvent event) {
if (!event.getAssociatedIdentifier().equals(identifierWereInterestedIn)) {
return;
}
// handling event

}

Axon will take care of the threading, keeping progress (unless you really want to start reading from the beginning each time) and the blacklisting. If you have multiple threads, each thread will know which part of the processing to act on, and keeps track of progress for its part. This works for Sagas as well as “regular” event handlers. For the latter, you can define which “sequencing policy” you want. That allows you to specify which events may be processed in parallel, and which events need to be executed strictly sequentially.

Hope this helps.
Cheers,

We do want to start reading from the beginning each time, and the identifier we’re interested in is different each time. I understand this is probably an uncommon use case.

Thank you

Hi Raphael,

I’m not sure this approach would be sustainable. You’d have to read more and more events as your application’s dataset grows.
A Saga might help, but ultimately it really depends on the use case. You could have a Saga build up some relevant state internally, and only send that out when a certain “trigger” event occurs. In some cases, view models can also provide the required information. Then you would send a query to gather the data when the “trigger” event arrives.

Pick your battle :wink:
Hope this helps.

Kind regards,