I am using the SubscriptionQueryResult
to subscribe to updates for a specific data. The issue occurs when I call sendStatusNotification
once, followed by send
which triggers the event. Initially, I receive only a single update, but when I repeat the test, I receive two updates instead of just one.
Here is a summary of the issue:
- I execute the
sendStatusNotification
method, followed by thesend
method, which emits an event. The expected behavior is that only one event is received. - In the first execution, the
sendStatusNotification
method behaves as expected, emitting a single event. - However, in subsequent tests (without changing any parameters), two events are received. The
take(1)
operator is used to ensure only one event is emitted, but it still results in two updates.
Could you help me understand why I am receiving multiple events in this case, even though the take(1)
operator is applied? Is there something I might be missing regarding subscription cleanup or event emission when using SubscriptionQueryResult
with the query gateway?
@GetMapping(value = "{id}/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StatusResponse>> sendStatusNotification(@PathVariable final String id) {
final SubscriptionQueryResult<Void, StatusResponse> result = queryGateway.subscriptionQuery(new GetStatusNotification(id),
ResponseTypes.instanceOf(Void.class), ResponseTypes.instanceOf(StatusResponse.class));
return result.updates()
.timeout(Duration.ofSeconds(5))
.take(1)
.map(update -> ServerSentEvent.<StatusResponse>builder()
.event(UPDATE)
.data(update)
.build())
.doFinally(signal -> result.close())
.onErrorResume(throwable -> {
if (throwable instanceof TimeoutException) {
log.error("Timeout exception caught: " + throwable.getMessage());
}
return Mono.empty();
});
}
@PostMapping
public void send() {
final StatusResponse response = StatusResponse.builder().status(Status.VALIDATED).statusDate(LocalDate.now()).build();
queryUpdateEmitter.emit(GetStatusNotification.class, query -> query.id().equalsIgnoreCase("4444"), response);
}
@QueryHandler
public Optional<Void> on(GetStatusNotification query) {
return Optional.empty();
}
video: https://drive.google.com/file/d/1-xvnz-mN40VVlhsRVsxPP5JZh1uzBde9/view