subscription query - webflux

I am trying out the new subscription query feature.

I have a projection that stores account data in an in memory hashset and a query handler that uses a key to get the value from that map.

In my Webflux restController I do

@GetMapping(value = "/account/{accountId}",  produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<GetBalanceResult> streamResult(@PathVariable("accountId") String accountId) {
  return queryGateway.subscriptionQuery(new GetBalance(accountId), GetBalanceResult.class, GetBalanceResult.class).updates();
}


But when I subscribe to the flux via "http -S  :8080/api/account/1" I do not get a result.

What am I missing?

Hi Jan,

I ran a small test, and a Webflux rest controller like you have works well in my case.

My guess would therefore be that the problem is in the emitting side. Are you using the QueryUpdateEmitter? If not - that’s the thing missing. If so, could you share that code as well?

Kind regards,

Thanks, Frans, I forgot the update emitter. Began to wonder already where the magic should come from.
I guess it was just to late.
Very impressive feature!

I still have the same issue and i have used the queryupdate emitter, nothing is seen in the UI.

Do you have the same setup as Jan described?
It is important that you produce MediaType.APPLICATION_STREAM_JSON_VALUE on the (rest) controller level. In this case, you have a stream to the UI. You can also use Server-Sent Events to stream to the UI. I personally prefer to stream the data back to the UI in async and preferably in a reactive way with WebSockets or RSockets.

Hello Ivan,
I have a swagger based UI.

RestController

@ApiOperation(value = "Watch the status of an order for a customer")
@GetMapping(value = "/{customerName}/watch", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<OrderHistory> watch(@PathVariable("customerName") String customerName) {
   logger.info("Request received for customer "+ customerName);
   SubscriptionQueryResult<List<OrderHistory>, OrderHistory> response = queryGateway.subscriptionQuery("orderHistory", customerName,
         ResponseTypes.multipleInstancesOf(OrderHistory.class),
         ResponseTypes.instanceOf(OrderHistory.class),
         SubscriptionQueryBackpressure.defaultBackpressure()
   );

   return response.initialResult().flatMapMany(Flux::fromIterable).concatWith(response.updates());

}
@Component
public class OrderHistoryProjection {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final OrderHistoryRepository orderHistoryRepository;

    private final QueryUpdateEmitter updateEmitter;

    public OrderHistoryProjection(OrderHistoryRepository orderHistoryRepository, QueryUpdateEmitter updateEmitter) {
        this.orderHistoryRepository = orderHistoryRepository;
        this.updateEmitter = updateEmitter;
    }

    @EventHandler
    public void handle(OrderCreatedEvent event) {
        logger.info("OrderHistoryProjection OrderCreatedEvent ");
        OrderHistory newEntry =new OrderHistory(event.getOrderId().toString(), Instant.now(),
                "Order for  (" + event.getDeviceName() + ") placed by " + "BBM", "BBM");
                orderHistoryRepository.save(newEntry);
        updateEmitter.emit(m -> "orderHistory".equals(m.getQueryName()),
                // && newEntry.getCustomerName().equals(m.getPayload()),
                newEntry);

    }

@QueryHandler(queryName = "orderHistory")
public List<OrderHistory> findMovements(String customerName) {
    List<OrderHistory> orderHistory = new ArrayList<OrderHistory>();
            orderHistoryRepository.findAll().iterator().forEachRemaining(orderHistory::add);
    return orderHistory;
}

A very simple use case…