Hi everyone,
I was curious if it’s possible to observe streams of aggregate state and compare successive versions. It’s a pretty broad question, so let me make an example first.
Suppose we had some root aggregate User
which contains a highScore
. This User
contains n sub-aggregates in the form pastGames: List<PastGames>
where a PastGame
contains a score
.
The requirement would be that the highScore
of a User
is always the maximum of score
within pastGames
, therefore, whenever an entry in pastGames
is added, removed or mutated, the highscore might need to be recalculated.
Depending on the number of commands that lead to events which influence any of these cases (add/remove/update) it can be very easy to forget to recalculate the highscore in the appropriate event handler. This is where my original question enters.
Is there a facility in axon that lets me consume a Stream of Aggregate States (or something equivalent) so i can program a watchdog to alert if an inconsistent state is reached, that is to say alert if after the handling of all events generated by a single command, the highScore
is incorrect? Or is such a thing just not possible?
I sketched out what it would look like ideally in terms of involved data structure, though this might be wrong as my understanding of the semantics of Unit of Work is not yet complete enough. So here in (pseudo-ish) Kotlin:
data class AStateUpdate(aggregateState: Aggregate, event: EventMessage<Event>)
data class UpdatesInUnitOfWork(initialState: Aggregate, NonEmptyList<AStateUpdate>)
type TheStreamYoudObserve = Stream<UpdatesInUnitOfWork>
My intuition tells me that it should be possible to roll this by hand using an event handler interceptor that is attached to the processor dealing with eventsourcing handlers. If I can manage to access the aggregate of the currently running unit of work before and after handling that is, this access has to be pretty fast so this interceptor doesn’t impact performance of the system too badly (is that what Scope
is for?)
I could then leverage subscription queries for which this interceptor would emit updates in a raw format of (startState: Aggregate, appliedEvent:Event, endState:Aggregate)
. An intermediate processor at callsite would then take care of the transformation to the datastructures described above.
However I was wondering if Axon already has such a facility built in?