TimeoutException subscription query in saga

Hello everyone,

I created a simple projet to test the subscription query.

I have the below api to create an entity with id, then update the entity to add the name, then i print the response in the console. With this way th response is displayed in the console.

    @PostMapping("/entities/{id}")
    public void myApi(@PathVariable("id") String entityId) {
        commandGateway.send(new CreateMyEntityCommand(entityId)).block();

        final MyEntity res = commandGateway.send(new UpdateMyEntityCommand(entityId, "test")).then(sendAndReturnUpdate(entityId)).block();
        System.out.println(res);
    }

    public Mono<MyEntity> sendAndReturnUpdate(String entityId) {
        return queryGateway.subscriptionQuery(new GetMyEntityQuery(entityId), ResponseTypes.instanceOf(Void.class), ResponseTypes.instanceOf(MyEntity.class))
                .flatMap(queryResult -> queryResult.updates().next().timeout(Duration.ofSeconds(15)).doFinally(it -> queryResult.close())
        );
    }

The problem when i work with saga

@Saga
public class MyEntitySaga {

    @Autowired
    private ReactorCommandGateway commandGateway;

    @Autowired
    private ReactorQueryGateway queryGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "entityId")
    void handle(MyEntityCreatedEvent event) {
        final MyEntity res = commandGateway.send(new UpdateMyEntityCommand(event.getEntityId(), "test"))
                .then(sendAndReturnUpdate(event.getEntityId())).block();
        System.out.println(res);
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "entityId")
    void handle(MyEntityUpdatedEvent event) {
    }

    public Mono<MyEntity> sendAndReturnUpdate(String entityId) {
        return queryGateway.subscriptionQuery(new GetMyEntityQuery(entityId), ResponseTypes.instanceOf(Void.class), ResponseTypes.instanceOf(MyEntity.class))
                .flatMap(queryResult -> queryResult.updates().next().timeout(Duration.ofSeconds(15)).doFinally(it -> queryResult.close())
                );
    }
}

with the above code i have this error, although it’s the same code

Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 15000ms in ‘next’ (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.24.jar:3.4.24]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
… 1 common frames omitted

Hi Rayen,

The main use case for subscription queries in a UI. You should not have long-running methods in a Saga. As an example of a subscription query you could take a look here, used in this article.