Query Subscription over multiple instances of one service

We have the following setup.

A separate frontend (Angular) and backend (Spring boot with Axon framework) application. Our frontend will be deployed as a single service while the backend has multiple instances (All connected to Axon Server) of the same service (loadbalanced). The communication between front- and backend is done using websockets (Atmosphere).

Now we’re struggling with the following use case. The client (frontend) will open a websocket at let’s say wss://…/query/users and it sends a FindAllUsersQuery object to the server. The server will subscribe to that query using the subscriptionQuery on the queryGateway.

e.g

SubscriptionQueryResult<..> fetchQueryResult = queryGateway.subscriptionQuery(findAllUsersQuery, ...);

Next, the server will fetch the initial result and send it back over the /query/users socket.

e.g.

return fetchQueryResult.initialResult().block.stream();

All subsequent updates on that query (triggered by the QueryUpdateEmitter), will be sent from the server over the same socket.

e.g.

fetchQueryResult.updates().subscribe( users -> { 
    metaBroadcaster.broadcastTo("/query/users", users); // atmosphere websocket broadcast to all connections on /query/users 
})

So far, no problem.

However, each time a new socket is opened - either by the same user (eg when opening a new tab in the browser) or someone else - for the same Query, a new subscriptionQuery is registered, resulting in broadcasting the result as many times as there are subscriptions (5 subscriptions → 5 query callbacks → 5 messages to the same user). Moreover, even if we can somehow realize that only 1 subscription for a specific query is active on a specific instance, we’d still don’t know if someone else wants to register the same subscription query on another instance (after all, we halve multiple instances running of the same application).

So how do we make sure that we only have one subscription running for a specific Query on multiple instances of our service? Or isn’t this the way subscriptionQueries are intended to be used?

And would the giftcard example (where you use vert.x and Vaadin) still work when deployed X times? Wouldn’t it suffer from the same problem that we mention here above? And how do we make sure that the number of subscription queries doesn’t explode (number of threads, potential leaking of queries that are never cancelled/closed, etc) ?

3 Likes

hi @Kristof86,

I’ve spend some time on this question but some things are not clear yet.
When you say you have 5 subscription and you get 5 messages, it is 5 messages for each client or 5 in general? Just to be clear!

Also, Atmosphere is not my cup of tea so could you provide a small sample where we can see this problem you describe? In that way, I can easily go through it to try to understand and help you better.

To your last questions, IMO a subscription query could be created for each client without problems. In that way, they all get the initial result together with updates, which will form the current state (and not only the updates, in case we would have just one ‘global’ subscription query).
About GiftCard, it should work without any problems.
About the “subscription queries not exploding”, you can always add monitoring and metrics for that… but as soon as the client is disconnected, the subscription is also terminated.

KR,

Hello

Thanks already for getting back to me. I’ve been working on this the last few days so now I might have some better examples than 14 days ago. At that point I didn’t have any code implemented yet.

Maybe one first question to be clear on how subscription queries work. Do you agree that once you’ve made a queryGateway.subscriptionQuery(…) call, you’ll receive updates until the application shuts down?

SubscriptionQueryResult<?, ?> subscriptionQueryResult = queryGateway
                .subscriptionQuery(query, ResponseTypes.instanceOf(query.returnType()), ResponseTypes.instanceOf(query.returnType()));
subscriptionQueryResult.handle(handleResult, handleResult);

Unless you cancel it yourself

subscriptionQueryResult.cancel()

So to handle this problem (of subscribing multiple times for the same query), I now hold a subscription map (cache) and everytime someone tries to call a subscriptionQuery for the same query, I just lookup if a subscriptionQueryResult already exists in my cache, cancel it and subscribe again.

private final Map<String, SubscriptionQueryResult<?, ?>> activeSubscriptionQueries = new HashMap<>();
...
// Don't mind the topic too much.. it uniquely identifies a query
if (this.activeSubscriptionQueries.containsKey(topic)) {
	log.debug("Cancel subscription query for topic [{}]", query);
	this.activeSubscriptionQueries.get(topic).cancel();
	this.activeSubscriptionQueries.remove(topic);
}
SubscriptionQueryResult<?, ?> subscriptionQueryResult = queryGateway
	.subscriptionQuery(query, ResponseTypes.instanceOf(query.returnType()), ResponseTypes.instanceOf(query.returnType()));

