Reactive Query Handler

Hello!

I was wondering, is there any possibility to create reactive query handler to be used in combination with reactive data repository?

What I’m trying to do:

I have Reactive Mongo Repository:

    public interface ProductRepository extends ReactiveCrudRepository<Product, String> {
    }

And I’m using ReactorQueryGateway to query the data. The ReactorQueryGateway wraps the response into Mono.

Currently I’m having following code:

    @QueryHandler
    public ProductLookupResponse getAllProducts(FindAllProductsQuery query) {
        List<Product> products = productRepository.findAll()
            .collectList()
            .block();

        return getResponse(products);
    }

    private ProductLookupResponse getResponse(List<Product> products) {
        if(products == null || products.isEmpty()) {
            return new ProductLookupResponse(Collections.emptyList());
        }

        List<ProductDTO> dtos = products.stream()
            .map(this::convertProductToDTO)
            .collect(Collectors.toList());

        return new ProductLookupResponse(dtos);
    }

This code returns ProductLookupResponse, which is wrapped into Mono<ProductLookupResponse> by ReactorQueryGateway.

Is there any possibility to execute the query non-blocking way?
Something like:


    @QueryHandler
    public Mono<ProductLookupResponse> getAllProductsFlux(FindAllProductsQuery query) {
        return productRepository.findAll()
            .collectList()
            .map(this::getResponse);
    }

If yes I’m unfortunately not sure how to implement the query call with the ReactiveQueryGateway.

Thank you for your time!

Hi Jaroslav,

We actually have a plan to support this for the next release of Reactive extension. Right now its not officially supported.

Until then,

What you could do is convert Mono to CompletableFuture and return CompletableFuture (by calling toFuture()). That should work in the case of Mono.

In the case of Flux, you could make it work by combining it with subsribptionQueries. Subscribe to flux from repository on initial result query, and emit elements inside of the flux using queryUpdateEmitter. On the receiving side, you should use reactorGateway.subcriptionQueryMany(…) or updates().

Hi Jaroslav,

actually I guess this is not really such a big deal as it seems … (I am not saying, that the Reactive extension makes no sense)
In most cases or you know that it is a long query …in that case subscribtionQuery is the right way to go. Or you expect the query is relatively quick and then you can live with the .block() call. No?

Dear Stefan,

Do you have any expectations when the next release of Reactive extension will be available?

Thank you!

Today, it is a small release and we didn’t manage to include this feature.
It’s still in discussion. Will keep you posted here!

Hello, I am interested in the topic.

I am implementing Reactor in Controller and Repository and it can’t get it done correctly.

Is there any news of this?

Hi Federico, what exactly are you trying to implement? Query handlers that are using reactive repository?

That’s how it is.

Example:

    @GetMapping("/{identifier}")
    public Mono<Demo> find(@PathVariable("identifier") String identifier) {
        return queryGateway.query(new FindOneDemoQuery(identifier), ResponseTypes.instanceOf(Demo.class));
    }

and

    @QueryHandler
    public Demo handler(FindOneDemoQuery query) {
        return Mono.just(query.getId())
                .flatMap(repository::findById)
                .map(demoToModelConverter::convert)
                .block();
    }

I could return a Mono.
I made approach re writing the class OptionalResponseType and It works for when it is a Mono results.

But I think you should implement a ReactorQueryGateway that supports the return of a Mono or Flux from the Query result.

Hi @federicos, I believe this issue on our extension-reactor is what you are looking for.
As you can see there, this is not yet supported but planned.

Also, note that the extension is an open source project and anyone can contribute to that! :slight_smile:

Update: This feature will be part of Axon Framework core, while reactive extension will add support for reactive interceptors

1 Like

In my case returning Mono in QueryHandler works ok with one pitful. I cannot catch/intercept exceptions since .proceed() method does not throw any exception, it is “hidden” in Mono itself.

Should I really implement it this way?


 @QueryHandler
    fun findById(findByEntityId: FindByEntityId): Mono<DomainEntityAggregate> =
entityRepository.findById(findByEntityId.entityId)
            .switchIfEmpty {
                throw EntityNotFoundException("Entity with id: ${findByEntityId.entityId} has not been found") }
            .doOnError { throw QueryExecutionException("Entity not found!", it, it) }
            .map(::mapToEntityAggregate)

Is there any advantage of returning Mono instead of raw object?

Is there any way to catch all exceptions thrown in Mono excecution?

I found some workaround by implementing MessageHandlerInterceptor but I’m not proud of it since it is blocking execution:

if (returnValue is Mono<*>) {
                returnValue.doOnError {
                    throw QueryExecutionException(it.message, it, it)
                }.block()
            }

You shouldn’t do it like this.

@QueryHandler
fun findById(findByEntityId: FindByEntityId): Mono =
entityRepository.findById(findByEntityId.entityId)
.switchIfEmpty {
Mono.error( EntityNotFoundException(“Entity with id: ${findByEntityId.entityId} has not been found”)) }
.map(::mapToEntityAggregate)

