Subscription queries in Sagas

In the SagaEventHandler below I invoke an external system. On success, I send the command ConfirmConfirmPayer to save the result in the database.

@Saga
class PTSProcess {

    @StartSaga
    @SagaEventHandler(associationProperty = "externalId")
    fun handle(event: ConfirmPayerRequested) {
        SagaLifecycle.associateWith("externalId", event.externalId.asReference())

        (payerConnector.callWebService(payerConnector.defaultUri, jAXBElement) as JAXBElement<ConfirmPayer10Response>).toMono()
            .doOnError(ResponseStatusException::class.java) { logger.error("error : $it") }
            .doOnError(Exception::class.java) { logger.error("error : $it") }
            .doOnSuccess {
                commandGateway.send<Status<Unit>>(
                    ConfirmConfirmPayer(
                        event.externalId,
                        confirmPayerRs.errorCode,
                        confirmPayerRs.keyValues
                    )
                ).block()
            }.subscribe()
    }
}

In the EventHandler below I save the result in the database and emit the saved object.

    @EventHandler
    fun on(event: ConfirmPayerConfirmed) {
        val confirmPayerRs = mapping.confirmPayerConfirmedToConfirmPayerRs(event)
        saveDocument(confirmPayerRs)

        // Emit the saved object to subscription queries waiting on this externalId
        emitter.emit(GetConfirmPayer::class.java, { it.externalId == event.externalId }, confirmPayerRs)
    }

In the saga below I send the command RequestConfirmPayer (which triggers the event ConfirmPayerRequested), then I use a subscriptionQuery to get the object saved in the database.

@Saga
class PayerProcess {

    @StartSaga
    @SagaEventHandler(associationProperty = "requestNumber")
    fun handle(event: PaymentConfirmed) {
        SagaLifecycle.associateWith("requestNumber", event.requestNumber.asReference())

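        // Create the id once so the command and the subscription query use the same value
        val externalId = ExternalId()
        val confirmPayerRs = commandGateway.send<Status<ExternalId>>(
            RequestConfirmPayer(externalId = externalId, keyValues = event.keyValues)
        ).then(getProjectionProducedByConfirmPayerId(externalId)).block()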

        println(confirmPayerRs)
    }

    private fun getProjectionProducedByConfirmPayerId(externalId: ExternalId): Mono<ConfirmPayerRs> {
        return queryGateway.subscriptionQuery(
            GetConfirmPayer(externalId),
            ResponseTypes.instanceOf(Void::class.java),
            ResponseTypes.instanceOf(ConfirmPayerRs::class.java)
        ).flatMap { queryResult ->
            queryResult.updates().next()
                .timeout(Duration.ofSeconds(20))
                .doFinally { queryResult.close() }
        }
    }
}

The problem is that I get this error: Did not observe any item or terminal signal within 20000ms in 'next'

LOG

Dispatched messages: [PaymentConfirmed]
Incoming message: [PaymentConfirmed]
Dispatched messages: [RequestConfirmPayer]
Incoming message: [RequestConfirmPayer]
[RequestConfirmPayer] executed successfully with a [Ok] return value

The log below is displayed once the 20 seconds have elapsed. I tried changing the timeout, but this log always appears only after the timeout duration has expired.

EventListener [AnnotatedSaga] failed to handle event [0a9fd626-68d9-46c9-934c-74e43888836f] (com.pt.api.PaymentConfirmed). Continuing processing with next listener

reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 20000ms in ‘next’ (and no fallback has been configured)

[PaymentConfirmed] executed successfully with a [null] return value
Dispatched messages: [ConfirmPayerRequested]
Incoming message: [ConfirmPayerRequested]
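
The ordering in this log seems to suggest (this is an assumption, not something confirmed here) that ConfirmPayerRequested is only dispatched after the PaymentConfirmed handler has returned, so blocking inside that handler would wait for an update that cannot be emitted yet. A minimal sketch of that situation, using only java.util.concurrent and no Axon code; simulateBlockingHandler and the single-thread executor are hypothetical stand-ins for the saga handler and the event processor:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical stand-in, not Axon code: one worker thread plays the event processor.
fun simulateBlockingHandler(timeoutMillis: Long): String {
    val processor = Executors.newSingleThreadExecutor()
    // Stands in for the update that the subscription query is waiting for.
    val update = CompletableFuture<String>()
    try {
        val handler = processor.submit<String> {
            // The follow-up work that would produce the update is queued behind this handler.
            processor.execute { update.complete("ConfirmPayerRs") }
            try {
                // Blocking here keeps the only worker busy, so the queued task cannot run.
                update.get(timeoutMillis, TimeUnit.MILLISECONDS)
            } catch (e: TimeoutException) {
                "timeout"
            }
        }
        return handler.get()
    } finally {
        // Already queued tasks still run after shutdown(), mirroring the late events in the log.
        processor.shutdown()
    }
}

fun main() {
    println(simulateBlockingHandler(200)) // prints "timeout"
}
```

In this sketch the update only becomes available after the blocking handler has given up, whatever timeout is chosen, which matches the behaviour described above.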

Try replacing block with subscribe:

        val externalId = ExternalId()
        val confirmPayerRs = commandGateway.send<Status<ExternalId>>(
            RequestConfirmPayer(externalId = externalId, keyValues = event.keyValues)
        ).then(getProjectionProducedByConfirmPayerId(externalId)).subscribe()