Reactive Query Handler

Hello!

I was wondering whether it is possible to create a reactive query handler to be used in combination with a reactive data repository.

What I’m trying to do:

I have a reactive Mongo repository:

    public interface ProductRepository extends ReactiveCrudRepository<Product, String> {
    }

And I’m using ReactorQueryGateway to query the data. The ReactorQueryGateway wraps the response in a Mono.

Currently I have the following code:

    @QueryHandler
    public ProductLookupResponse getAllProducts(FindAllProductsQuery query) {
        List<Product> products = productRepository.findAll()
            .collectList()
            .block();

        return getResponse(products);
    }

    private ProductLookupResponse getResponse(List<Product> products) {
        if(products == null || products.isEmpty()) {
            return new ProductLookupResponse(Collections.emptyList());
        }

        List<ProductDTO> dtos = products.stream()
            .map(this::convertProductToDTO)
            .collect(Collectors.toList());

        return new ProductLookupResponse(dtos);
    }

This code returns ProductLookupResponse, which is wrapped into Mono<ProductLookupResponse> by ReactorQueryGateway.

Is there any way to execute the query in a non-blocking way?
Something like:


    @QueryHandler
    public Mono<ProductLookupResponse> getAllProductsFlux(FindAllProductsQuery query) {
        return productRepository.findAll()
            .collectList()
            .map(this::getResponse);
    }

If so, I’m unfortunately not sure how to implement the query call with the ReactorQueryGateway.

Thank you for your time!

Hi Jaroslav,

We actually plan to support this in the next release of the Reactive extension. Right now it’s not officially supported.

Until then, what you could do is convert the Mono to a CompletableFuture (by calling toFuture()) and return that CompletableFuture from the query handler. That should work in the case of Mono.
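
As a rough illustration of the handler shape this suggests, here is a minimal sketch that runs with the JDK alone. It is not the extension's official approach. `findAllAsFuture()` is a hypothetical stand-in for `productRepository.findAll().collectList().toFuture()`, and `Product`, `ProductDTO` and `ProductLookupResponse` are simplified versions of the classes from the question. In the real handler the method would carry `@QueryHandler`, and the chain would end with `.map(this::getResponse).toFuture()`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureQueryHandlerSketch {

    // Simplified stand-ins for the classes used in the question.
    public record Product(String id, String title) {}
    public record ProductDTO(String title) {}
    public record ProductLookupResponse(List<ProductDTO> products) {}

    // Hypothetical stand-in for productRepository.findAll().collectList().toFuture().
    public static CompletableFuture<List<Product>> findAllAsFuture() {
        return CompletableFuture.supplyAsync(() ->
            List.of(new Product("1", "Kettle"), new Product("2", "Toaster")));
    }

    // The handler returns a future instead of calling block().
    // With Reactor this would be:
    //   productRepository.findAll().collectList().map(this::getResponse).toFuture()
    public static CompletableFuture<ProductLookupResponse> getAllProducts() {
        return findAllAsFuture().thenApply(FutureQueryHandlerSketch::getResponse);
    }

    // Maps entities to DTOs once the list is available.
    public static ProductLookupResponse getResponse(List<Product> products) {
        List<ProductDTO> dtos = products.stream()
            .map(p -> new ProductDTO(p.title()))
            .collect(Collectors.toList());
        return new ProductLookupResponse(dtos);
    }

    public static void main(String[] args) {
        // join() is used only to print the result in this demo;
        // the handler method itself never blocks.
        System.out.println(getAllProducts().join().products().size());
    }
}
```

The calling side should not need to change: the gateway still hands back a Mono<ProductLookupResponse>, as described in the question.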

In the case of Flux, you could make it work by combining it with subscription queries. Subscribe to the Flux from the repository in the initial-result query handler, and emit its elements using queryUpdateEmitter. On the receiving side, you should use reactorGateway.subscriptionQueryMany(…) or updates().

Hi Jaroslav,

Actually, I guess this is not such a big deal as it seems (I am not saying that the Reactive extension makes no sense).
In most cases, either you know that it is a long query, in which case a subscription query is the right way to go, or you expect the query to be relatively quick, and then you can live with the .block() call. No?

Dear Stefan,

Do you have any idea when the next release of the Reactive extension will be available?

Thank you!

The release is today. It is a small one, and we didn’t manage to include this feature.
It’s still in discussion. Will keep you posted here!