Hello Ivan,
I have a swagger based UI.
RestController
@ApiOperation(value = "Watch the status of an order for a customer")
@GetMapping(value = "/{customerName}/watch", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<OrderHistory> watch(@PathVariable("customerName") String customerName) {
logger.info("Request received for customer "+ customerName);
SubscriptionQueryResult<List<OrderHistory>, OrderHistory> response = queryGateway.subscriptionQuery("orderHistory", customerName,
ResponseTypes.multipleInstancesOf(OrderHistory.class),
ResponseTypes.instanceOf(OrderHistory.class),
SubscriptionQueryBackpressure.defaultBackpressure()
);
return response.initialResult().flatMapMany(Flux::fromIterable).concatWith(response.updates());
}
@Component
public class OrderHistoryProjection {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OrderHistoryRepository orderHistoryRepository;
private final QueryUpdateEmitter updateEmitter;
public OrderHistoryProjection(OrderHistoryRepository orderHistoryRepository, QueryUpdateEmitter updateEmitter) {
this.orderHistoryRepository = orderHistoryRepository;
this.updateEmitter = updateEmitter;
}
@EventHandler
public void handle(OrderCreatedEvent event) {
logger.info("OrderHistoryProjection OrderCreatedEvent ");
OrderHistory newEntry =new OrderHistory(event.getOrderId().toString(), Instant.now(),
"Order for (" + event.getDeviceName() + ") placed by " + "BBM", "BBM");
orderHistoryRepository.save(newEntry);
updateEmitter.emit(m -> "orderHistory".equals(m.getQueryName()),
// && newEntry.getCustomerName().equals(m.getPayload()),
newEntry);
}
@QueryHandler(queryName = "orderHistory")
public List<OrderHistory> findMovements(String customerName) {
List<OrderHistory> orderHistory = new ArrayList<OrderHistory>();
orderHistoryRepository.findAll().iterator().forEachRemaining(orderHistory::add);
return orderHistory;
}
A very simple use case…
