ReactorCommand/QueryGateway and access to RequestHeaders

Greetings,
I’d like to pass values from the request headers into the metadata so I can use them in other microservices, regardless of whether the request is still active. Since I’m using Spring WebFlux, I use ReactorCommandGateway for command dispatching. I’m thinking of implementing a ReactorMessageDispatchInterceptor, but I don’t know how to access the headers inside that interceptor. Is there a preferred way to achieve this?

I would advise copying the request headers you want to use in the interceptor into the metadata when the request comes in, most likely in the REST endpoint, and propagating them to other messages if needed. Would that work?
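To make that suggestion a bit more concrete, here is a minimal sketch of the "copy headers into a metadata map at the endpoint" step. It uses only the JDK; the class name `HeaderMetadata` and the method `fromHeaders` are hypothetical names made up for this illustration, not part of Axon or Spring.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: copies selected request headers into a metadata map. */
final class HeaderMetadata {

    private HeaderMetadata() {
    }

    /**
     * @param headers   header name -> values, the shape most HTTP APIs expose
     * @param forwarded names of the headers to copy (matched case-insensitively)
     * @return the first value of every forwarded header that is present and non-empty
     */
    static Map<String, String> fromHeaders(Map<String, List<String>> headers, List<String> forwarded) {
        Map<String, String> metadata = new LinkedHashMap<>();
        for (String name : forwarded) {
            for (Map.Entry<String, List<String>> header : headers.entrySet()) {
                List<String> values = header.getValue();
                if (header.getKey().equalsIgnoreCase(name) && values != null && !values.isEmpty()) {
                    // Absent headers are skipped, so the metadata never holds null values.
                    metadata.put(name, values.get(0));
                    break;
                }
            }
        }
        return metadata;
    }
}
```

In the REST endpoint, the resulting map could then be attached to the command before dispatching, for example with `GenericCommandMessage.asCommandMessage(command).andMetaData(metadata)` passed to the gateway's `send`. Whether the request headers object can be handed in directly depends on the Spring version, so treat that part as an assumption to verify.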

The example in the documentation (reactive-gateways) has this snippet:


    public void registerDispatchInterceptor(ReactorEventGateway reactiveGateway) {
        reactiveGateway.registerDispatchInterceptor(
            msgMono -> msgMono.filterWhen(v -> Mono.subscriberContext()
                              .filter(ctx-> ctx.hasKey("security"))
                              .map(ctx->ctx.get("security")))
        );
    }

I implemented a WebFilter and added the header values to the Reactor context via the contextWrite method, like this:

return chain.filter(exchange).contextWrite(ctx -> {
    HttpHeaders headers = exchange.getRequest().getHeaders();
    Map<String, String> appHeaderData = new HashMap<>();
    appHeaderData.put(BaseConstants.REQUEST_HEADER_NAME, requestId);
    if (headers.containsKey(BaseConstants.COMPANY_HEADER_NAME))
        appHeaderData.put(BaseConstants.COMPANY_HEADER_NAME, headers.getFirst(BaseConstants.COMPANY_HEADER_NAME));
    if (headers.containsKey(BaseConstants.PRODUCT_HEADER_NAME))
        appHeaderData.put(BaseConstants.PRODUCT_HEADER_NAME, headers.getFirst(BaseConstants.PRODUCT_HEADER_NAME));
    if (headers.containsKey(HttpHeaders.ACCEPT_LANGUAGE))
        appHeaderData.put(HttpHeaders.ACCEPT_LANGUAGE, headers.getFirst(HttpHeaders.ACCEPT_LANGUAGE));
    return ctx.put(BaseConstants.APP_DATA_CONTEXT_KEY, appHeaderData);
});

…but inside my ReactorMessageDispatchInterceptor I can’t access the context value added in the filter. (Also, the example in the documentation uses the subscriberContext method, which I think was deprecated and then removed in reactor-core 3.5.0.)

@Slf4j
public class GlobalCommandDispatchInterceptor implements ReactorMessageDispatchInterceptor<CommandMessage<?>> {

    @Override
    public Mono<CommandMessage<?>> intercept(Mono<CommandMessage<?>> message) {
        // Read the subscriber's Reactor context and copy its header data into the command's metadata.
        return Mono.deferContextual(ctx -> message.map(m -> m.andMetaData(addMetadata(ctx))));
    }

    private Map<String, ?> addMetadata(ContextView ctx) {
        Map<String, String> metadata = new HashMap<>();

        if (!ctx.hasKey(BaseConstants.APP_DATA_CONTEXT_KEY))
            throw new RaoSystemException(BaseConstants.APP_DATA_CONTEXT_KEY + " is missing in reactive context");

        Map<String, String> contextData = ctx.get(BaseConstants.APP_DATA_CONTEXT_KEY);
        metadata.put(HttpHeaders.ACCEPT_LANGUAGE, contextData.get(HttpHeaders.ACCEPT_LANGUAGE));
        if (contextData.containsKey(BaseConstants.PRODUCT_HEADER_NAME))
            metadata.put(BaseConstants.PRODUCT_HEADER_NAME, contextData.get(BaseConstants.PRODUCT_HEADER_NAME));
        if (contextData.containsKey(BaseConstants.COMPANY_HEADER_NAME))
            metadata.put(BaseConstants.COMPANY_HEADER_NAME, contextData.get(BaseConstants.COMPANY_HEADER_NAME));
        if (contextData.containsKey(BaseConstants.REQUEST_HEADER_NAME))
            metadata.put(BaseConstants.REQUEST_HEADER_NAME, contextData.get(BaseConstants.REQUEST_HEADER_NAME));

        return metadata;

    }
}

Am I doing something wrong?

EDIT:

The filter was registered at the gateway service level. The Reactor context is not propagated from one service to another (with the exception of the tracing context), so the filter must be registered on the service that contains the REST API.

I think you still need to adjust the first part, since subscriberContext is no longer available. Note that contextWrite only writes to the context; to read it inside the interceptor you can use Mono.deferContextual, so something like:

    public void registerDispatchInterceptor(ReactorEventGateway reactiveGateway) {
        reactiveGateway.registerDispatchInterceptor(
            msgMono -> msgMono.filterWhen(v -> Mono.deferContextual(Mono::just)
                              .filter(ctx -> ctx.hasKey("security"))
                              .map(ctx -> ctx.get("security")))
        );
    }