How to run a particular Saga asynchronously?

Hi everyone,

I have a Saga that runs a long-lived background task. I picked a Saga over a service because I need one instance per aggregate instance, which rules out a singleton service.

Furthermore, this saga is expected to receive events while working and has the following structure

Saga

  • state

@StartSaga
@SagaEventHandler(associationProperty = "xxx")

Hi Albert,

I’d typically use a Saga if you’re trying to do a complex business transaction spanning multiple aggregates and/or bounded contexts.

What you seem to have described is one Saga directly tied to one aggregate, so without any additional associations.

I’d take this as a signal that you might want a different solution than a Saga.

Maybe if you’d explain your scenario in more depth I can deduce whether it’s okay or not, but I’m not yet convinced.

What does the ‘method which takes long’ do?

Are you ever publishing commands from this Saga back into the Aggregate?

Is there an option that a dedicated regular Event Handling Component can pick up the task for you?

For your second question: if you want your Saga to run asynchronously, you want your SagaManager to be asynchronous.

Since a SagaManager is just a fancy complicated wrapper around a regular Event Handling Component, you can thus specify sync vs async by selecting the EventProcessor to use.

You can thus use a TrackingEventProcessor or a SubscribingEventProcessor.

The first is asynchronous by default, whilst the latter can be made asynchronous if you select the right EventProcessingStrategy.

Hope this helps Albert.

Cheers,

Steven

Hi Steven,

Thank you for your reply.

The application can interact with one or more email accounts. Each email account is represented by an aggregate, which controls the flow/state of that account. For example, to download emails a StartRetreiveEmailsCommand is sent. The aggregate responsible for the account id in question verifies its state and returns true only if it is idle. The aggregate also publishes an event indicating that the retrieval of emails has started and updates its state. Further StartRetreiveEmailsCommand requests for the same account id are ignored until the state changes back to idle, which prevents concurrent downloads on the same email account. Different email accounts are independent: while one account is retrieving its emails, another can do so too.
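The guard described above can be sketched in plain Java. This is only an illustration of the idle/retrieving state check, not Axon code: the class name EmailAccountState and its methods are hypothetical, and in a real aggregate the state change would come from applying the EmailsRetreiveStartedEvent.

```java
// Hypothetical plain-Java model of the aggregate's state guard; not Axon code.
class EmailAccountState {
    enum Status { IDLE, RETRIEVING }

    private Status status = Status.IDLE;

    // Mirrors handling StartRetreiveEmailsCommand: accepted only when the account is idle.
    boolean startRetrieve() {
        if (status != Status.IDLE) {
            return false; // a download is already running for this account
        }
        status = Status.RETRIEVING;
        return true;
    }

    // Mirrors the final command that returns the account to idle.
    void finishRetrieve() {
        status = Status.IDLE;
    }

    Status status() {
        return status;
    }
}
```

A second call to startRetrieve() returns false until finishRetrieve() has been called, which is the "ignore further requests until idle" behaviour described above.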

The service that actually handles the email download captures the EmailsRetreiveStartedEvent and starts the retrieval process for this email account. This part is done on a different thread, as it is a long task.

Once the service has retrieved the list of emails, it verifies with the respective aggregate, using another command, whether each email can be downloaded. The aggregate returns a boolean value indicating whether the email can be downloaded, as shown next.

boolean download = commandGateway.sendAndWait(DownloadEmailMessageCommand.of(/* parameters */));

The aggregate also publishes another event indicating that the download of an email has started. The service then sends another command to the aggregate indicating that the email was actually downloaded, or a different command indicating a failure. Once all emails have been processed, the service sends a final command saying that it is finished. The aggregate then updates its state.

As you rightly pointed out, in this example we are dealing with just one aggregate and thus a saga has little use. I wanted to use Sagas because the AXON framework channels events to the right saga instance. AXON groups all events that belong to the same account id into one saga instance, which is very convenient. Two or more email services can run concurrently without affecting each other, with their respective aggregates managing their flow.

In my solution, the service intercepts all events of interest and creates a background worker linked to the account id. Then the service routes all events to the respective worker using the same account id. My service has lots of code that may already exist in the AXON framework and if so, I would like to use that instead of mine.
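The routing described above could look roughly like the following plain-Java sketch. It is not the poster's actual code and not an Axon API: AccountEventRouter, route and handledFor are hypothetical names, and events are reduced to strings to keep the example short. Each account id gets its own single-threaded worker, so events for one account are handled in order while different accounts run concurrently.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded worker per account id.
class AccountEventRouter {
    private final Map<String, ExecutorService> workers = new ConcurrentHashMap<>();
    private final Map<String, List<String>> handled = new ConcurrentHashMap<>();

    // Route an event to the worker that owns this account id.
    void route(String accountId, String eventName) {
        ExecutorService worker =
                workers.computeIfAbsent(accountId, id -> Executors.newSingleThreadExecutor());
        // A real worker would download emails here; this one only records the event.
        worker.execute(() ->
                handled.computeIfAbsent(accountId, id -> new CopyOnWriteArrayList<>()).add(eventName));
    }

    // Stop all workers and wait for already queued events to finish.
    void shutdown() {
        for (ExecutorService worker : workers.values()) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Events handled so far for one account, in handling order.
    List<String> handledFor(String accountId) {
        return handled.getOrDefault(accountId, Collections.emptyList());
    }
}
```

This is essentially the bookkeeping a SagaManager does when it looks up the right Saga instance by association value, which is why delegating it to the framework is attractive.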

I have used the TrackingEventProcessor in the past. I did not want to use it here as I do not care about past events. Furthermore, I do not know how to group events by aggregate identifier (account id in my case) using the TrackingEventProcessor, similar to what happens with a Saga.

Any feedback is welcome.

Hi Albert,

A couple points spring to mind, prior to me answering your initial question about whether to use a Saga or not:

  • Your DownloadEmailMessageCommand doesn’t sound like a command to me, but a query. You’re asking the Aggregate for a certain bit of state, in this case whether you may download that email or not. Modeling it as a command is thus not the way to go. You could instead include all the do-not-download emails in your ‘EmailsRetreiveStartedEvent’, for example.
  • You’re talking about ‘the service’ in your setup, but it’s not clear what ‘the service’ is. Is this your current solution to this problem? Is that just a regular Event Handler? Or is ‘the service’ the Saga approach you’re going for?
  • Using a TrackingEventProcessor does not necessarily mean you are interested in past events; it just changes the way you get events from a push to a pull mechanism. Especially in combination with a Saga, it is definitely not recommended to ever replay events against it. Sagas trigger new actions, so replaying events against a Saga would create side effects like sending new emails or, in your example, downloading emails a second time. Anyhow, I’d not be afraid to use a TrackingEventProcessor in this scenario if I were you.
  • Grouping events per aggregate within a TrackingEventProcessor used to not be an option in 3.0, but it is in 3.1 and up. You can now select how many threads and segments your TrackingEventProcessor should run with and in addition to that you can provide your own SequencingPolicy. The SequencingPolicy is in charge of deciding which thread gets which events. This can for example be set to SequentialPerAggregatePolicy, which will thus ensure that all the events for an aggregate will be routed to the same thread.
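To make the last point more concrete, here is a minimal plain-Java sketch of the idea behind per-aggregate sequencing. It is not Axon's implementation: PerAggregateSequencing and segmentFor are hypothetical names, and hashing the aggregate identifier is just one simple way to get the property that matters, namely that all events of one aggregate land in the same segment (and thus on the same thread).

```java
// Hypothetical illustration of per-aggregate sequencing; not Axon's implementation.
class PerAggregateSequencing {
    // Events carrying the same aggregate id always map to the same segment,
    // so a single thread sees them in order.
    static int segmentFor(String aggregateId, int segmentCount) {
        // floorMod keeps the result in [0, segmentCount) even for negative hash codes.
        return Math.floorMod(aggregateId.hashCode(), segmentCount);
    }
}
```

Events for different aggregates may share a segment, but events for one aggregate are never split across segments.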

Apart from this, a Saga could just as well be of use here.

From what you’ve described, I get the feeling that your Saga will perform the download of the emails and after that verify whether it could download those.
As such, it’s a component coordinating actions between your ‘Email Account Applicant’ (the one containing your aggregate) and the ‘Email Download Service’.
You are thus to some extent coordinating actions between different contexts. Additionally, depending on the number of emails, it might just as well be a long running transaction. And lastly, it could be the container of the state that says whether certain emails can or cannot be downloaded (thus removing your query-command, the DownloadEmailMessageCommand).

Does this give you enough insight, Albert?

If not, feel free to ask further questions!

Cheers,

Steven

Thank you very much, Steven, for coming back to me on this. I really appreciate your input.

Please find my comments/feedback below. Kindly let me know should something not be clear or should you need more information.

Your DownloadEmailMessageCommand doesn’t sound like a command to me, but a query. You’re requesting the Aggregate for a certain bit of state, in this case, whether you may download that email or not. Modelling it as a command is thus not the way to go. You could instead contain all the do-not-download-emails in your ‘EmailsRetreiveStartedEvent’ for example.

My motivation for modelling DownloadEmailMessageCommand as a command is the state. If the download commences, then the aggregate state changes from idle to downloading, for example. I found that aggregates simplify the state management of objects, and I believe that is their main reason for existing. When the user clicks a button that starts the email download, or a scheduler triggers this process, the request goes to the aggregate. The aggregate then decides whether to start the download or not. All business logic about whether a download may start is captured by the aggregate.

You’re talking about ‘the service’ in your set up, but it’s not clear what ‘the service’ is. Is this your current solution to this problem? Is that just a regular Event Handler? Or is ‘the service’ the Saga approach you’re going for?

Sorry for not explaining this better. I added some code which is responsible for downloading the emails and referred to it as ‘the service’. This code is not part of the aggregate nor part of a saga. It interacts with the AXON framework by capturing events and emitting commands. Please find attached a diagram capturing the interaction between the aggregate and the code responsible for the actual downloads.

The code that is actually downloading the emails is dumb and contains no business rules/logic. It simply delegates all logic to the aggregate which in turn is managing its state.

Kindly do let me know if this is not clear and I will try to explain things further.

The use of a TrackingEventProcessor does not necessarily mean you should be interested in events in the past, it’s just changing the way to get events from a push to a pull mechanism. Especially in combination with a Saga it is definitely not recommended to ever replay the events against it. Sagas trigger new actions, thus replaying a Saga would mean you’d create side effects like sending new emails, or in your example downloading emails a second time. Anyhow, I’d not be afraid to use a TrackingEventProcessor in this scenario if I were you.

The TrackingEventProcessor (like the SubscribingEventProcessor) works well when the events are isolated (not sure if I am using the correct term); otherwise, you need to rely on something else to link different events together. For example, if I have the events UsernameUpdatedEvent and PasswordChangedEvent, then with a TrackingEventProcessor (or a SubscribingEventProcessor) I need to link these two events myself, usually through the user ID, so that the proper table record is updated with the new changes. This is fairly straightforward with databases, but a bit more complex with in-memory objects. In my example, I have an in-memory object that is downloading the emails, and I want specific events to reach that object. With a Saga, this happens automatically, as the AXON framework routes the events to the proper Saga instance. Given that I am not using a Saga, I had to write some code that routes the events to the proper in-memory object instance.

Grouping events per aggregate within a TrackingEventProcessor used to not be an option in 3.0, but it is in 3.1 and up. You can now select how many threads and segments your TrackingEventProcessor should run with and in addition to that, you can provide your own SequencingPolicy. The SequencingPolicy is in charge of deciding which thread gets which events. This can, for example, be set to SequentialPerAggregatePolicy, which will thus ensure that all the events for an aggregate will be routed to the same thread.

I am not sure I am following you here. Can you please point me to an example that I can use? Maybe this is the solution I need. I love coding, but I do not want to build something that already exists. I would rather use the AXON framework in its entirety and add to it where needed than reinvent the wheel. Please note that I am not bound to any version of the AXON framework as yet.

Apart from this, a Saga could just as well be of use here.
From what you’ve described, I get the feeling that your Saga will perform the download of the emails and after that verify whether it could download those.
As such, it’s a component coordinating actions between your ‘Email Account Applicant’ (the one containing your aggregate) and the ‘Email Download Service’.
You are thus to some extent coordinating actions between different contexts. Additionally, depending on the number of emails, it might just as well be a long-running transaction. And lastly, it could be the container of the state that says whether certain emails can or cannot be downloaded (thus removing your query-command, the DownloadEmailMessageCommand).

That was my first thought, as it seemed to me that a Saga fits the bill. But I need to be able to run this particular Saga on a background thread without changing the other existing Sagas. In fact, the title of this thread reads “How to run a particular Saga asynchronously?”

Thank you once more for your time and help. It is greatly appreciated.

[Attachment: Email Downloader Flow.png]

Hi Albert,

Let me go through your replies point by point.

  1. I understand that you’re using your Aggregate to verify whether downloading that email is allowed. It was the snippet you shared around the download command which made it seem you’re using it to query. The fact that you’re using a return value from command handling is, to me, an additional pointer that you’re doing so. It’s completely fine to have a command issued to an aggregate for something like ‘download email’, but I’d use the resulting event as the deciding factor for whether downloading that exact email was allowed or not.

  2. Your diagram clarifies a lot of the process you’re trying to follow. Definitely helps my understanding of the use case.

  3. I’d say it wholly depends on what the query model is you’re updating. Maybe it is based on all the events, thus the link is ‘all’. But maybe it just is the User-id of your example, and then yes you’d have to pull out the right query model to update based on those. The SagaManager just does that for you by pulling out the right Saga instance from the repository.

  4. An EventProcessor (be it Tracking or Subscribing) has an EventHandlerInvoker parameter, which is the class that sends the events to your Event Handling Component. The SimpleEventHandlerInvoker implementation, which is used by default for your regular Event Handling Components, has the option to provide a SequencingPolicy. The default SequencingPolicy is the SequentialPerAggregatePolicy, meaning that a thread will receive all the events for a given aggregate. You could however provide your own SequencingPolicy tailored to what you feel is the right set of events a thread should handle.
    I however don’t think you’ll have to be bothered by this, as I feel a Saga would be your solution.

  5. That is quite a simple question actually: back your SagaConfiguration by a TrackingEventProcessor. You can do that by creating a SagaConfiguration bean through SagaConfiguration.trackingSagaManager({the-class-name-of-your-saga}). That will instantiate a SagaConfiguration for your particular DownloadSaga, which will create all the required beans for that saga to work. And, as a TrackingEventProcessor is by default asynchronous from the rest, your Saga is thus handling events asynchronously from the rest.
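As a rough sketch of point 5 in a Spring Boot setup, assuming Axon 3.1 or later with the Spring auto-configuration on the classpath: DownloadSaga is the Saga class named above and is assumed to be defined elsewhere, and DownloadSagaConfig is a hypothetical class name. As far as I know, the auto-configuration looks this bean up by name (the Saga's simple class name with a lower-case first letter, followed by "Configuration"), so please verify the naming rule against the reference guide for your version.

```java
import org.axonframework.config.SagaConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; DownloadSaga is assumed to exist elsewhere in the application.
@Configuration
public class DownloadSagaConfig {

    // Assumption: the bean name "downloadSagaConfiguration" is what ties this
    // configuration to DownloadSaga in the Spring auto-configuration.
    @Bean
    public SagaConfiguration<DownloadSaga> downloadSagaConfiguration() {
        // Backs only this Saga by a TrackingEventProcessor, which runs on its own thread.
        return SagaConfiguration.trackingSagaManager(DownloadSaga.class);
    }
}
```

Because the configuration is declared per Saga type, the other existing Sagas keep whatever processor they already use.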

Hope this helps you out!

Cheers,

Steven