I want to create multiple cloud servers in parallel and publish ProceedingEvent for each step so the user can keep track. here’s what the code looks like.
class CloudCreationSaga {
// annotation
fun on(event: CloudServersCreated){
// create multiple cloud server in parallel
Flux.fromIterable(event.cloudServers)
.flatmap { cloud ->
// simplify nested flatmap
openStackService
.createServer()
.doOnNext { publishProccedingEvent(id = cloud.serverId, state = ServerState.Scheduling) }
.configNetwork()
.doOnNext { publishProccedingEvent(id = cloud.serverId, state = ServerState.Networking) }
.configVolume()
.doOnNext { publishProccedingEvent(id = cloud.serverId, state = ServerState.DeviceMapping) }
.launch()
// success and error event are GenericDomainEventMessage
.doOnSuccess { cloud -> publishSucceedEvent(id = cloud.serverId) }
.doOnError { error -> publishFailedEvent(id = cloud.serverId, message = error.message) }
// so other server can keep going on if one failed
.onErrorResume { Mono.empty() }
}
.parallel(4)
.runOn(Schedulers.parallel())
.sequential()
.collectList()
.block()
}
fun publishProccedingEvent(id: String, state: ServerState){
eventBus.publish(GenericEventMessage(
CloudServerProceeding(serverId = id, state = state, ..rest)
))
}
// annotation
fun on(event: CloudServerCreationSucceeded){
successAmount++
if (totalServer == (successAmount+failedAmount)){
SagaLifecycle.end()
}
}
// annotation
fun on(event: CloudServerCreationFailed){
failedAmount++
if (totalServer == (successAmount+failedAmount)){
SagaLifecycle.end()
}
}
}
I expect this code to create each server in parallel for fast and efficiency and then keep track the state of each server by publishing ProceedingEvent (I have projection handler to save these state to db) and also handle any error that could happen during the execution. The problem is, the published events seems to have invalid order sometimes (most of the time it worked properly, but sometime it didn’t).
This is what I got querying from axon server with “payloadData contains ${serverId}”.
CloudServersCreated → Publish → Succeed → Publish → Publish,
but it should be CloudServersCreated → Publish → Publish → Publish → Succeed.
what’s going on here?