QueryUpdateEmitter.emit(..) - Is there a way to limit the content of the update itself based on the original query?

Hello,

Currently I am working on a story where I have to update several subscriptions, but each only with a subset of the updated data based on the criteria in the query itself.

Our domain is about allocating working hours to days.
Below is a completely fictional example to demonstrate the issue.
In the real application the update of several days together makes more sense and the “allocations” themselves are handled together by a specific aggregate.

@Value(staticConstructor = "of")
class Allocation {
    LocalDate day;
    int amount;
}

@Value(staticConstructor = "of")
class DateRange {
    LocalDate start;
    LocalDate end;

    boolean contains(LocalDate date) {
        return !date.isBefore(start) && !date.isAfter(end);
    }

    boolean containsAny(Collection<LocalDate> days) {
        return days.stream().anyMatch(this::contains);
    }

    List<LocalDate> toDays() {
        return start.datesUntil(end.plusDays(1)).collect(toUnmodifiableList());
    }
}

@Value(staticConstructor = "of")
class FindAllocationsByDateRangeQuery {
    DateRange range;
}

@Value(staticConstructor = "of")
class AllocationsIncreasedEvent {
    DateRange range;
    int amount;
}

@Service
@RequiredArgsConstructor
class AllocationService {
    private final ReactorQueryGateway reactiveQueryGateway;

    public Flux<List<Allocation>> findAllocations(FindAllocationsByDateRangeQuery query) {
        return reactiveQueryGateway.subscriptionQuery(query, ResponseTypes.multipleInstancesOf(Allocation.class));
    }
}

@Repository
@RequiredArgsConstructor
class AllocationProjection {
    private static final Map<LocalDate, Integer> ALLOCATION_MAP = new ConcurrentHashMap<>();
    private final QueryUpdateEmitter queryUpdateEmitter;

    @QueryHandler
    public List<Allocation> findAllocations(FindAllocationsByDateRangeQuery query) {
        return ALLOCATION_MAP.entrySet()
                .stream()
                .filter(allocation -> query.getRange().contains(allocation.getKey()))
                .map(allocation -> Allocation.of(allocation.getKey(), allocation.getValue()))
                .collect(toUnmodifiableList());
    }

    @EventHandler
    public void on(AllocationsIncreasedEvent event) {
        final List<LocalDate> days = event.getRange().toDays();

        days.forEach(day -> ALLOCATION_MAP.merge(day, event.getAmount(), Integer::sum));

        queryUpdateEmitter.emit(
                FindAllocationsByDateRangeQuery.class,
                query -> query.getRange().containsAny(days), // see 1
                // the thing I think I need, but doesn't exist
                query -> days.stream()
                        .filter(query.getRange()::contains)
                        .map(day -> Allocation.of(day, ALLOCATION_MAP.get(day)))
                        .collect(toUnmodifiableList()) // see 2
        );
    }
}

[1] The QueryUpdateEmitter already provides a method that lets one filter on whether a subscription is impacted by the altered data. This limits the number of subscriptions to which the update is sent.
[2] The part I wrote here doesn’t exist (yet :crossed_fingers:). This is where I would like to limit the items sent in the update to those matching the criteria in the query.

Example:

  1. Bob searches for allocations between 2022-01-01 and 2022-01-14.
  2. Alice increases all allocations by one between 2022-01-08 and 9999-12-31 (to make it really clear why I don’t want to send everything).
  3. Bob should receive an update for 2022-01-08 until 2022-01-14 with the new values.
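
To make the example concrete, here is a minimal sketch of the overlap between Bob's query range and Alice's update range. `Range` and `daysWithin` are hypothetical stand-ins for the Lombok-based `DateRange` above, not part of the original code. Intersecting the two ranges first also avoids enumerating every day up to 9999-12-31.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.stream.Collectors;

final class RangeExample {
    public static void main(String[] args) {
        Range bob = new Range(LocalDate.of(2022, 1, 1), LocalDate.of(2022, 1, 14));
        Range alice = new Range(LocalDate.of(2022, 1, 8), LocalDate.of(9999, 12, 31));

        // Only the days Bob asked for that Alice also changed.
        List<LocalDate> forBob = bob.daysWithin(alice);
        System.out.println(forBob.size() + " days, "
                + forBob.get(0) + " to " + forBob.get(forBob.size() - 1));
        // prints "7 days, 2022-01-08 to 2022-01-14"
    }
}

// Hypothetical plain-Java stand-in for DateRange; both bounds are inclusive.
final class Range {
    final LocalDate start;
    final LocalDate end;

    Range(LocalDate start, LocalDate end) {
        this.start = start;
        this.end = end;
    }

    // Days of this range that also fall inside the other range.
    List<LocalDate> daysWithin(Range other) {
        LocalDate from = start.isAfter(other.start) ? start : other.start;
        LocalDate to = end.isBefore(other.end) ? end : other.end;
        if (from.isAfter(to)) {
            return List.of(); // the ranges do not overlap
        }
        return from.datesUntil(to.plusDays(1)).collect(Collectors.toList());
    }
}
```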

My Questions:

  • Did I miss something and is the above already somewhere available?
  • Is this a valid use case or should the above be handled in a completely different way?
    For example, a special message could be sent to the client requesting the client to close the old subscription and start a new subscription with the old criteria.
  • In case this is a valid use case, is this something that could be added to the SimpleQueryUpdateEmitter implementation or should we write a custom implementation ourselves?
    I was thinking towards an overloaded version of the doEmit with a Function instead of a SubscriptionQueryUpdateMessage.

Thank you for your help!

Best regards,

Bart

You’re entirely correct in noting that nothing like this is currently supported within Axon.
The two ways I can think of supporting this are (1) overloading the emitter (as you already suggest) or (2) adding if-else-branches in the event handler that perform these checks for you (this might get pretty ugly).
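
To illustrate option (1) in isolation, here is a rough sketch of an emitter that derives the update from each subscription's own query. Everything in it is hypothetical: `ToyUpdateEmitter`, `emitPerQuery` and `DayWindow` are made-up names and this is not Axon's `QueryUpdateEmitter` API, only a toy model of the idea.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class ToyUpdateEmitterExample {
    public static void main(String[] args) {
        ToyUpdateEmitter<DayWindow, List<Integer>> emitter = new ToyUpdateEmitter<>();
        emitter.subscribe(new DayWindow(1, 14), update -> System.out.println("Bob receives " + update));
        emitter.subscribe(new DayWindow(40, 50), update -> System.out.println("Carol receives " + update));

        List<Integer> changedDays = List.of(8, 9, 10, 20, 30);
        emitter.emitPerQuery(
                window -> changedDays.stream().anyMatch(window::contains),
                window -> changedDays.stream().filter(window::contains).collect(Collectors.toList()));
        // prints "Bob receives [8, 9, 10]"; Carol's window matches nothing
    }
}

// Toy model only: NOT Axon's QueryUpdateEmitter, just the idea in isolation.
final class ToyUpdateEmitter<Q, U> {

    // One active subscription: the query it was opened with plus a sink for its updates.
    private static final class Subscription<Q, U> {
        final Q query;
        final Consumer<U> sink;

        Subscription(Q query, Consumer<U> sink) {
            this.query = query;
            this.sink = sink;
        }
    }

    private final List<Subscription<Q, U>> subscriptions = new CopyOnWriteArrayList<>();

    void subscribe(Q query, Consumer<U> sink) {
        subscriptions.add(new Subscription<>(query, sink));
    }

    // Hypothetical variant: the update is computed per matching subscription from its query.
    void emitPerQuery(Predicate<Q> filter, Function<Q, U> updateForQuery) {
        for (Subscription<Q, U> subscription : subscriptions) {
            if (filter.test(subscription.query)) {
                subscription.sink.accept(updateForQuery.apply(subscription.query));
            }
        }
    }
}

// Simplified query: an inclusive window of day numbers instead of a DateRange.
final class DayWindow {
    final int from;
    final int to;

    DayWindow(int from, int to) {
        this.from = from;
        this.to = to;
    }

    boolean contains(int day) {
        return from <= day && day <= to;
    }
}
```

I gave the method its own name here instead of overloading, since an overload taking a `Function` could become ambiguous when the update type is itself a function.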

As for whether this is a valid use case: I believe so.
I want to give it some thought before I give a full-blown yes, though.
Maybe there are some caveats I haven’t thought of yet.

Regardless of my thoughts on the matter, you should always feel free to open up an issue with Axon Framework for enhancements like this. If you feel like it, a pull request would be even better.
Either way, both would provide a handle for the development team at AxonIQ to discuss the topic.

I hope this clarifies the options you have, Bart!

Thank you @Steven_van_Beelen for your response!

After asking the question, I found a clip on the AxonIQ YouTube channel (link) where something similar to the “special message” option from my second bullet point is done.

Projection

Provider

Here, only the initial response is data related and afterwards update events will be sent through a second channel triggering the start of a new subscription using the same search criteria.

Resulting Questions:

  • Would this be the preferred solution?
  • In the example, the client was built with Vaadin running in the same application.
    In our case the client side will be an Angular application.
    I don’t think I can use the concept of the SubscriptionQueryResult in this case, unless I could make two separate calls, return the initialResult in the first and somehow return the updates channel from the first call in the second call. → Not something I want to attempt.
    How can QueryEmitter emit(…) methods resulting in Mono<SubscriptionQueryResult> be used when the subscriber is located in a separate application, or is this just not possible?
  • So assuming I can’t use the SubscriptionQueryResult, would it be a valid strategy to return Flux<?> and use this channel for both data and events? (Perhaps it could be improved by some common interface for both the event and the data object.)
public Flux<?> findAllocations(FindAllocationsByDateRangeQuery query) {
    return reactiveQueryGateway.subscriptionQuery(query, ResponseTypes.multipleInstancesOf(Object.class));
}
// ----
queryUpdateEmitter.emit(
        FindAllocationsByDateRangeQuery.class,
        query -> query.getRange().containsAny(days),
        new ReloadEvent()
);
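
A rough sketch of the "common interface" idea, in plain Java without any Axon types. `AllocationMessage` and `AllocationData` are hypothetical names, `ReloadEvent` is reduced to an empty marker, and the allocations are simplified to strings. It only shows what a consumer of such a mixed channel would have to distinguish.

```java
import java.util.List;

final class SingleChannelExample {
    // What a consumer of the mixed channel has to decide for every element.
    static String describe(AllocationMessage message) {
        if (message instanceof AllocationData) {
            AllocationData data = (AllocationData) message;
            return "render " + data.allocations.size() + " allocations";
        }
        if (message instanceof ReloadEvent) {
            return "re-run the query with the same criteria";
        }
        throw new IllegalArgumentException("Unknown message type: " + message.getClass());
    }

    public static void main(String[] args) {
        List<AllocationMessage> channel = List.of(
                new AllocationData(List.of("2022-01-08=1", "2022-01-09=1")), // initial result
                new ReloadEvent());                                          // later update
        channel.forEach(message -> System.out.println(describe(message)));
    }
}

// Hypothetical common type for everything sent over the single channel.
interface AllocationMessage {
}

// Carries data; allocations are simplified to strings for this sketch.
final class AllocationData implements AllocationMessage {
    final List<String> allocations;

    AllocationData(List<String> allocations) {
        this.allocations = allocations;
    }
}

// Carries no data; it only tells the client to query again.
final class ReloadEvent implements AllocationMessage {
}
```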

Again thank you for your help!

Best regards,

Bart

Ah, that’s also a path you could traverse if you like.
Nonetheless, I felt you were on to something with adjustments to the QueryUpdateEmitter, as it will potentially minimize update size for every subscription that’s made.
Differently put, it provides a more flexible solution likely usable in a multitude of scenarios.

It depends on where you want to have “the smarts” of deciding who gets what.
I think there’s merit in both solutions, depending on whether you want the Event Handler to understand these deviations or the QueryGateway component to have these.

The QueryUpdateEmitter is not in charge of the initial response.
The initial response comes from the @QueryHandler annotated method invoked based on the given query message.

There are ways to combine the initial result’s Mono and the update’s Flux into a single Flux.
You can find a sample of that here, by the way.
Checking out the rest of that subscription query sample in our Code Samples repository might give you some other ideas too, actually.

Although technically you can, I am not sure I’d go for that solution.
It seems like it’s mixing concerns too much, honestly.
However, if it greatly simplifies the process on the Angular side…who am I to judge whether that’s wrong?
As long as the intent is clear and well documented, thus a conscious decision of the team, I’d lean towards an “okay.”

I’m not sure if you want to open a subscription for each user. Maybe with Vaadin that is the easiest or a good way to do things? I bring up the question because in a demo project I moved the filtering of certain updates to the application code, where I use one Axon subscription to get all the updates, which is then shared among multiple connected clients.

I’m not sure such a solution should or could always be used instead. It’s just a matter of where you want to handle this complexity, and it might be nice if the framework supported this. This is the class where one Axon subscription is shared with multiple clients, each with their own filter set up.
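
The shape of that idea, as a minimal sketch in plain Java: a single upstream feed fans out to many clients, each with its own filter. `SharedUpdateFeed`, `connect` and `onUpdate` are hypothetical names for illustration and are not taken from the demo project or from Axon.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class SharedFeedExample {
    public static void main(String[] args) {
        SharedUpdateFeed<Integer> feed = new SharedUpdateFeed<>();
        Runnable disconnectBob = feed.connect(day -> day <= 14, day -> System.out.println("Bob sees day " + day));
        feed.connect(day -> day > 14, day -> System.out.println("Carol sees day " + day));

        feed.onUpdate(8);    // only Bob's filter matches
        feed.onUpdate(20);   // only Carol's filter matches
        disconnectBob.run();
        feed.onUpdate(9);    // Bob is gone and Carol's filter does not match
    }
}

// Hypothetical fan-out: one upstream subscription, many clients with their own filters.
final class SharedUpdateFeed<U> {

    private static final class Client<U> {
        final Predicate<U> filter;
        final Consumer<U> sink;

        Client(Predicate<U> filter, Consumer<U> sink) {
            this.filter = filter;
            this.sink = sink;
        }
    }

    private final List<Client<U>> clients = new CopyOnWriteArrayList<>();

    // Registers a client and returns a handle that disconnects it again.
    Runnable connect(Predicate<U> filter, Consumer<U> sink) {
        Client<U> client = new Client<>(filter, sink);
        clients.add(client);
        return () -> clients.remove(client);
    }

    // Called once per update arriving from the single upstream subscription.
    void onUpdate(U update) {
        for (Client<U> client : clients) {
            if (client.filter.test(update)) {
                client.sink.accept(update);
            }
        }
    }
}
```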