Hello everyone,
I created a simple project to test the subscription query.
I have the API below, which creates an entity with an id, then updates the entity to add the name, and finally prints the response to the console. This works: the response is displayed in the console.
@PostMapping("/entities/{id}")
public void myApi(@PathVariable("id") String entityId) {
    commandGateway.send(new CreateMyEntityCommand(entityId)).block();
    final MyEntity res = commandGateway.send(new UpdateMyEntityCommand(entityId, "test"))
            .then(sendAndReturnUpdate(entityId))
            .block();
    System.out.println(res);
}

public Mono<MyEntity> sendAndReturnUpdate(String entityId) {
    return queryGateway.subscriptionQuery(
                    new GetMyEntityQuery(entityId),
                    ResponseTypes.instanceOf(Void.class),
                    ResponseTypes.instanceOf(MyEntity.class))
            .flatMap(queryResult -> queryResult.updates()
                    .next()
                    .timeout(Duration.ofSeconds(15))
                    .doFinally(it -> queryResult.close()));
}
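To make the flow above easier to reason about, here is a plain-JDK sketch of the same "send a command, then wait for the first update with a timeout" pattern. It does not use Axon or Reactor: `FirstUpdateSketch`, `subscribe`, `emit` and `updateAndAwait` are hypothetical names made up for illustration. The sketch assumes that an update emitted before a subscription is opened is not delivered to it, so it opens the subscription before sending the command.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class FirstUpdateSketch {
    // Hypothetical stand-in for the query side: one pending "first update" per entity id.
    private final Map<String, CompletableFuture<String>> subscriptions = new ConcurrentHashMap<>();

    // Opens a subscription for the given entity id (reuses an already open one).
    CompletableFuture<String> subscribe(String entityId) {
        return subscriptions.computeIfAbsent(entityId, id -> new CompletableFuture<>());
    }

    // Delivers an update to the open subscription; with no subscriber the update is dropped.
    void emit(String entityId, String value) {
        CompletableFuture<String> pending = subscriptions.remove(entityId);
        if (pending != null) {
            pending.complete(value);
        }
    }

    // Subscribes first, then sends the command, then waits for the first update.
    // join() throws CompletionException (cause: TimeoutException) if nothing arrives in time.
    static String updateAndAwait(FirstUpdateSketch bus, String entityId,
                                 Runnable sendCommand, long timeoutMs) {
        CompletableFuture<String> firstUpdate = bus.subscribe(entityId);
        try {
            sendCommand.run();
            return firstUpdate.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
        } finally {
            // Always release the subscription, like doFinally(... close()) in the reactive code.
            bus.subscriptions.remove(entityId, firstUpdate);
        }
    }

    public static void main(String[] args) {
        FirstUpdateSketch bus = new FirstUpdateSketch();
        // The Runnable plays the role of the update command whose event triggers the update.
        String res = updateAndAwait(bus, "42", () -> bus.emit("42", "test"), 1000);
        System.out.println(res);
    }
}
```

In this sketch, an update emitted while no subscription is open is lost, and a later wait ends in a timeout. Whether that ordering matters in the real application depends on when the projection emits its update relative to the subscription being registered.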
The problem appears when I do the same thing inside a saga:
@Saga
public class MyEntitySaga {

    @Autowired
    private ReactorCommandGateway commandGateway;

    @Autowired
    private ReactorQueryGateway queryGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "entityId")
    void handle(MyEntityCreatedEvent event) {
        final MyEntity res = commandGateway.send(new UpdateMyEntityCommand(event.getEntityId(), "test"))
                .then(sendAndReturnUpdate(event.getEntityId()))
                .block();
        System.out.println(res);
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "entityId")
    void handle(MyEntityUpdatedEvent event) {
    }

    public Mono<MyEntity> sendAndReturnUpdate(String entityId) {
        return queryGateway.subscriptionQuery(
                        new GetMyEntityQuery(entityId),
                        ResponseTypes.instanceOf(Void.class),
                        ResponseTypes.instanceOf(MyEntity.class))
                .flatMap(queryResult -> queryResult.updates()
                        .next()
                        .timeout(Duration.ofSeconds(15))
                        .doFinally(it -> queryResult.close()));
    }
}
With the above code I get the following error, even though the logic is the same as in the controller:
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 15000ms in ‘next’ (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.24.jar:3.4.24]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
… 1 common frames omitted