this.activeSubscriptionQueries.put(topic, subscriptionQueryResult);
// handleResult is a consumer that will send the result back over the socket
subscriptionQueryResult.handle(handleResult, handleResult);

I need to hold this cache because if I don’t, multiple subscription queries (for the same query) will stack up and the result (handled by the handleResult) will be broadcasted back to my client(s). And my client only expects one initial result and updates from one subscription… not multiple

Do you agree that this is the correct approach? I’m not really sure what you mean by “as soon as the client is disconnected, the subscription is terminated” because I don’t see that kind of behavior. I’m still receiving updates in my logs even though no one is using the application. So it never ends unless I cancel it manually.

Now with everything implemented above, I’m now sure that only one subscriptionQuery is active for a specific query. But I guess that only works on one node.
The moment we deploy this service multiple times, I’ll have a cache for each service and they don’t know anything about each other so we’ll end up with subscribing the same query multiple times over different instances.
The handleResult consumer (also mentioned above) will broadcast that result to the client(s) so we again would receive more updates than we want.

e.g.

  • Client A subscribes to websocket /ws/query/socket on Service 1 and sends a FindAllUsers query.
  • Service 1 starts a subscription query for FindAllUsers and sends back the initial result over /ws/query/socket back to client A
  • Client B subscribes to websocket /ws/query/socket on Service 1 and sends a FindAllUsers query.
  • Service 1 cancels the active subscriptionQuery and starts a new one. It sends back the initial result…
  • Service 1 intercepts updates for the subcriptionQuery and sends updates to both Client A and B because they are both connected to /ws/query/socket
  • Client C subscribes to websocket /ws/querysocket on Service 2 and sends a FindAllUsers query.
  • Service 2 starts a subscription queyry for FindAllUsers (because it doesn’t know anything about service 1 already having one) and sends back the initial result.
  • Services 1 and 2 intercept updates for the subscriptionQuery FindAllUsers and send the updates over /ws/query/socket to client A/B/C but 2 times now because
    /ws/query/socket is a loadbalanced websocket solution and those 2 subscriptionQueries will send their result to every client connected to service 1 and service 2.

So for the last case, I don’t have a solution yet. Except implementing something custom so multiple services share some kind of active subscriptionQuery state so they do now which subscriptionQueries are running on which service.

Not sure if I made my question more clear or just worse :). If it is not clear, I’ll probably aks this again in a Zoom Q&A session.

Thx

2 Likes

Hi @Kristof86, thanks for the detailed explanation.

After talking to some peers, we concluded that the problem is probably on how the Atmosphere broadcast operation works. In short, every subscription query opened by a client should have an identification so every client will get its own subscription query initial result and updates, that is the most pragmatic way of using it.

If you have multiple client subscribing to the “same” subscription query without having a way to identify it, probably that is where you get duplicated updates, as you described.
Having a cache, as you did, looks fine but you will either:

  1. not get the initial result for new subscribers;
  2. resubscribe on every reconnect to be able to send the initial result to the new subscribers which will also duplicate/broadcast the messages to the old ones.

Also you have to be careful to not close the stream when a client disconnects because by using a cache, you assume that there are other clients connected to that.

For the case where you talk about closing a subscription, it very much depends on how you implement it. While using ServerSentEvents and Flux, I added a ping operation to check if the client was still there. If not, it would close that stream for me.
Somethihg like this:

/*For Server Sent Events, server doesn't get close signal when client closes connection and we are left with hanging infinitive streams.
 * Workaround is to implement heart beat which will detect that no one is listening on another side and close stream automatically */
Flux<ServerSentEvent<MyEntity>> heartbeatStream = Flux.interval(Duration.ofSeconds(2))
															   .map(i -> ServerSentEvent.<MyEntity>builder()
																	   .event("ping").build());

return Flux.merge(updateStream, heartbeatStream);

You can probably do something similar on your side but for WebSockets if I am not wrong.

Hope that all makes sense and you can follow with your project. If not, let us know!

KR,