Hello,
Currently I am working on a story where I have to update several subscriptions, but each only with a subset of the updated data based on the criteria in the query itself.
Our domain handles about allocating working hours to days.
Below a completely fictional example to demonstrate the issue.
In the real application the update of several days together makes more sense and the “allocations” themselves are handled together by a specific aggregate.
@Value(staticConstructor = "of")
class Allocation {
LocalDate day;
int amount;
}
@Value(staticConstructor = "of")
class DateRange {
LocalDate start;
LocalDate end;
boolean contains(LocalDate date) {
return !date.isBefore(start) && !date.isAfter(end);
}
boolean containsAny(Collection<LocalDate> days) {
return days.stream().anyMatch(this::contains);
}
List<LocalDate> toDays() {
return start.datesUntil(end.plusDays(1)).collect(toUnmodifiableList());
}
}
@Value(staticConstructor = "of")
class FindAllocationsByDateRangeQuery {
DateRange range;
}
@Value(staticConstructor = "of")
class AllocationsIncreasedEvent {
DateRange range;
int amount;
}
@Service
@RequiredArgsConstructor
class AllocationService {
private final ReactorQueryGateway reactiveQueryGateway;
public Flux<List<Allocation>> findAllocations(FindAllocationsByDateRangeQuery query) {
return reactiveQueryGateway.subscriptionQuery(query, ResponseTypes.multipleInstancesOf(Allocation.class));
}
}
@Repository
@RequiredArgsConstructor
class AllocationProjection {
private static final Map<LocalDate, Integer> ALLOCATION_MAP = new ConcurrentHashMap<>();
private final QueryUpdateEmitter queryUpdateEmitter;
@QueryHandler
public List<Allocation> findAllocations(FindAllocationsByDateRangeQuery query) {
return ALLOCATION_MAP.entrySet()
.stream()
.filter(allocation -> query.getRange().contains(allocation.getKey()))
.map(allocation -> Allocation.of(allocation.getKey(), allocation.getValue()))
.collect(toUnmodifiableList());
}
@EventHandler
public void on(AllocationsIncreasedEvent event) {
final List<LocalDate> days = event.getRange().toDays();
days.forEach(day -> ALLOCATION_MAP.merge(day, event.getAmount(), Integer::sum));
queryUpdateEmitter.emit(
FindAllocationsByDateRangeQuery.class,
query -> query.getRange().containsAny(days), // see 1
// the thing I think I need, but doesn't exist
query -> days.stream()
.filter(query.getRange()::contains)
.map(day -> Allocation.of(day, ALLOCATION_MAP.get(day)))
.collect(toUnmodifiableList()) // see 2
);
}
}
[1] The QueryUpdateEmitter currently already provides a method where one can filter if the subscription is impacted by the altered data. This to limit the number of subscriptions to which the updated information will be sent.
[2] The part I wrote here doesn’t exist (yet ), where I would like to limit the amount of items I want to sent in the update, but limited to the criteria in the query.
Example:
- Bob searches for allocations between 2022-01-01 and 2022-01-14.
- Alice increase all allocations by one between 2022-01-08 and 9999-12-31 (to make it really clear why I don’t want to sent everything).
- Bob should receive an update for 2022-01-08 until 2022-01-04 with the new values.
My Questions:
- Did I miss something and is the above already somewhere available?
- Is this a valid use case or should the above be handled in a completely different way?
For example, a special message could be sent to the client requesting the client to close the old subscription and start a new subscription with the old criteria. - In case this is a valid use case, is this something that could be added to the SimpleQueryUpdateEmitter implementation or should we write a custom implementation ourselves?
I was thinking towards an overloaded version of the doEmit with a Function instead of a SubscriptionQueryUpdateMessage.
Thank you for your help!
Best regards,
Bart