Hi, I’m new to Axon subscription queries. I am doing POC to stream set of events and terminate server side after streaming specific set of events. It is able to stream events but not able to cancel or stop streaming after sometime. How can I stop streaming events from server-side after certain amount of time?
I came across following article and it mentions using QueryUpdateEmitter.complete
to signal completion of subscription query. "But subscriptions can also be canceled by the read side. For instance, maybe a card that has been fully redeemed will never receive any more events. In that case, the read side can call complete() on the QueryUpdateEmitter
to signal this. "
Here’s a snippet of my code: I am expecting to stream the book created events for 1 minute but cancel subscription query after that. However, it does not stop streaming events even after calling QueryUpdateEmitter.complete()
@EventHandler
public void on(BookCreatedEvent event, @Timestamp Instant timestamp) {
long currentTime = System.currentTimeMillis();
final BookSummaryEntity bookSummary = mapper.createdEventToBookSummaryEntity(event, timestamp);
repository.save(bookSummary);
log.debug("On {}: Saved the first summary of the book named {}", event.getClass().getSimpleName(), event.getTitle());
queryUpdateEmitter.emit(
AllBookSummariesQuery.class,
query -> true,
bookSummary
);
queryUpdateEmitter.complete(AllBookSummariesQuery.class, query -> (new Date(currentTime).getTime() < new Date(initialTime + TimeUnit.MINUTES.toMillis(1)).getTime()));
}
I also added following to my subscription query result, I am not sure when it would call doOnComplete(emitter::complete)
after QueryUpdateEmitter.complete()
predicate filter is calculated to true. But it appears that it has no effect and it continues to stream all events.
SubscriptionQueryResult<List<BookSummaryEntity>, BookSummaryEntity> result = queryGateway.subscriptionQuery(
new AllBookSummariesQuery(),
ResponseTypes.multipleInstancesOf(BookSummaryEntity.class),
ResponseTypes.instanceOf(BookSummaryEntity.class));
return Flux.create(emitter -> {
result.initialResult().flatMapMany(Flux::fromIterable).map(value -> mapper.bookEntityToResponseDto(value)).subscribe(emitter::next);
result.updates()
.doOnNext(holder -> {
if (emitter.isCancelled()) {
result.close();
}
})
.doOnComplete(emitter::complete)
.map(value -> mapper.bookEntityToResponseDto(value)).subscribe(emitter::next);
});