Passing along metadata to ReactorQueryGateway

Hi, and thanks for reading my thread,

I am currently struggling to find a resolution for the following problem:

Our project setup uses the RSocket protocol for frontend <> bff communication. The frontend uses ‘rsocket-core’ and uses this to send queries/commands/streamrequests (querysubscription).
These RSocket messages hold 1) metadata and 2) payload data.

In our bff, we receive said requests in a generic controller with “socket.query” / “socket.subscription.query” / “socket.command” as paths (irrelevant I suppose for this issue).
An example of a working setup is:

    @MessageMapping(QUERY_DESTINATION)
    public Mono<SocketResponse<?>> query(
            final Query query) {
    
        return queryGateway.query(
                        query,
                        Object.class
                )
                .map(SocketResponse::ok);
    }

Here, Query is a marker interface. Query/command objects example is as follows:

public record FindTotalsPerDayQuery(
        DateRange appliedRange,
        Set<String> someIds
) implements Query {}

Now, this setup is working – but we recently have the added requirement that the RSocket metadata should be available (for security/… purposes) on the (Query/Command)Message.

So for this, we’ve made some slight modification and this does seem to be working:

    @MessageMapping(QUERY_DESTINATION)
    public Mono<SocketResponse<?>> query(
            final Query query,
            @Headers Map<String, Object> headers) {

        QueryMessage<Query, Object> genericQueryMessage = new GenericQueryMessage<>(query, ResponseTypes.instanceOf(Object.class))
                .andMetaData(rSocketMetaDataMapper.mapToMetaData(headers).toMap());
        return queryGateway.query(
                        genericQueryMessage,
                        Object.class
                )
                .map(SocketResponse::ok);

So what’s added here?

  • Headers input variable – these are the RSocket headers, where we read the RSocket metadata from. Irrelevant for this problem.
  • We wrap the input Query into a GenericQueryMessage
    o We have to work with Object.class here – due to the generic setup that we are stuck with for now.
    o Since it is now a GenericQueryMessage, we can add the metadata on the (Query/Command)Message.

The same setup is done for the command path as well as the subscription path.
This setup does work for queries (and probably commands, but not yet thoroughly (integration) tested).

The problem lies in the subscription path.

We went from:

    @MessageMapping(SUBSCRIPTION_QUERY_DESTINATION)
    public Flux<SocketResponse<?>> subscriptionQuery(
            final Object query,
            @Headers Map<String, Object> headers) {

        return queryGateway.subscriptionQuery(
                        query,
                        ResponseTypes.instanceOf(Object.class)
                )
                .map(SocketResponse::ok);
    }

to:

    @MessageMapping(SUBSCRIPTION_QUERY_DESTINATION)
    public Flux<SocketResponse<?>> subscriptionQuery(
            final Object query,
            @Headers Map<String, Object> headers) {

        SubscriptionQueryMessage< Object, List< Object >, Object > subGenericQueryMessage = new GenericSubscriptionQueryMessage<>(
                query,
                ResponseTypes.multipleInstancesOf(Object.class),
                ResponseTypes.instanceOf(Object.class)
        )
                .andMetaData(rSocketMetaDataMapper.mapToMetaData(headers).toMap());

        return queryGateway.subscriptionQuery(
                        subGenericQueryMessage,
                        ResponseTypes.instanceOf(Object.class)
                )
                .map(SocketResponse::ok);
    }

Things to note:

Whenever a subscription query is fired, I get a CANCELLED: AXONIQ-5000 error. By research, I see this means no handler can be found.
However when the “exact” same payload (query) is used in a query instead of a subscription, a handler is resolved.

On the axon server I get the following error:

Seeing the fact that the only change is the wrapper around the payload, I am quite confident something is wrong with the way I build the GenericSubscriptionQueryMessage.
However I fail to find a resolution or a path forward.

I am also open to alternatives on how to pass metadata along, but the resources I found indicate this as a way of working.

Thank you for your time,
Rhino

A potentially valueable debug screenshot:

Hi @Rhino, and welcome to the forum!

As a matter of fact, I have dealt with a similar issue for the DefaultQueryGateway recently.
In that case, it was about the QueryGateway#streamingQuery operation.
Similarly, providing a Message implementation to attach MetaData didn’t work and threw a NoHandlerForQueryException.

There, the predicament occurred due to a bug in Axon Framework.
It didn’t validate whether the given payload was an instance of Message.

I checked the DefaultReactorQueryGateway#subscriptionQuery operation and can state that it has the same predicament as the QueryGateway#streamingQuery.
Differently put, you have found a bug in Axon’s Reactor Extension.
I went ahead and provided a PR to fix it, which you can find here.
The solution will be part of 4.8.1. of the Reactor Extension by the way, which we’ll likely release next week.

To unblock you for now, @Rhino, your best way forward is to set a MessageDispatchInterceptor to attach the MetaData, instead of providing a Message implementation to the ReactorQueryGateway.

Concluding, I hope this helps you further, @Rhino!

Hi @Steven_van_Beelen

Thank you for digging through the issue at hand!

The offered alternative, based on interceptors, is something we initially planned to work with. But given the described option above popped up, we decided this is a much cleaner solution.

That said, it is good to have a plan moving forward. We’re looking forward to upgrade to 4.8.1 in that case :slight_smile:

Thanks!

1 Like