In the SagaEventHandler below I invoke an external system. On success, I send the command ConfirmConfirmPayer
to save the result in the database.
@Saga
class PTSProcess {
    @StartSaga
    @SagaEventHandler(associationProperty = "externalId")
    fun handle(event: ConfirmPayerRequested) {
        SagaLifecycle.associateWith("externalId", event.externalId.asReference())
        (payerConnector.callWebService(payerConnector.defaultUri, jAXBElement) as JAXBElement<ConfirmPayer10Response>).toMono()
            .doOnError(ResponseStatusException::class.java) { logger.error("error : $it") }
            .doOnError(Exception::class.java) { logger.error("error : $it") }
            .doOnSuccess {
                commandGateway.send<Status<Unit>>(
                    ConfirmConfirmPayer(
                        event.externalId,
                        confirmPayerRs.errorCode,
                        confirmPayerRs.keyValues
                    )
                ).block()
            }.subscribe()
    }
}
In the EventHandler below I save the result in the database and emit the saved object:
@EventHandler
fun on(event: ConfirmPayerConfirmed) {
    val confirmPayerRs = mapping.confirmPayerConfirmedToConfirmPayerRs(event)
    saveDocument(confirmPayerRs)
    emitter.emit(GetConfirmPayer::class.java, { it.externalId == externalId }, confirmPayerRs)
}
In the saga below I send the command RequestConfirmPayer (which triggers the event ConfirmPayerRequested),
then I use a subscriptionQuery to get the saved object from the database:
@Saga
class PayerProcess {
    @StartSaga
    @SagaEventHandler(associationProperty = "requestNumber")
    fun handle(event: PaymentConfirmed) {
        SagaLifecycle.associateWith("requestNumber", event.requestNumber.asReference())
        val confirmPayerRs = commandGateway.send<Status<ExternalId>>(
            RequestConfirmPayer(externalId = ExternalId(), keyValues = it.keyValues)
        ).then(getProjectionProducedByConfirmPayerId(externalId)).block()
        println(confirmPayerRs)
    }

    private fun getProjectionProducedByConfirmPayerId(externalId: ExternalId): Mono<ConfirmPayerRs> {
        return queryGateway.subscriptionQuery(
            GetConfirmPayer(externalId),
            ResponseTypes.instanceOf(Void::class.java),
            ResponseTypes.instanceOf(ConfirmPayerRs::class.java)
        ).flatMap { queryResult ->
            queryResult.updates().next().timeout(Duration.ofSeconds(20)).doFinally { queryResult.close() }
        }
    }
}
The problem is that I get this error: Did not observe any item or terminal signal within 20000ms in 'next'
LOG
Dispatched messages: [PaymentConfirmed]
Incoming message: [PaymentConfirmed]
Dispatched messages: [RequestConfirmPayer]
Incoming message: [RequestConfirmPayer]
[RequestConfirmPayer] executed successfully with a [Ok] return value
The log below appears only once the 20 s timeout has elapsed. I tried changing the timeout duration, but it always appears exactly when the timeout expires:
EventListener [AnnotatedSaga] failed to handle event [0a9fd626-68d9-46c9-934c-74e43888836f] (com.pt.api.PaymentConfirmed). Continuing processing with next listener
reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 20000ms in 'next' (and no fallback has been configured)
[PaymentConfirmed] executed successfully with a [null] return value
Dispatched messages: [ConfirmPayerRequested]
Incoming message: [ConfirmPayerRequested]
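
The ordering in this log (timeout first, ConfirmPayerRequested only afterwards) can be reproduced with a minimal standalone sketch. It is only a simplified model and not Axon's actual internals: it assumes that a single thread handles events in order, and that an event published from inside a handler is delivered only after that handler returns. The names runScenario, processor and update are hypothetical, and a CompletableFuture stands in for the subscription query update.

```kotlin
import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Simplified model (assumption, not Axon internals): one thread handles events in order,
// so an event published inside a handler waits until that handler has returned.
fun runScenario(blocking: Boolean): List<String> {
    val log = Collections.synchronizedList(mutableListOf<String>())
    val processor = Executors.newSingleThreadExecutor()
    val update = CompletableFuture<String>() // stands in for the subscription query update

    // Handler for the follow-up event (ConfirmPayerRequested in the question).
    val onConfirmPayerRequested = Runnable {
        log.add("ConfirmPayerRequested handled")
        update.complete("ConfirmPayerRs")
    }

    // Handler for the first event (PaymentConfirmed in the question).
    val onPaymentConfirmed = Runnable {
        log.add("PaymentConfirmed handling started")
        processor.submit(onConfirmPayerRequested) // queued behind the current handler
        if (blocking) {
            try {
                // Waits for something that can only happen after this handler returns.
                update.get(200, TimeUnit.MILLISECONDS)
                log.add("update received while blocking")
            } catch (e: TimeoutException) {
                log.add("timeout while blocking")
            }
        } else {
            // Registers a callback and returns immediately.
            update.thenAccept { log.add("update received via callback: $it") }
        }
        log.add("PaymentConfirmed handling finished")
    }

    processor.submit(onPaymentConfirmed).get()
    update.get(1, TimeUnit.SECONDS)
    processor.shutdown()
    processor.awaitTermination(1, TimeUnit.SECONDS)
    return log.toList()
}

fun main() {
    runScenario(blocking = true).forEach(::println)
    println("---")
    runScenario(blocking = false).forEach(::println)
}
```

In this model the blocking variant always times out before the follow-up event is handled, which matches the log above, while the callback variant receives the update once the first handler has returned. Whether the same reasoning applies to the saga depends on how the event processor and unit of work are configured.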