Subscription queries: 'old' update in Flux stream

I am working with subscription queries in Axon Server.

Suppose I have:

  • A query command “QUERY”
  • A query handler
  • An eventListener emitting responses to “QUERY”
  • Two update events (e1 and e2) resulting in two emitted updates u1 and u2 of type Update (so e1.getUpdate().equals(u1) and e2.getUpdate().equals(u2))
  • Axon’s QueryGateway (standard configuration), “queryGateway”

On the client side, I have:

`
SubscriptionQueryResult<String, Update> subscription1 =
        queryGateway.subscriptionQuery(new QUERY(), String.class, Update.class); //1
performActionCausingEventUpdate(e1); //2
subscription1.updates().blockFirst(Duration.ofSeconds(15)); // expect u1 //3

SubscriptionQueryResult<String, Update> subscription2 =
        queryGateway.subscriptionQuery(new QUERY(), String.class, Update.class); //4
performActionCausingEventUpdate(e2); //5
subscription2.updates().blockFirst(Duration.ofSeconds(15)); // expect u2 //6
`

On the server side, I have the following QueryHandler and listener:

`
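@QueryHandler
public String handle(QUERY query) {
    // Initial result returned when a subscription query is opened
    return "initial result";
}

@EventHandler
public void on(Event event) {
    // Emit the update to every subscription query of type QUERY (the filter accepts all)
    queryUpdateEmitter.emit(QUERY.class, q -> true, event.getUpdate());
}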
`

Most of the time this works exactly as expected.
But sometimes the Flux stream from subscription2 contains u1 followed by u2. I expected it to contain only u2, because I waited for u1 to arrive before creating subscription2.

Why is this?

Hi Stijn,

Is performActionCausingEventUpdate executed in a different thread from the one in the snippet in your question? I’m assuming that the pseudo-code you provided is a bit different in practice: as written, you couldn’t see the second update (u2) in subscription2, since you are blocking on the first update.

Anyway, at the moment you emit an update, all subscription queries that match the predicate (in your case q -> true, so all of them) will get the update.
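To make that delivery rule concrete, here is a minimal toy model in plain Java. It is not Axon's QueryUpdateEmitter and does not use Axon at all: the class ToyUpdateEmitter and its subscribe/emit methods are hypothetical names invented for this sketch, and it only illustrates the idea that an emit reaches the subscriptions that exist and match at that moment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model only (hypothetical names); this is NOT Axon's QueryUpdateEmitter. */
final class ToyUpdateEmitter<Q, U> {

    /** One active subscription: the query payload plus the updates delivered so far. */
    static final class Subscription<Q, U> {
        final Q query;
        final List<U> received = new ArrayList<>();

        Subscription(Q query) {
            this.query = query;
        }
    }

    private final List<Subscription<Q, U>> active = new ArrayList<>();

    /** Registers a new subscription; it only sees updates emitted from now on. */
    Subscription<Q, U> subscribe(Q query) {
        Subscription<Q, U> subscription = new Subscription<>(query);
        active.add(subscription);
        return subscription;
    }

    /** Delivers the update to every currently registered subscription whose query matches the filter. */
    void emit(Predicate<? super Q> filter, U update) {
        for (Subscription<Q, U> subscription : active) {
            if (filter.test(subscription.query)) {
                subscription.received.add(update);
            }
        }
    }
}
```

In this toy model, subscribing a, emitting u1 with q -> true, subscribing b, and then emitting u2 leaves a with [u1, u2] and b with only [u2]. That is the behaviour the question expected from two separate subscription queries.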

One important thing to mention is that if you subscribe to the same flux twice, you’ll get some buffered updates on the second subscription. This buffer is configurable and by default is SMALL_BUFFER_SIZE = Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
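As a quick way to see what that expression evaluates to on a given JVM, the sketch below re-evaluates it in plain Java without touching Reactor. The class name SmallBufferSize and the method resolve are made up for this example. One assumed difference worth noting: a constant like the one quoted is normally computed once when its class is loaded, whereas this helper re-reads the system property on every call.

```java
/** Hypothetical helper: re-evaluates the quoted default-size expression; it does not call Reactor. */
final class SmallBufferSize {

    /** Reads reactor.bufferSize.small (default "256") and never returns less than 16. */
    static int resolve() {
        return Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
    }

    public static void main(String[] args) {
        System.out.println("small buffer size = " + resolve());
    }
}
```

Starting the JVM with -Dreactor.bufferSize.small=1024 would make resolve() return 1024, while any value below 16 is raised to 16 by the Math.max call.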

Cheers,

Milan

Hi Milan,

Both performActionCausingEventUpdate calls are executed in the same thread, so line (//4) is not executed until line (//3) has completed by receiving update u1. That’s why, when creating a second subscription in line (//4), which is guaranteed to happen AFTER update u1 has fired, I did not expect to see u1 in the second subscription.

However, your second note explains it all: if you subscribe to the same flux twice, you may get some buffered updates on the second subscription, which seems to be exactly what is happening here (sometimes). Thank you for that.

So how did I deal with this?

The pseudo-code was part of a unit test that checked whether an action was idempotent.

Basically:

I started a subscription, performed an updating action on an aggregate (resulting in a u1 update), and waited for that update to arrive.
Then I started the second subscription.
Next I repeated the action on the aggregate, expecting no u1 update because of the idempotency I was testing.
But waiting for something NOT to occur is hard (how long would you need to wait?), so instead I performed a third update action, resulting in a u2 update.
Finally I blocked on the second subscription for its first update. If it is u2, I know that no new u1 has been sent, proving idempotency.

Since we now know that u1 might show up in the second subscription, this is no longer a reliable test of idempotency.

Now, I test using only one subscription:
update Aggregate -> U1
update Aggregate (same action again) -> x (no update)
update Aggregate (different change) -> U2
and I just verify that the flux contains exactly one U1 followed by U2.

This way, the buffering of updates in Flux streams cannot influence my test outcome anymore.
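For illustration, the final verification could look something like the plain-Java sketch below. It assumes the updates from the single subscription have already been collected into a List (with Reactor that might be done with something like updates().take(2).collectList().block(timeout), which is not shown here). The class name IdempotencyCheck and the method exactlyOneU1ThenU2 are hypothetical names for this example only.

```java
import java.util.List;

/** Hypothetical helper for the single-subscription idempotency test. */
final class IdempotencyCheck {

    /**
     * Returns true only if the collected updates are exactly [u1, u2].
     * A duplicate u1 (the repeated action was not idempotent), a missing u1,
     * or any other sequence makes the check fail.
     */
    static <T> boolean exactlyOneU1ThenU2(List<T> collected, T u1, T u2) {
        return collected.equals(List.of(u1, u2));
    }
}
```

Because the check compares the whole sequence from one subscription, a second U1 produced by a non-idempotent action would show up as [U1, U1, U2] and fail the test.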

Hi Stijn,

In your example, you are not subscribing to the same flux but to a different one (since you are issuing two queries, i.e. two instances of the same query). In this case, you’ll not get buffered updates. Can you extract the part of the code that proves you are getting these updates after you subscribed, and send it to us (or publish it on GitHub)? I’ve created a test but cannot reproduce it… We might wanna investigate whether it’s a framework issue…

Thanks!

Cheers,
Milan

Hi Milan,

I tried to recreate the problem myself using simplified code that I could post on GitHub, but I was unable to reproduce it that way. Unfortunately, I cannot post the customer’s code publicly on this forum.

Hi Stijn,

I understand why you can’t share client code. Unfortunately, I’m out of ideas about what might be going wrong in the described scenario. As mentioned previously, I also cannot create a scenario that reproduces the issue. Let me know if I can help somehow :)

Cheers,
Milan

The customer agreed to send the code to you, as we have an AxonIQ support contract. You should have received it by now, Milan.
But the customer did not agree to post this code on this forum.