Unexpected Multiple Events Received Despite Using take(1) in SubscriptionQueryResult

I am using the SubscriptionQueryResult to subscribe to updates for a specific data. The issue occurs when I call sendStatusNotification once, followed by send which triggers the event. Initially, I receive only a single update, but when I repeat the test, I receive two updates instead of just one.

Here is a summary of the issue:

  • I execute the sendStatusNotification method, followed by the send method, which emits an event. The expected behavior is that only one event is received.
  • In the first execution, the sendStatusNotification method behaves as expected, emitting a single event.
  • However, in subsequent tests (without changing any parameters), two events are received. The take(1) operator is used to ensure only one event is emitted, but it still results in two updates.

Could you help me understand why I am receiving multiple events in this case, even though the take(1) operator is applied? Is there something I might be missing regarding subscription cleanup or event emission when using SubscriptionQueryResult with the query gateway?


    @GetMapping(value = "{id}/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StatusResponse>> sendStatusNotification(@PathVariable final String id) {
        final SubscriptionQueryResult<Void, StatusResponse> result = queryGateway.subscriptionQuery(new GetStatusNotification(id),
                ResponseTypes.instanceOf(Void.class), ResponseTypes.instanceOf(StatusResponse.class));

        return result.updates()
                .timeout(Duration.ofSeconds(5))
                .take(1)
                .map(update -> ServerSentEvent.<StatusResponse>builder()
                        .event(UPDATE)
                        .data(update)
                        .build())
                .doFinally(signal -> result.close())
                .onErrorResume(throwable -> {
                    if (throwable instanceof TimeoutException) {
                        log.error("Timeout exception caught: " + throwable.getMessage());
                    }
                    return Mono.empty();
                });
    }

    @PostMapping
    public void send() {
        final StatusResponse response = StatusResponse.builder().status(Status.VALIDATED).statusDate(LocalDate.now()).build();
        queryUpdateEmitter.emit(GetStatusNotification.class, query -> query.id().equalsIgnoreCase("4444"), response);
    }

    @QueryHandler
    public Optional<Void> on(GetStatusNotification query) {
        return Optional.empty();
    }

video: https://drive.google.com/file/d/1-xvnz-mN40VVlhsRVsxPP5JZh1uzBde9/view

That’s weird, try using next() instead of take(1)

I’ve just taken this code locally, and was unable to reproduce it.

I did notice one thing: the result.close() method doesn’t exist. I was using Axon Framework 2.10.

Given you’re calling take(1) on the Flux, it’s not Axon’s code anymore that executes it. Even if Axon were to produce more than one result, the flux should filter them out at that point.

You might be hitting a (solved) bug here. Did you update both Axon and Reactor to the latest available versions?

Yes, I use version 4.10.2 of Axon


       <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-configuration</artifactId>
            <version>4.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-spring-boot-starter</artifactId>
            <version>4.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.axonframework.extensions.reactor</groupId>
            <artifactId>axon-reactor-spring-boot-starter</artifactId>
            <version>4.10.0</version>
        </dependency>

I tried using next() instead of take(1). The method now returns Mono<ServerSentEvent<StatusResponse>>, and I removed produces = MediaType.TEXT_EVENT_STREAM_VALUE. However, when I test the /status API, it always returns an empty JSON

After spending a lot of time debugging, I discovered that another library is causing this issue, but I still haven’t found the exact root cause.
Thank you for your interaction :slightly_smiling_face:

2 Likes