Saga doesn't store state when I use .subscribe()

I want to create a cloud instance from a Saga, because creation takes some time and I have to poll the state of the instance until it’s ready. The code looks roughly like this:

@Saga
class AWSEC2CloudInstantiationSaga {

  lateinit var ipString: String
  lateinit var awsCloudService: AWSCloudService
  lateinit var eventBus: EventBus
  
  @StartSaga
  @SagaEventHandler(associationProperty = "serverId")
  fun on(event: AWSEC2CloudInstantiated){
    awsCloudService
      .createServer(...)
      .setSchedule(...)
      .setNetwork(...)
      // ipString is assigned right here
      .doOnNext { cloud -> ipString = cloud.ipv4 }
      .launch(...)
      .doOnSuccess { eventBus.publish(AWSEC2CloudLaunching(...)) }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
  }

  @SagaEventHandler(associationProperty = "serverId")
  fun on(event: AWSEC2CloudLaunching){
    // this throws: lateinit property ipString has not been initialized
    awsCloudService
      .checkState(..., ipString = ipString)
      .doOnNext { state -> /* schedule event of duration 10 seconds if not ready */ }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
  }
}

But it doesn’t work as I expected. The AWSEC2CloudInstantiated event is handled successfully, but when the saga handles the AWSEC2CloudLaunching event, this exception is thrown: lateinit property ipString has not been initialized. I’m confused, because I already assign ipString inside doOnNext. When I use .block() instead of .subscribe(), it works fine and no exception is thrown. However, I want to run it with .subscribe(), since many cloud instances will need to launch at the same time. So what’s going on here? Is there a bug in my code?

Unfortunately, subscribing to a Reactor pipeline inside transactional methods such as Saga, Event, Command, or Query handlers is not yet supported by Axon Framework, because the underlying ThreadLocal used by Axon is not compatible with Reactor.

In your case, when you subscribe inside the handler, the work immediately moves to a different thread and the saga handler finishes. What happens next is completely nondeterministic: if the pipeline is short it might execute, and if it uses some data from the UoW then most likely it will not… Even a garbage collector might kick in and clean up your subscription, who knows!
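
To make the timing concrete, here is a minimal sketch that runs without Axon. FakeSaga and persist are hypothetical stand-ins (not Axon APIs) for a saga and for the point where its state gets stored once the handler returns; the latch only forces the slow call to outlive the handler. It assumes reactor-core is on the classpath.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for a saga: only `ip` counts as state.
class FakeSaga { var ip: String? = null }

// Hypothetical stand-in for the saga store: copies the state when the handler returns.
fun persist(saga: FakeSaga): String? = saga.ip

fun handleWithSubscribe(): String? {
    val saga = FakeSaga()
    val snapshotTaken = CountDownLatch(1)
    val done = CountDownLatch(1)
    Mono.fromCallable {
            snapshotTaken.await()      // simulates a remote call that outlives the handler
            "10.0.0.1"
        }
        .doOnNext { saga.ip = it }
        .doFinally { done.countDown() }
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe()                   // returns immediately
    val stored = persist(saga)         // the handler is done, so the state is stored now
    snapshotTaken.countDown()
    done.await()                       // the assignment does happen, but too late to be stored
    return stored
}

fun handleWithBlock(): String? {
    val saga = FakeSaga()
    Mono.fromCallable { "10.0.0.1" }
        .doOnNext { saga.ip = it }
        .subscribeOn(Schedulers.boundedElastic())
        .block()                       // the handler thread waits for the assignment
    return persist(saga)
}

fun main() {
    println("subscribe(): stored ip = ${handleWithSubscribe()}")
    println("block():     stored ip = ${handleWithBlock()}")
}
```

With the latch forcing the ordering, the subscribe() variant stores null and the block() variant stores 10.0.0.1, which matches the lateinit error seen in the next handler.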

In this case, you can either block or implement your own reactive saga handlers (unfortunately without transactional powers such as storing saga state). Basically, you would need to create a Flux in some other context and publish tasks to this Flux from the Saga for execution. I did experiment with this:

1. Defining Flux somewhere else

2. Bind Axon Event Handler with Flux pipeline

3. Reactive event handler implementation

This is an old and experimental repo, so don’t hold on to it too tightly, but the idea is the same: create a Flux somewhere else and publish tasks to this Flux from the saga handler.
If you don’t block, you will not be able to store saga state from the pipeline, but you can publish a new event from the reactive pipeline saying that the new IP has been obtained, and create a saga event handler that reacts to that event and stores the IP in the saga.
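
A rough sketch of that idea, using Reactor's Sinks API (reactor-core 3.4 or later assumed). LaunchTask, IpObtained, and LaunchPipeline are hypothetical names, and the two constructor lambdas are stand-ins for the awsCloudService call chain and for eventBus.publish(...); this is not taken from the repo mentioned above.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical task and result event types; neither is part of Axon.
data class LaunchTask(val serverId: String)
data class IpObtained(val serverId: String, val ip: String)

// Lives outside the saga (for example as a singleton) and owns the long-running pipeline.
class LaunchPipeline(
    launch: (LaunchTask) -> Mono<String>,   // stand-in for the cloud service calls
    publish: (IpObtained) -> Unit           // stand-in for publishing to the event bus
) {
    private val sink = Sinks.many().unicast().onBackpressureBuffer<LaunchTask>()

    init {
        sink.asFlux()
            .flatMap { task ->
                launch(task)
                    .subscribeOn(Schedulers.boundedElastic())
                    .map { ip -> IpObtained(task.serverId, ip) }
                    // a failed launch is dropped here so it cannot end the shared pipeline
                    .onErrorResume { Mono.empty<IpObtained>() }
            }
            .subscribe { publish(it) }
    }

    // Called from the saga handler: hands the task over and returns at once.
    // Emission is synchronized because sinks reject concurrent tryEmitNext calls.
    fun submit(task: LaunchTask): Boolean =
        synchronized(sink) { sink.tryEmitNext(task).isSuccess }
}

fun main() {
    val published = CopyOnWriteArrayList<IpObtained>()
    val done = CountDownLatch(1)
    val pipeline = LaunchPipeline(
        launch = { Mono.just("10.0.0.1") },
        publish = { event -> published.add(event); done.countDown() }
    )
    pipeline.submit(LaunchTask("server-1"))
    done.await(5, TimeUnit.SECONDS)
    println(published)
}
```

The saga handler would only call submit(...) and return, so nothing in the saga depends on the pipeline's thread. A separate saga event handler for the published event then assigns the IP, and that assignment happens inside a normal handler invocation where saga state is stored.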


Thank you for the advice. I will take a look at your example and try to fix it. Thanks a lot.


The way I got around the ThreadLocal issue when using sagas and Reactor was to just publish the state changes as separate events. For example:

.doOnNext { cloud -> eventBus.publish(IpStoredEvent(serverId, cloud.ipv4)) }

Remove the doOnSuccess part.

Add new event handler:

@SagaEventHandler(associationProperty = "serverId")
fun on(event: IpStoredEvent) {
  ipString = event.ip
  eventBus.publish(AWSEC2CloudLaunching(...))
}

Your existing handler for AWSEC2CloudLaunching can then access the state, because ipString is now assigned inside a saga event handler rather than on the Reactor thread.
