Terminating subscription queries server side

Hi, I’m new to Axon subscription queries. I am doing POC to stream set of events and terminate server side after streaming specific set of events. It is able to stream events but not able to cancel or stop streaming after sometime. How can I stop streaming events from server-side after certain amount of time?

I came across following article and it mentions using QueryUpdateEmitter.complete to signal completion of subscription query. "But subscriptions can also be canceled by the read side. For instance, maybe a card that has been fully redeemed will never receive any more events. In that case, the read side can call complete() on the QueryUpdateEmitter to signal this. "

Here’s a snippet of my code: I am expecting to stream the book created events for 1 minute but cancel subscription query after that. However, it does not stop streaming events even after calling QueryUpdateEmitter.complete()

@EventHandler
    public void on(BookCreatedEvent event, @Timestamp Instant timestamp) {
        long currentTime = System.currentTimeMillis();
        final BookSummaryEntity bookSummary = mapper.createdEventToBookSummaryEntity(event, timestamp);
        repository.save(bookSummary);
        log.debug("On {}: Saved the first summary of the book named {}", event.getClass().getSimpleName(), event.getTitle());

        queryUpdateEmitter.emit(
                AllBookSummariesQuery.class,
                query -> true,
                bookSummary
        );

        queryUpdateEmitter.complete(AllBookSummariesQuery.class, query -> (new Date(currentTime).getTime() < new Date(initialTime + TimeUnit.MINUTES.toMillis(1)).getTime()));
    }

I also added following to my subscription query result, I am not sure when it would call doOnComplete(emitter::complete) after QueryUpdateEmitter.complete() predicate filter is calculated to true. But it appears that it has no effect and it continues to stream all events.

SubscriptionQueryResult<List<BookSummaryEntity>, BookSummaryEntity> result = queryGateway.subscriptionQuery(
                new AllBookSummariesQuery(),
                ResponseTypes.multipleInstancesOf(BookSummaryEntity.class),
                ResponseTypes.instanceOf(BookSummaryEntity.class));

return Flux.create(emitter -> {
            result.initialResult().flatMapMany(Flux::fromIterable).map(value -> mapper.bookEntityToResponseDto(value)).subscribe(emitter::next);
            result.updates()
                    .doOnNext(holder -> {
                        if (emitter.isCancelled()) {
                            result.close();
                        }
                    })
                    .doOnComplete(emitter::complete)
                    .map(value -> mapper.bookEntityToResponseDto(value)).subscribe(emitter::next);
        });

Hello Payal,

Conceptually, what you are supplying here should work. The complete action on the queryUpdateEmitter will terminate subscription queries from the server side.

My guess is that something is wrong with the condition supplied there. Where is the initialTime variable coming from? Can you try it with a simple true result?

Hi Mitchell, thank you for your reply. Can you please clarify once more, Is QueryUpdateEmitter.complete really going to terminate subscription queries? I read otherwise on old google groups forum. See the referenced link below.

I was able to validate condition on complete and it is calculated correctly. Initial time is set when class gets instantiated. However, when it is complete, it stops streaming but client (browser) continues to listen and does not get terminated. How can we stop streaming for current client but let other clients still get events after they subscribe (for next 1 min)?

It would be really helpful if you can share some code examples to follow that demonstrates complete signal on emitter? Basically, I want to stop streaming server side after streaming fixed set of events or after specific time, say 1 min for each client. This is mainly for POC before integrating into a real application.

Below snippet was response from Allard from google group link: https://groups.google.com/g/axonframework/c/ywsCBtJweVY/m/2jh64OAsBwAJ

it seems you’re not closing the Subscriptions on the consuming side. Note that a “complete” marker on the dispatching side doesn’t close a subscription. It merely puts a marker that no updates will follow. The Flux on the client-side will still allow the Flux to be accessed for its data.

Hi Payal, I have been looking into this and closing the SubscriptionQuery should do the trick. one of our samples shows this. It does create a Flux in a different manner than you’ve shown above though.

 SubscriptionQueryResult<List<String>, String> result = queryGateway
                .subscriptionQuery(new ModelQuery(),
                                   ResponseTypes.multipleInstancesOf(String.class),
                                   ResponseTypes.instanceOf(String.class));

        return result.initialResult()
                     .flatMapMany(Flux::fromIterable)
                     .concatWith(result.updates())
                     .doOnError(throwable -> logger.warn("something failed"))
                     .doOnComplete(() -> logger.warn("Stream completed"))
                     .map(update -> ServerSentEvent.<String>builder()
                                                   .event("update")
                                                   .data(update)
                                                   .build());

Perhaps this can be adapted for your use-case, it looks similar

1 Like

Thank you. Example that you provided was really helpful. It solved my problem.

1 Like