Reactive Query Handler

Hello!

I was wondering whether it is possible to create a reactive query handler to be used in combination with a reactive data repository.

What I’m trying to do:

I have a reactive Mongo repository:

    public interface ProductRepository extends ReactiveCrudRepository<Product, String> {
    }

And I’m using ReactorQueryGateway to query the data. The ReactorQueryGateway wraps the response in a Mono.

Currently I have the following code:

    @QueryHandler
    public ProductLookupResponse getAllProducts(FindAllProductsQuery query) {
        List<Product> products = productRepository.findAll()
            .collectList()
            .block();

        return getResponse(products);
    }

    private ProductLookupResponse getResponse(List<Product> products) {
        if(products == null || products.isEmpty()) {
            return new ProductLookupResponse(Collections.emptyList());
        }

        List<ProductDTO> dtos = products.stream()
            .map(this::convertProductToDTO)
            .collect(Collectors.toList());

        return new ProductLookupResponse(dtos);
    }

This code returns ProductLookupResponse, which is wrapped into Mono<ProductLookupResponse> by ReactorQueryGateway.

Is it possible to execute the query in a non-blocking way?
Something like:


    @QueryHandler
    public Mono<ProductLookupResponse> getAllProductsFlux(FindAllProductsQuery query) {
        return productRepository.findAll()
            .collectList()
            .map(this::getResponse);
    }

If so, I’m unfortunately not sure how to implement the query call with the ReactorQueryGateway.

Thank you for your time!

Hi Jaroslav,

We actually plan to support this in the next release of the Reactive extension. Right now it is not officially supported.

Until then, what you could do is convert the Mono to a CompletableFuture (by calling toFuture()) and return that CompletableFuture from the query handler. That should work in the case of Mono.
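The shape of such a handler can be sketched as follows. This is only a minimal illustration that runs without Reactor or Axon on the classpath: the nested types and `findAllAsync()` are hypothetical stand-ins, and in the real project the future would presumably come from `productRepository.findAll().collectList().toFuture()` on a method annotated with `@QueryHandler`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Stdlib-only sketch: a handler that returns a CompletableFuture instead of blocking.
// Every type here is a hypothetical stand-in for the classes in the question.
class FutureQueryHandlerSketch {

    static class Product {
        final String name;
        Product(String name) { this.name = name; }
    }

    static class ProductDTO {
        final String name;
        ProductDTO(String name) { this.name = name; }
    }

    static class ProductLookupResponse {
        final List<ProductDTO> products;
        ProductLookupResponse(List<ProductDTO> products) { this.products = products; }
    }

    // Assumption: stands in for the reactive repository call converted with toFuture().
    static CompletableFuture<List<Product>> findAllAsync() {
        return CompletableFuture.supplyAsync(
                () -> List.of(new Product("Keyboard"), new Product("Mouse")));
    }

    // Handler shape: no block(); the mapping runs once the future completes.
    static CompletableFuture<ProductLookupResponse> getAllProducts() {
        return findAllAsync().thenApply(FutureQueryHandlerSketch::toResponse);
    }

    // Converts the loaded products into the response object.
    static ProductLookupResponse toResponse(List<Product> products) {
        List<ProductDTO> dtos = products.stream()
                .map(p -> new ProductDTO(p.name))
                .collect(Collectors.toList());
        return new ProductLookupResponse(dtos);
    }

    public static void main(String[] args) {
        // join() is used only at the edge of this demo to wait for the result.
        ProductLookupResponse response = getAllProducts().join();
        response.products.forEach(dto -> System.out.println(dto.name));
    }
}
```

The point of the sketch is that the handler itself never waits: it hands back the future, and whoever dispatched the query decides when to wait for it.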

In the case of Flux, you could make it work by combining it with subscription queries. Subscribe to the Flux from the repository in the handler for the initial result, and emit the elements of the Flux using queryUpdateEmitter. On the receiving side, you should use reactorGateway.subscriptionQueryMany(…) or updates().
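To make the "initial result plus updates" idea concrete, here is a rough stdlib-only sketch of the pattern. It does not use the Axon or Reactor APIs: `UpdateEmitter` is a hypothetical stand-in for the query update emitter, and a plain `List` stands in for the Flux coming from the repository.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stdlib-only sketch of answering a query with an initial result plus emitted updates.
// UpdateEmitter and all method names are hypothetical, not Axon API.
class SubscriptionQuerySketch {

    // Hypothetical emitter: forwards every update to the registered subscribers.
    static class UpdateEmitter<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) { subscribers.add(subscriber); }

        void emit(T update) { subscribers.forEach(s -> s.accept(update)); }
    }

    // Handler shape: push each source element to the emitter and return an
    // empty initial result, so the data travels as updates.
    static List<String> handle(List<String> source, UpdateEmitter<String> emitter) {
        source.forEach(emitter::emit); // with Reactor this would be a subscribe on the Flux
        return new ArrayList<>();
    }

    // Receiving side: subscribe to updates first, then combine them with the initial result.
    static List<String> query(List<String> source) {
        UpdateEmitter<String> emitter = new UpdateEmitter<>();
        List<String> received = new ArrayList<>();
        emitter.subscribe(received::add);
        List<String> all = new ArrayList<>(handle(source, emitter));
        all.addAll(received);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(query(List.of("Keyboard", "Mouse")));
    }
}
```

The ordering matters in this sketch: the receiver subscribes before the handler starts emitting, otherwise the updates would be lost.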

Hi Jaroslav,

Actually, I guess this is not such a big deal as it seems… (I am not saying that the Reactive extension makes no sense.)
In most cases, either you know that it is a long query, in which case a subscriptionQuery is the right way to go, or you expect the query to be relatively quick, and then you can live with the .block() call. No?

Dear Stefan,

Do you have any estimate of when the next release of the Reactive extension will be available?

Thank you!

It is being released today, but it is a small release and we didn’t manage to include this feature.
It’s still under discussion. I will keep you posted here!

Hello, I am interested in the topic.

I am using Reactor in the controller and the repository, and I can’t get it to work correctly.

Is there any news on this?

Hi Federico, what exactly are you trying to implement? Query handlers that use a reactive repository?

Yes, exactly.

Example:

    @GetMapping("/{identifier}")
    public Mono<Demo> find(@PathVariable("identifier") String identifier) {
        return queryGateway.query(new FindOneDemoQuery(identifier), ResponseTypes.instanceOf(Demo.class));
    }

and

    @QueryHandler
    public Demo handler(FindOneDemoQuery query) {
        return Mono.just(query.getId())
                .flatMap(repository::findById)
                .map(demoToModelConverter::convert)
                .block();
    }

I would like to return a Mono from the handler instead.
I tried an approach that rewrites the OptionalResponseType class, and it works when the result is a Mono.

But I think you should implement a ReactorQueryGateway that supports returning a Mono or Flux as the query result.

Hi @federicos, I believe this issue on our extension-reactor is what you are looking for.
As you can see there, this is not yet supported but planned.

Also, note that the extension is an open-source project, and anyone can contribute to it!