Subscription query never receives updates, always times out

Hi,

We’re trying to use a subscription query in our RestController to “fake” a sync call.
In our projector (which writes to a JPA repository) we emit the query update. Our logging and some debugging also suggest that the update is sent as expected.
However, our subscriptionQuery never seems to receive the update.

Most of our logic is based on this example: https://github.com/fransvanbuul/axon-sync-rest-frontend

However, in our REST controller we first execute a query to get the current object (we need it in the command). So we try to do the following:

    final GetPEConceptById getPEConceptById = new GetPEConceptById(ExternalId.fromString(id));
    SubscriptionQueryResult<PEConcept, PEConcept> queryResult = queryGateway.subscriptionQuery(
            getPEConceptById,
            ResponseTypes.instanceOf(PEConcept.class),
            ResponseTypes.instanceOf(PEConcept.class));
    /* Sending the command to update the concept. */
    try{
        log.info("getting current concept and sending command");
        queryGateway.query(getPEConceptById, ResponseTypes.instanceOf(PEConcept.class)).thenAccept(
                c -> sendReplaceCommand(peConceptResponse, c));
        
        return queryResult.updates().blockFirst(Duration.ofSeconds(10));
        

    } finally {
        log.info("Closing subscription query");
        queryResult.close();
    }

The query returns the proper concept and the command is sent.

We have 2 event handlers that process the event generated by the command. One updates the projection (and emits the update via the queryUpdateEmitter) and one sends some data to an external system.

I don’t really get why this should not work as expected.
Is there any way we can debug this issue further?

Any insight would be great!

Thanks,

Danny

There are two issues that need to be addressed in this case:

  1. We need to subscribe to updates before we send a command; that’s the only way to be sure we will not miss any updates. Sending the command first and subscribing for updates afterwards results in race conditions!

There is a simple trick: subscribe to the initial result first (even though we don’t need it). Let’s call it a virtual initial result. This opens the subscription query, which buffers all updates that arrive from this point on.
Since we now have a buffer for updates, we can send the command and, once it has been sent, subscribe to the updates flux. If an update arrives after the command was sent but before we are subscribed to updates, we will read it from the buffer automatically, so we are sure we will not miss any updates.

  2. We need to read our own writes. Multiple updates/events could be dispatched at the same time, and we can’t guarantee their order or which one arrives first. Without some kind of correlation, we can easily end up with someone else’s update. The safest way to go is to introduce a unique id for each command. We can attach this id to the event meta-data, and once the projection is materialized we can track which command is responsible for the update. Luckily, Axon Framework offers this functionality out of the box: every event carries a correlationId in its meta-data, which is the identifier of the command that caused the event. We will use this mechanism to force consistency.

     @RestController
     public class CommandController {
    
         private final CommandGateway commandGateway;
         private final QueryGateway queryGateway;
    
         public CommandController(CommandGateway commandGateway, QueryGateway queryGateway) {
             this.commandGateway = commandGateway;
             this.queryGateway = queryGateway;
         }
    
         @PostMapping("/entities/{id}")
         public Mono<String> myApi(@PathVariable("id") String entityId) {
             CommandMessage<Object> command = GenericCommandMessage.asCommandMessage(new CreateMyEntityCommand(entityId));
    
             GetMyEntityByCorrelationIdQuery query = new GetMyEntityByCorrelationIdQuery(command.getIdentifier());
             SubscriptionQueryResult<Void, MyEntity> response = queryGateway.subscriptionQuery(query,
                                                                                               Void.class,
                                                                                               MyEntity.class);
             return sendAndReturnUpdate(command, response)
                     .map(MyEntity::getId);
         }
    
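         public <U> Mono<U> sendAndReturnUpdate(Object command, SubscriptionQueryResult<?, U> result) {
             // 1. Subscribe to the (virtual) initial result to open the subscription query.
             return Mono.when(result.initialResult())
                     // 2. Only then send the command.
                     .then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
                     // 3. Read the updates, including any that were buffered in the meantime.
                     .thenMany(result.updates())
                     .timeout(Duration.ofSeconds(5))
                     // 4. Take the first update and always release the subscription.
                     .next()
                     .doFinally(unused -> result.cancel());
         }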
     }
    
     @Component
     class MyEntityProjection {
    
         private final QueryUpdateEmitter emitter;
    
         public MyEntityProjection(QueryUpdateEmitter emitter) {
             this.emitter = emitter;
         }
    
         // "Virtual" initial result: always empty, it only serves to open the subscription.
         @QueryHandler
         public Optional<Void> on(GetMyEntityByCorrelationIdQuery query) {
             return Optional.empty();
         }
    
         @EventHandler
         public void on(MyEntityCreatedEvent event, @MetaDataValue("correlationId") String correlationId) {
             MyEntity entity = new MyEntity(event.getEntityId());
             // save your entity in your repository...
             
             emitter.emit(GetMyEntityByCorrelationIdQuery.class,
                          query -> query.getCorrelationId().equals(correlationId),
                          entity);
         }
     }
    
     @Aggregate
     public class MyEntityAggregate {
    
         @AggregateIdentifier
         private String entityId;
    
         public MyEntityAggregate() {
         }
    
         @CommandHandler
         public MyEntityAggregate(CreateMyEntityCommand command) {
             // apply(...) is the statically imported AggregateLifecycle.apply
             apply(new MyEntityCreatedEvent(command.getEntityId()));
         }
    
         @EventSourcingHandler
         public void on(MyEntityCreatedEvent event) {
             entityId = event.getEntityId();
         }
     }
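
The two points above (subscribe before sending, and match updates by the id of your own command) can also be tried out without Axon or Reactor. The following is only a toy model using plain JDK classes; `CorrelatedUpdateSketch`, `subscribe` and `handleCommand` are made-up names for illustration and do not reflect how Axon implements subscription queries internally. It may still help to see why a late subscriber misses its update and why the correlation id keeps other people's updates away.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Toy stand-in for a subscription query plus update emitter. NOT Axon code.
final class CorrelatedUpdateSketch {

    // One open subscription: the correlation id asked for and the future to complete.
    private static final class Subscription {
        final String correlationId;
        final CompletableFuture<String> firstUpdate = new CompletableFuture<>();

        Subscription(String correlationId) {
            this.correlationId = correlationId;
        }
    }

    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

    // Step 1: register interest BEFORE the command is sent.
    public CompletableFuture<String> subscribe(String correlationId) {
        Subscription subscription = new Subscription(correlationId);
        subscriptions.add(subscription);
        return subscription.firstUpdate;
    }

    // Simulates command handling plus projection: the update is delivered only
    // to subscriptions whose correlation id equals the id of the command.
    public void handleCommand(String commandId, String payload) {
        for (Subscription subscription : subscriptions) {
            if (subscription.correlationId.equals(commandId)) {
                subscription.firstUpdate.complete("projected:" + payload);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CorrelatedUpdateSketch bus = new CorrelatedUpdateSketch();

        // Subscribe first, then send, and bound the wait with a timeout.
        CompletableFuture<String> mine = bus.subscribe("cmd-1").orTimeout(5, TimeUnit.SECONDS);

        bus.handleCommand("cmd-2", "someone-else"); // ignored, different correlation id
        bus.handleCommand("cmd-1", "my-entity");    // completes our future

        System.out.println(mine.get()); // prints "projected:my-entity"

        // Subscribing after the update was emitted is too late in this toy model.
        CompletableFuture<String> late = bus.subscribe("cmd-2");
        System.out.println(late.isDone()); // prints "false"
    }
}
```

Because the result is a `CompletableFuture`, the same shape fits controllers that return futures instead of a `Mono`.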
    

Hope this helps,
Stefan

I tried to make this work, but couldn’t. Also, our REST controllers return CompletableFutures.

What I still don’t get is how to check if the rest controller is really subscribing to the updates. I feel that is not really happening.

Danny

Hi Danny,

Please double check the return type of your query handlers and what you are emitting using the queryUpdateEmitter. A common problem is that the type of the emitted update does not match the update type declared in the subscription query (in your case the same type as the initial result); the update is then not delivered to the subscription.

And as Stefan said, if you are not subscribed to initialResult() you won’t get any updates(). Make sure you have something like a concatenation of those two publishers and that you are actually SUBSCRIBED to it, e.g. queryResult.initialResult().concatWith(queryResult.updates())....subscribe(result -> myCompletableFuture.complete(result))

Cheers,

Simon

Whatever I do, nothing seems to work. I always get the initial result, but never the updates.
How can I check or debug whether I am really subscribed to the updates?
I’ve already set breakpoints on queryUpdateEmitter.emit(), result.initialResult() and result.updates(), but I don’t know what I should be looking for.
Most of our application uses CompletableFutures, and all of it is running in a single JVM.

We do have some dispatch and message handlers on the commandgateway, querygateway and querybus. We also added a CorrelationDataProvider next to the standard MessageOriginProvider. Could these things cause issues?

Danny

Please post the code of your Query handler and your subscription here…

This is our query handler. The findPEConceptPort is a Spring service that uses a Spring Data/JPA repository to get our PEConcept (the object we’re trying to update and return with the subscription query).

    @QueryHandler
    public PEConcept handle(final GetPEConceptByIdentifier query) {
        return findPEConceptPort.findPEConcept(query.getAggregateId()).orElseThrow(() -> new IllegalStateException("Could not find PEConcept..."));
    }

Subscription query (part of private method in RestController)

    var query = new GetPEConceptByIdentifier(peConceptFromStorage.getPeConceptAggregateIdentifier());
    SubscriptionQueryResult<PEConcept, PEConcept> result = queryGateway
            .subscriptionQuery(query,
                    ResponseTypes.instanceOf(PEConcept.class),
                    ResponseTypes.instanceOf(PEConcept.class));

    // This was the first thing I tried
    // Sending command to process the updated PEConcept
    commandGateway.send(command);

    // Also tried to subscribe to the initial result first, and then I see the initial logs
    // result.initialResult().subscribe(i -> log.info("Initial {}", i.getContractId()));

    return result.updates().blockFirst(Duration.ofSeconds(10));

Query update emitter (part of an event handler)

    @EventHandler
    public void on(final PEConceptAccepted event) {
        log.info("Changes to concept with id {} accepted", event.getPeConceptAggregateIdentifier().getIdentifier());
        final PEConcept currentConcept = findPEConceptPort.findPEConcept(event.getPeConceptAggregateIdentifier())
                .orElseThrow(() -> new IllegalStateException("Could not find PEConcept..."));
        queryUpdateEmitter.emit(GetPEConceptByIdentifier.class,
                query -> event.getPeConceptAggregateIdentifier().equals(query.getAggregateId()),
                currentConcept);
    }

Thanks,

Danny

Hi Danny,

I think there are many small issues here that together keep the solution from working.

First of all, make sure you subscribe to all futures and register an error callback. If you miss this, the processing might not start, or any errors it produces get swallowed.

I believe your problem is that your projection does not return a result if the entry is not present in findPEConceptPort, but throws an exception instead. Since you are not subscribing to the initial result, and nobody handles the exception thrown while evaluating the query method, your execution finishes without any visible error. To fix this, either return null from the query handler and skip it in the subscription, or keep throwing the exception but resume in the subscription after checking that the exception is harmless. Please also keep in mind that updates().blockFirst(...) only waits for an element on the updates Flux; if nothing is ever emitted, it does nothing but wait for the timeout.
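
The point about missing error callbacks can be reproduced with plain JDK futures. The sketch below is an assumption-laden illustration, not a diagnosis of the code in this thread: `SwallowedErrorSketch` and `failingQuery` are hypothetical names standing in for a `queryGateway.query(...)` call whose handler throws. It shows that a `thenAccept` chain without an error callback fails silently, while adding `whenComplete` makes the failure visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SwallowedErrorSketch {

    // Hypothetical stand-in for a query whose handler throws,
    // e.g. because the projection has no entry yet.
    static CompletableFuture<String> failingQuery() {
        return CompletableFuture.failedFuture(
                new IllegalStateException("Could not find PEConcept..."));
    }

    // Fire-and-forget chain: the follow-up step never runs and nobody sees why.
    static List<String> withoutErrorCallback() {
        List<String> log = new ArrayList<>();
        failingQuery().thenAccept(concept -> log.add("sending command for " + concept));
        return log;
    }

    // Same chain with an error callback, so the failure is at least logged.
    static List<String> withErrorCallback() {
        List<String> log = new ArrayList<>();
        failingQuery()
                .thenAccept(concept -> log.add("sending command for " + concept))
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        log.add("query or command failed: " + error);
                    }
                });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(withoutErrorCallback()); // prints "[]"
        System.out.println(withErrorCallback());    // one entry describing the failure
    }
}
```

In the first variant the command would never be sent and the controller would simply run into its timeout, which looks the same from the outside as an update that never arrives.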

To simplify the discussion, I implemented a small prototype and pushed it here:

Please have a look at the example project, in particular the “create-and-wait” method in the REST interface.


Hi Simon,

Looking at your gateway extension and all your explanations did the trick :grinning:
Thanks for all your help.

Danny

Hey Danny,

Can you please share the changes you made to make it work?

I am currently facing the same issue.

Thanks,
Arun