How can Subscription Query stay active?

We use AxonServer 4.2.4 and in Overview of ClientService I can see slowly increasing #ActiveSubscriptions. However most of #Subscriptions are “closed” properly. We were not able to find a place/usecase where this would be happening.

We have external system we need to notify about some events.
So what we do is that Our AdapterService handle all of those events which external system is interesed in and sends data from events over REST api.

And in some cases we need to wait for this communication to be done before sending response to client (because client will also load data from external system so we need to be sure they are there already).
So we have ExternalDataTransferDoneEvent when api call to external system is finished. We also have PaymentDataTransferredEventEmitter which listen to this event and ExternalDataTransferredQuery we use for such cases.

Simplified code example:

`

// ClientService

@RestController
public class ClientApi {

private final QueryGateway queryGateway;
private final CommandGateway commandGateway;

@RequestMapping(value = “/path”,
produces = { “application/json” },
method = RequestMethod.POST)
public ResponseEntity doSomeAction() {

// to be sure data were send to external system
SubscriptionQueryResult<Boolean, Boolean> externalDataTransferQueryResult =
queryGateway.subscriptionQuery(ExternalDataTransferredQuery.create(checkoutId),
ResponseTypes.instanceOf(Boolean.class),
ResponseTypes.instanceOf(Boolean.class));

// this we need in response
SubscriptionQueryResult<SomeObject, SomeObject> queryResult =
queryGateway.subscriptionQuery(
new SomeQuery(…),
ResponseTypes.instanceOf(SomeObject.class),
ResponseTypes.instanceOf(SomeObject.class));

commandGateway.send(new SomeCommand(checkoutId));

try (queryResult) {
SomeObject someObject = queryResult
.updates()
.blockFirst(Duration.ofMillis(restEndpointsConfiguration.getInternalTimeOutInMillis()));
} catch (IllegalStateException ex) {

}

if(someObject.isExternalDataTransferRequired()) {
try {
Boolean result = externalDataTransferQueryResult.updates()
.blockFirst(Duration.ofMillis(restEndpointsConfiguration.getInternalTimeOutInMillis()));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

return ResponseEntity.status(OK).body(map(someObject));
}

}

// shared library
public class ExternalDataTransferDoneEvent extends AbstractCheckoutRelated {

private boolean successful = false;

public ExternalDataTransferDoneEvent(String checkoutId, boolean successful) {
super(checkoutId);
this.successful = successful;
}

public static ExternalDataTransferDoneEvent successful(String checkoutId) {
return new ExternalDataTransferDoneEvent(checkoutId, true);
}

public static ExternalDataTransferDoneEvent failed(String checkoutId) {
return new ExternalDataTransferDoneEvent(checkoutId, false);
}

}

// ClientService
public class ExternalDataTransferredQuery {

private String checkoutId;

public static ExternalDataTransferredQuery create(String checkoutId) {
return new ExternalDataTransferredQuery(checkoutId);
}
}

// ClientService
@Component
@RequiredArgsConstructor
@Slf4j
@ProcessingGroup(“ClientEventEmitterProcessor”)
public class PaymentDataTransferredEventEmitter {

private final QueryUpdateEmitter queryUpdateEmitter;

@EventHandler
public void on(ExternalDataTransferDoneEvent evt) {
queryUpdateEmitter.emit(ExternalDataTransferredQuery.class,
query -> query.getCheckoutId().equals(evt.getCheckoutId()), evt.isSuccessful());
queryUpdateEmitter
.complete(ExternalDataTransferredQuery.class, query -> query.getCheckoutId().equals(evt.getCheckoutId()));
}

@SuppressWarnings(“squid:S2447”)
@QueryHandler
public Boolean handle(ExternalDataTransferredQuery qry) {
return null; // required
}
}

// AdapterService
@Component
@ProcessingGroup(“AdapterEventHandlerProcessor”)
public class AdapterEventHandler {

private final AdapterControllerApi api;
private final EventGateway eventGateway;

@EventHandler
public void on(XyzEvent evt) {
try {
api.postData(map(evt))
eventGateway.publish(ExternalDataTransferDoneEvent.successful(event.getCheckoutId()));
} catch (RuntimeException e) { // e.g RestClientResponseException
eventGateway.publish(ExternalDataTransferDoneEvent.failed(event.getCheckoutId()));
throw e;
}
}

@EventHandler
public void on(AbcEvent evt) {
try {
api.postData(map(evt))
eventGateway.publish(ExternalDataTransferDoneEvent.successful(event.getCheckoutId()));
} catch (RuntimeException e) { // e.g. RestClientResponseException
eventGateway.publish(ExternalDataTransferDoneEvent.failed(event.getCheckoutId()));
throw e;
}
}
}

`

A) Is there a way to see which Subsctiobcion Queries are active?
B) How they stay active if we have queryUpdateEmitter.complete(ExternalDataTransferredQuery.class, query -> query.getCheckoutId().equals(evt.getCheckoutId())); And we are not missing any ExternalDataTransferDoneEvent

Note: We use exact same approach in 3 services. They all have its own ExternalDataTransferredQuery and PaymentDataTransferredEventEmitter. We are seeing “unclosed” Active Queries in 2 of them, but 3rd one does not have much load.
Note: We only see this on environment with load, test environment is not affected
Note: There might be 2 Events emmited which needs to be transferred to external system. They would have same checkoutId. But this is rare so expect this is happening also when single Event is send to external system.
Note: Sometimes we dont call .updates.blockFirst on the Query but it does not seems to be a problem.

C) This is probably unrelated question. In Axon Server what is difference between Overview menu where for application we can see Queries and Queries menu?
We have 2 (out of 9) queries which has some value in Queries menu and those 2 queries has 0 in the Overview menu. For other 7 queries its the other way around

Thanks

Hi,

it seems you’re not closing the Subscriptions on the consuming side. Note that a “complete” marker on the dispatching side doesn’t close a subscription. It merely puts a marker that no updates will follow. The Flux on the client-side will still allow the Flux to be accessed for its data.

As it seems you’re only expecting a single response, you can do a externalDataTransferQueryResult.cancel() (don’t forget the same on the queryResult). Alternatively, you can wrap the queries in a try-with-resources block.
You can also do:
externalDataTransferQueryResult.updates().doOnTerminate(externalDataTransferQueryResult::close).block(…)

Kind regards,

Thanks for your reply.

We were really missing try-with-resource for this subscription query. I also add externalDataTransferQueryResult.cancel() as you recommended for cases where we create the query but we dont .block on it as its not needed in that case.

Seems to work perfectly
Thanks