did you try it like this?
I think it should be converted to a regular exception by framework. Alternatively you can use Reactor Extension and implement a reactive interceptor, with just “doOnError” operator to handle any errors reactivly

Thanks Stefan for quick response, much appreciated!

I tried it, unfortunatelly it did not work.

I debugged it step by step:

  1. Rest endpoint sends query to reactive query gateway
 suspend fun read(serverRequest: ServerRequest): ServerResponse {
        return reactiveQueryGateway.query(
            FindByEntityId(UUID.fromString(serverRequest.pathVariable(FindByEntityId::entityId.name))),
            DomainEntityAggregate::class.java
        ).mapNotNull { EntityResponse(it.id, it.name) }
            .flatMap { ServerResponse.ok().body(it, UUID::class.java) }
            .awaitSingle()
  1. QueryHandler returns Mono with error inside
@QueryHandler
fun findById(findByEntityId: FindByEntityId): Mono<DomainEntityAggregate> =
        entityRepository.findById(findByEntityId.entityId)
            .switchIfEmpty {
                Mono.error(EntityNotFoundException("Entity with id: ${findByEntityId.entityId} has not been found"))
            }
            .map(::mapToEntityAggregate)
  1. Then GenericResultMessage is generated inside the method: org.axonframework.queryhandling.SimpleQueryBus#doQuery with exception but with EntityNotFoundException, so far so good.

  2. Next org.axonframework.messaging.unitofwork.DefaultUnitOfWork#executeWithResult the exception is passed to GenericResultMessage as EntityNotFoundException (without wrapping with HandlerExecutionException)

  3. Then when exceptionDetails is called we have such implementation:


  public static <R> Optional<R> resolveDetails(Throwable throwable) {
        if (throwable instanceof HandlerExecutionException) {
            return ((HandlerExecutionException) throwable).getDetails();
        } else if (throwable != null && throwable.getCause() != null) {
            return resolveDetails(throwable.getCause());
        }
        return Optional.empty();
    }
  1. EntityNotFoundException does not extend the HandlerExecutionException so method will return Optional.empty() instead of Optional.ofNullable(throwable). Thats why I have the exception message but no more information about the root cause class.

  2. Reactive interceptors are invoked too late after QueryBus responded with Mono containing error. I debugged the code and there is no “hook” which would allow me to react on error/exception carried by Mono.

  3. IMO the only way to get the details available in rest controller is to make my business extend HandlerExecutionException, otherwise the are “swallawed” by Mono and I could not find any way to wrap them globally by HandlerExcecutionException at all.

Some thoughs:

  1. I think that wrapping DefaultUnitOfWork is not best idea
  2. The whole problem is that under the hood DefaultUnitOfWork operates on MonoOnAssembly (Mono) and there is no way of injectin doOnError instruction. @ErrorHandlers are not invoked since MonoOnAssemby containing error does not invoke them. Block method is invoked when converting the results in org.axonframework.messaging.responsetypes.InstanceResponseType#convert
  3. I can resign from QueryHandler annotations and register them manually and then attaching common error handler.

@stefand - I finally did it :slight_smile:

This is query bus message handler interceptor:


class ExceptionInterceptor<T : Message<*>>(private val mapper: Function<in Throwable, out Throwable>) :
    MessageHandlerInterceptor<T> {

    @Throws(Exception::class)
    override fun handle(
        unitOfWork: UnitOfWork<out T>,
        interceptorChain: InterceptorChain
    ): Any {
        var returnValue = interceptorChain.proceed()

        if (returnValue is Mono<*>) {
            returnValue = returnValue.onErrorMap { mapper.apply(it) }
            returnValue.subscribe()
        }
        return returnValue
    }

}

This is how I register it:

commandBus.registerHandlerInterceptor(ExceptionInterceptor<Message<*>> { e ->
   CommandExecutionException(e.message, e, e)})

In summary:

My ExceptionInterceptor takes mapper function to register it in onErrorMap map function. The game changer was to call subscribe() after reassigning returnValue.

In case someone like even more kotlin notation (or even oneliner) here it is:

    @Throws(Exception::class)
    override fun handle(
        unitOfWork: UnitOfWork<out T>,
        interceptorChain: InterceptorChain
    ): Any = interceptorChain.proceed().let {
        return if (it is Mono<*>) it.onErrorMap(mapper::apply).also(Mono<*>::subscribe) else it
    }

Works like a charm! :sunglasses:

An update, istead of subscribing just return Mono:

@Throws(Exception::class)
    override fun handle(
        unitOfWork: UnitOfWork<out T>,
        interceptorChain: InterceptorChain
    ): Any =
        interceptorChain.proceed().let {
            return if (it is Mono<*>) {
                it.onErrorMap(mapper::apply)
            } else if (it is Flux<*>) {
                it.onErrorMap(mapper::apply)
            } else it
        }