We actually have a plan to support this for the next release of Reactive extension. Right now its not officially supported.
Until then,
What you could do is convert Mono to CompletableFuture and return CompletableFuture (by calling toFuture()). That should work in the case of Mono.
In the case of Flux, you could make it work by combining it with subsribptionQueries. Subscribe to flux from repository on initial result query, and emit elements inside of the flux using queryUpdateEmitter. On the receiving side, you should use reactorGateway.subcriptionQueryMany(…) or updates().
actually I guess this is not really such a big deal as it seems … (I am not saying, that the Reactive extension makes no sense)
In most cases or you know that it is a long query …in that case subscribtionQuery is the right way to go. Or you expect the query is relatively quick and then you can live with the .block() call. No?
In my case returning Mono in QueryHandler works ok with one pitful. I cannot catch/intercept exceptions since .proceed() method does not throw any exception, it is “hidden” in Mono itself.
Should I really implement it this way?
@QueryHandler
fun findById(findByEntityId: FindByEntityId): Mono<DomainEntityAggregate> =
entityRepository.findById(findByEntityId.entityId)
.switchIfEmpty {
throw EntityNotFoundException("Entity with id: ${findByEntityId.entityId} has not been found") }
.doOnError { throw QueryExecutionException("Entity not found!", it, it) }
.map(::mapToEntityAggregate)
Is there any advantage of returning Mono instead of raw object?
Is there any way to catch all exceptions thrown in Mono excecution?
I found some workaround by implementing MessageHandlerInterceptor but I’m not proud of it since it is blocking execution:
if (returnValue is Mono<*>) {
returnValue.doOnError {
throw QueryExecutionException(it.message, it, it)
}.block()
}
@QueryHandler
fun findById(findByEntityId: FindByEntityId): Mono =
entityRepository.findById(findByEntityId.entityId)
.switchIfEmpty {
Mono.error( EntityNotFoundException(“Entity with id: ${findByEntityId.entityId} has not been found”)) }
.map(::mapToEntityAggregate)
did you try it like this?
I think it should be converted to a regular exception by framework. Alternatively you can use Reactor Extension and implement a reactive interceptor, with just “doOnError” operator to handle any errors reactivly
@QueryHandler
fun findById(findByEntityId: FindByEntityId): Mono<DomainEntityAggregate> =
entityRepository.findById(findByEntityId.entityId)
.switchIfEmpty {
Mono.error(EntityNotFoundException("Entity with id: ${findByEntityId.entityId} has not been found"))
}
.map(::mapToEntityAggregate)
Then GenericResultMessage is generated inside the method: org.axonframework.queryhandling.SimpleQueryBus#doQuery with exception but with EntityNotFoundException, so far so good.
Next org.axonframework.messaging.unitofwork.DefaultUnitOfWork#executeWithResult the exception is passed to GenericResultMessage as EntityNotFoundException (without wrapping with HandlerExecutionException)
Then when exceptionDetails is called we have such implementation:
EntityNotFoundException does not extend the HandlerExecutionException so method will return Optional.empty() instead of Optional.ofNullable(throwable). Thats why I have the exception message but no more information about the root cause class.
Reactive interceptors are invoked too late after QueryBus responded with Mono containing error. I debugged the code and there is no “hook” which would allow me to react on error/exception carried by Mono.
IMO the only way to get the details available in rest controller is to make my business extend HandlerExecutionException, otherwise the are “swallawed” by Mono and I could not find any way to wrap them globally by HandlerExcecutionException at all.
Some thoughs:
I think that wrapping DefaultUnitOfWork is not best idea
The whole problem is that under the hood DefaultUnitOfWork operates on MonoOnAssembly (Mono) and there is no way of injectin doOnError instruction. @ErrorHandlers are not invoked since MonoOnAssemby containing error does not invoke them. Block method is invoked when converting the results in org.axonframework.messaging.responsetypes.InstanceResponseType#convert
I can resign from QueryHandler annotations and register them manually and then attaching common error handler.
class ExceptionInterceptor<T : Message<*>>(private val mapper: Function<in Throwable, out Throwable>) :
MessageHandlerInterceptor<T> {
@Throws(Exception::class)
override fun handle(
unitOfWork: UnitOfWork<out T>,
interceptorChain: InterceptorChain
): Any {
var returnValue = interceptorChain.proceed()
if (returnValue is Mono<*>) {
returnValue = returnValue.onErrorMap { mapper.apply(it) }
returnValue.subscribe()
}
return returnValue
}
}
This is how I register it:
commandBus.registerHandlerInterceptor(ExceptionInterceptor<Message<*>> { e ->
CommandExecutionException(e.message, e, e)})
In summary:
My ExceptionInterceptor takes mapper function to register it in onErrorMap map function. The game changer was to call subscribe() after reassigning returnValue.
In case someone like even more kotlin notation (or even oneliner) here it is:
@Throws(Exception::class)
override fun handle(
unitOfWork: UnitOfWork<out T>,
interceptorChain: InterceptorChain
): Any = interceptorChain.proceed().let {
return if (it is Mono<*>) it.onErrorMap(mapper::apply).also(Mono<*>::subscribe) else it
}
An update, istead of subscribing just return Mono:
@Throws(Exception::class)
override fun handle(
unitOfWork: UnitOfWork<out T>,
interceptorChain: InterceptorChain
): Any =
interceptorChain.proceed().let {
return if (it is Mono<*>) {
it.onErrorMap(mapper::apply)
} else if (it is Flux<*>) {
it.onErrorMap(mapper::apply)
} else it
}