Subscription Query Initial Result Does Not Trigger Query Handler

When I invoke the initialResult() method of the SubscriptionQueryResult, my query handler is not triggered, and I just get null from the initial result, regardless of whether a persisted query model object exists for the queried id. However, the updates() method of the subscription query does deliver the Flux of updates when the model changes.

To troubleshoot this I created two endpoints on a REST controller. In one I return a query model using QueryGateway.query(…), and in the other I use QueryGateway.subscriptionQuery(…).initialResult().block(). In both cases I’m querying for the same model (same id). In the first case (non-subscription query) I get the model I expect; in the second case I get nothing.

I am using Axon 4.0.3 with Axon Server. I can reproduce this with every model in our project, so I suspect you should be able to reproduce it without a sample project. I will try to create a small sample if I can, unless someone can verify this issue without it.

Thanks!

Troy

So I’ve got a small demo (https://github.com/troyhart/axon-subscription-queries-sse) where I’ve attempted to reproduce the problem, but I cannot. In this demo I’ve created a controller where I thought I’d be demonstrating this issue (https://github.com/troyhart/axon-subscription-queries-sse/blob/master/src/main/java/com/myco/baskets/controller/SubscriptionQueryTestController.java). However, this controller behaves as expected: when either endpoint is invoked with the same input, it produces the exact same output. So my question is, why doesn’t initialResult() in my work project trigger the query handler as it’s supposed to?

The exact same query handler code is getting triggered by QueryGateway.query(…), so there’s not a problem with the handler. I’m at a total loss.

Summary

This works:

`

queryGateway.query(new PackageRecordByIdQuery(id), ResponseTypes.instanceOf(PackageRecord.class))

`

This does not:

`

queryGateway.subscriptionQuery(
    new PackageRecordByIdQuery(id),
    ResponseTypes.instanceOf(PackageRecord.class),
    ResponseTypes.instanceOf(PackageRecord.class)
).initialResult().block()

`

OK, so I’ve been able to isolate this issue. It turns out that the subscription queries don’t seem to be honoring my message interceptor configuration. My project has the following configuration:

`

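import java.util.Collections;

import org.axonframework.axonserver.connector.command.AxonServerCommandBus;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.util.Assert;

// USER_INFO, AuthToken and UserInfo are project-specific and defined elsewhere.
@Configuration
public class AxonMessageInterceptorsConfig {

    @Autowired
    public void registerInterceptors(CommandBus commandBus, QueryBus queryBus) {
        Assert.notNull(commandBus, "Invalid configuration, commandBus is null!");
        Assert.notNull(queryBus, "Invalid configuration, queryBus is null!");

        // Interceptors are registered only when the Axon Server buses are in use.
        if (AxonServerCommandBus.class.isAssignableFrom(commandBus.getClass())) {
            AxonServerCommandBus.class.cast(commandBus).registerDispatchInterceptor(authorizationDispatchInterceptor());
            AxonServerCommandBus.class.cast(commandBus).registerHandlerInterceptor(authorizationHandlerInterceptor());
        }
        if (AxonServerQueryBus.class.isAssignableFrom(queryBus.getClass())) {
            AxonServerQueryBus.class.cast(queryBus).registerDispatchInterceptor(authorizationDispatchInterceptor());
            AxonServerQueryBus.class.cast(queryBus).registerHandlerInterceptor(authorizationHandlerInterceptor());
        }
    }

    // Dispatch side: copies the authenticated user's UserInfo into the message metadata.
    private MessageDispatchInterceptor<? super Message<?>> authorizationDispatchInterceptor() {
        return list -> {
            AuthToken auth = (AuthToken) SecurityContextHolder.getContext().getAuthentication();

            if (auth != null) {
                UserInfo userInfo = auth.getPrincipal();
                userInfo.validate();
                return (index, message) -> message.andMetaData(Collections.singletonMap(USER_INFO, userInfo));
            }

            return (index, message) -> message;
        };
    }

    // Handler side: rejects any message that arrives without UserInfo in its metadata.
    private MessageHandlerInterceptor<? super Message<?>> authorizationHandlerInterceptor() {
        return (unitOfWork, interceptorChain) -> {
            UserInfo userInfo = (UserInfo) unitOfWork.getMessage().getMetaData().get(USER_INFO);
            if (userInfo == null) {
                throw new SecurityException("User information not available!");
            }
            return interceptorChain.proceed();
        };
    }
}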

`

So, as you can see from this configuration, I am adding a UserInfo into the metadata in the “dispatch interceptor”. Then, in the “handler interceptor” I look for this object and when it’s not present I throw a security exception.

In my debugging, when I execute a subscription query, the dispatch interceptor is not triggered, but the handler interceptor is. As a result, the handler interceptor throws a security exception. The problem is compounded by the fact that the security exception is swallowed. A log entry would be very helpful here, because stepping through the execution stack to find this exception was very unintuitive.
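The failure mode described here can be reproduced in a small, self-contained model. This is only a sketch of the behavior reported in this thread, not Axon's actual implementation: `InterceptorModel`, its methods, and the `"troy"` and `"PackageRecord"` values are all hypothetical stand-ins. One path applies the dispatch step before the handler check, the other skips it and swallows the resulting exception, which is why the caller sees only null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simplified stand-in for the message flow; NOT Axon's real classes.
final class InterceptorModel {

    // Hypothetical metadata key shared by both interceptors.
    static final String USER_INFO = "userInfo";

    // Dispatch-side step: returns a copy of the metadata with the caller's identity added.
    static final UnaryOperator<Map<String, Object>> DISPATCH_INTERCEPTOR = metaData -> {
        Map<String, Object> copy = new HashMap<>(metaData);
        copy.put(USER_INFO, "troy");
        return copy;
    };

    // Handler-side step: rejects metadata without an identity, otherwise "runs the handler".
    static String handle(Map<String, Object> metaData) {
        if (metaData.get(USER_INFO) == null) {
            throw new SecurityException("User information not available!");
        }
        return "PackageRecord";
    }

    // Path that applies the dispatch step first, like the plain query in this thread.
    static String query() {
        return handle(DISPATCH_INTERCEPTOR.apply(new HashMap<>()));
    }

    // Path that skips the dispatch step and swallows the failure, so the caller gets null.
    static String subscriptionQueryInitialResult() {
        try {
            return handle(new HashMap<>());
        } catch (SecurityException e) {
            return null;
        }
    }
}
```

Calling `InterceptorModel.query()` returns the record, while `InterceptorModel.subscriptionQueryInitialResult()` returns null with no visible error, matching the symptom in the original report.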

So am I missing something about the configuration of the dispatch interceptor for subscription queries?

Thanks,

Troy

I’ve made a simple demonstration: https://github.com/troyhart/axon-subscription-query-dispatch-intercepter-bug

This application includes a README that explains usage. To emphasize, this issue is centered on the dispatch and handler interceptors, which are implemented in: /axon-subscription-query-dispatch-intercepter-bug/src/main/java/com/example/demo/AxonMessageInterceptersConfig.java

Hi Troy,

Thanks for the elaborate report. We know where to look and will find a fix for this as soon as possible.

Cheers,

Hi Troy,

Firstly, sorry that this took so long.
The issue wasn’t major, but it was quite busy at the office due to releasing 4.1.

Regardless though, I did create an issue for this, which you can find here.
Additionally, the pull request to resolve it is also out there and can be found here.

Note, however, that this PR resolves the problem as of Axon Framework release 4.1.1, not in a 4.0.x release.
The migration from 4.0.x to 4.1.x shouldn’t pose any problems, but if you can’t update just yet, I have the following workaround for your issue.

You can also register MessageDispatchInterceptors on the QueryGateway.
As the QueryGateway sits in front of any configured QueryBus, it will thus also form the gateway towards the AxonServerQueryBus.
I can guarantee the DefaultQueryGateway does have the correct implementation to utilize the registered MessageDispatchInterceptors prior to dispatching a subscription query.
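A minimal sketch of that workaround, assuming a Spring setup like the one earlier in this thread. The class name `QueryGatewayInterceptorConfig` and the `USER_INFO` key value are hypothetical, and the principal is stored as returned by Spring Security rather than as the project's own UserInfo type, so adapt it to whatever the handler interceptor expects.

```java
import java.util.Collections;

import org.axonframework.queryhandling.QueryGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;

@Configuration
public class QueryGatewayInterceptorConfig {

    // Hypothetical metadata key; use the same key the handler interceptor reads.
    private static final String USER_INFO = "userInfo";

    @Autowired
    public void registerGatewayInterceptor(QueryGateway queryGateway) {
        // Registered on the gateway instead of the AxonServerQueryBus, so it runs
        // before the query (including a subscription query) reaches the bus.
        queryGateway.registerDispatchInterceptor(messages -> (index, message) -> {
            Authentication auth = SecurityContextHolder.getContext().getAuthentication();
            if (auth == null) {
                return message;
            }
            return message.andMetaData(Collections.singletonMap(USER_INFO, auth.getPrincipal()));
        });
    }
}
```

If the same dispatch interceptor also stays registered on the query bus, metadata for plain queries would be attached twice with the same key, which should be harmless but is worth keeping in mind.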

Simpler though, would be to update to release 4.1.1 of course.
We are planning to release 4.1.1 today by the way.

Hope this helps!

Cheers,
Steven

Thanks!

I have upgraded to 4.1 already, so it should be very simple to go to 4.1.1. I’ll check it out tonight and let you know if I have any questions.

Thanks again!

Troy