I am using the SubscriptionQueryResult to subscribe to updates for a specific data. The issue occurs when I call sendStatusNotification once, followed by send which triggers the event. Initially, I receive only a single update, but when I repeat the test, I receive two updates instead of just one.
Here is a summary of the issue:
- I execute the
sendStatusNotificationmethod, followed by thesendmethod, which emits an event. The expected behavior is that only one event is received. - In the first execution, the
sendStatusNotificationmethod behaves as expected, emitting a single event. - However, in subsequent tests (without changing any parameters), two events are received. The
take(1)operator is used to ensure only one event is emitted, but it still results in two updates.
Could you help me understand why I am receiving multiple events in this case, even though the take(1) operator is applied? Is there something I might be missing regarding subscription cleanup or event emission when using SubscriptionQueryResult with the query gateway?
@GetMapping(value = "{id}/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StatusResponse>> sendStatusNotification(@PathVariable final String id) {
final SubscriptionQueryResult<Void, StatusResponse> result = queryGateway.subscriptionQuery(new GetStatusNotification(id),
ResponseTypes.instanceOf(Void.class), ResponseTypes.instanceOf(StatusResponse.class));
return result.updates()
.timeout(Duration.ofSeconds(5))
.take(1)
.map(update -> ServerSentEvent.<StatusResponse>builder()
.event(UPDATE)
.data(update)
.build())
.doFinally(signal -> result.close())
.onErrorResume(throwable -> {
if (throwable instanceof TimeoutException) {
log.error("Timeout exception caught: " + throwable.getMessage());
}
return Mono.empty();
});
}
@PostMapping
public void send() {
final StatusResponse response = StatusResponse.builder().status(Status.VALIDATED).statusDate(LocalDate.now()).build();
queryUpdateEmitter.emit(GetStatusNotification.class, query -> query.id().equalsIgnoreCase("4444"), response);
}
@QueryHandler
public Optional<Void> on(GetStatusNotification query) {
return Optional.empty();
}
video: https://drive.google.com/file/d/1-xvnz-mN40VVlhsRVsxPP5JZh1uzBde9/view