Observing a Stream of Aggregates

Hi everyone,

I was curious whether it’s possible to observe a stream of aggregate states and compare successive versions. It’s a pretty broad question, so let me give an example first.

Suppose we have a root aggregate User which contains a highScore. This User contains n sub-aggregates in the form pastGames: List<PastGame>, where a PastGame contains a score.

The requirement is that the highScore of a User is always the maximum score within pastGames. Therefore, whenever an entry in pastGames is added, removed or mutated, the highScore might need to be recalculated.
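To make the requirement concrete, here is a minimal sketch of the invariant in plain Kotlin. The class shapes, the gameId field and the choice of 0 as the high score for a user without games are assumptions for illustration; nothing here is an Axon API.

```kotlin
// Hypothetical model of the example; names mirror the post, not any Axon API.
data class PastGame(val gameId: String, val score: Int)

data class User(val highScore: Int, val pastGames: List<PastGame>)

// The invariant: highScore equals the maximum score in pastGames.
// Assumption: a user without past games has a highScore of 0.
fun User.hasConsistentHighScore(): Boolean =
    highScore == (pastGames.maxOfOrNull { it.score } ?: 0)

fun main() {
    val ok = User(highScore = 90, pastGames = listOf(PastGame("g1", 70), PastGame("g2", 90)))
    // Simulates a handler that removed g2 but forgot to recalculate highScore.
    val stale = ok.copy(pastGames = ok.pastGames.filter { it.gameId != "g2" })
    println(ok.hasConsistentHighScore())    // prints true
    println(stale.hasConsistentHighScore()) // prints false
}
```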

Depending on the number of commands that lead to events affecting any of these cases (add/remove/update), it is very easy to forget to recalculate the highScore in the appropriate event handler. This is where my original question comes in.

Is there a facility in Axon that lets me consume a stream of aggregate states (or something equivalent) so I can program a watchdog that alerts when an inconsistent state is reached? That is, it should alert if the highScore is incorrect after all events generated by a single command have been handled. Or is such a thing just not possible?

I sketched out what it would ideally look like in terms of the data structures involved, though this might be wrong as my understanding of the semantics of the Unit of Work is not yet complete. So here it is in (pseudo-ish) Kotlin:

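// Aggregate, Event and NonEmptyList are placeholders for whatever types apply.
data class AStateUpdate(val aggregateState: Aggregate, val event: EventMessage<Event>)
data class UpdatesInUnitOfWork(val initialState: Aggregate, val updates: NonEmptyList<AStateUpdate>)

typealias TheStreamYoudObserve = Stream<UpdatesInUnitOfWork>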

My intuition tells me that it should be possible to roll this by hand using an event handler interceptor attached to the processor dealing with event sourcing handlers. That is, if I can manage to access the aggregate of the currently running unit of work before and after handling. This access has to be pretty fast so that the interceptor doesn’t impact the performance of the system too badly (is that what Scope is for?).

I could then leverage subscription queries, for which this interceptor would emit updates in a raw format of (startState: Aggregate, appliedEvent: Event, endState: Aggregate). An intermediate processor at the call site would then take care of the transformation to the data structures described above.
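The transformation step at the call site could be sketched roughly as follows, in plain Kotlin without any Axon types. It assumes each raw update also carries some correlation key for its unit of work (the hypothetical unitOfWorkId below), which is not part of the triple described above, and it uses a plain List where the earlier sketch used NonEmptyList.

```kotlin
// Generic over state S and event E so the sketch stays independent of Axon types.
data class RawUpdate<S, E>(
    val unitOfWorkId: String, // hypothetical correlation key, not in the original triple
    val startState: S,
    val appliedEvent: E,
    val endState: S,
)

data class AStateUpdate<S, E>(val aggregateState: S, val event: E)

// `updates` is never empty when built by groupByUnitOfWork below.
data class UpdatesInUnitOfWork<S, E>(val initialState: S, val updates: List<AStateUpdate<S, E>>) {
    val finalState: S get() = updates.last().aggregateState
}

// Groups raw updates by unit of work, preserving arrival order within each group.
fun <S, E> groupByUnitOfWork(raw: List<RawUpdate<S, E>>): List<UpdatesInUnitOfWork<S, E>> =
    raw.groupBy { it.unitOfWorkId } // groupBy keeps keys in first-seen order
        .values
        .map { group ->
            UpdatesInUnitOfWork(
                initialState = group.first().startState,
                updates = group.map { AStateUpdate(it.endState, it.appliedEvent) },
            )
        }

fun main() {
    // States are plain Ints and events plain Strings to keep the demo small.
    val raw = listOf(
        RawUpdate("uow-1", 0, "GameAdded(40)", 40),
        RawUpdate("uow-1", 40, "GameAdded(70)", 70),
        RawUpdate("uow-2", 70, "GameRemoved(70)", 40),
    )
    for (u in groupByUnitOfWork(raw)) {
        println("${u.initialState} -> ${u.finalState} via ${u.updates.map { it.event }}")
    }
}
```

A watchdog would then only need to inspect finalState of each group, since intermediate states within one unit of work may legitimately be inconsistent.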

However, I was wondering whether Axon already has such a facility built in.

Recalculating each time seems expensive. To keep high scores I would expect a specific projection. It could store something like a map of game IDs to scores, together with a high score. Changing a past score should create something like a ScoreAdjustedEvent, which can be read to update the projection and, if the high score is affected, possibly lead to a new high score. This way you will always have accurate high scores. Would that solve the problem?
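A rough sketch of such a projection in plain Kotlin might look like this. Only ScoreAdjustedEvent is named above; GamePlayedEvent, GameRemovedEvent, the on method and the convention that scores are non-negative with 0 meaning "no games" are assumptions made for illustration. In an Axon application the on method would presumably be split into annotated event handlers.

```kotlin
// Hypothetical events; only ScoreAdjustedEvent is named in the thread.
sealed interface ScoreEvent { val gameId: String }
data class GamePlayedEvent(override val gameId: String, val score: Int) : ScoreEvent
data class ScoreAdjustedEvent(override val gameId: String, val newScore: Int) : ScoreEvent
data class GameRemovedEvent(override val gameId: String) : ScoreEvent

class HighScoreProjection {
    private val scores = mutableMapOf<String, Int>()

    // Assumption: scores are non-negative and 0 means "no games yet".
    var highScore: Int = 0
        private set

    fun on(event: ScoreEvent) {
        val previous = scores[event.gameId]
        when (event) {
            is GamePlayedEvent -> scores[event.gameId] = event.score
            is ScoreAdjustedEvent -> scores[event.gameId] = event.newScore
            is GameRemovedEvent -> scores.remove(event.gameId)
        }
        val current = scores[event.gameId]
        highScore = when {
            // The touched game now matches or beats the high score: cheap update.
            current != null && current >= highScore -> current
            // The former top game was lowered or removed: rescan the map.
            previous != null && previous == highScore -> scores.values.maxOrNull() ?: 0
            else -> highScore
        }
    }
}

fun main() {
    val projection = HighScoreProjection()
    projection.on(GamePlayedEvent("g1", 70))
    projection.on(GamePlayedEvent("g2", 90))
    projection.on(ScoreAdjustedEvent("g2", 60)) // the former high score is lowered
    println(projection.highScore) // prints 70
}
```

The full scan only happens when the game holding the current high score is lowered or removed; every other event is handled in constant time.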

Maybe the example I gave was a bit too simple.

In practice we have many different events that can trigger a change requiring recalculation, and it is quite easy to forget to call the recalculate method in the associated handlers.

So what I wanted to do is basically flip this on its head and observe aggregate state change not as a sequence of events (since that would also suffer from having to know intrinsically which events can cause the pertinent state change) but rather observe the aggregate state itself (the high score and the basis for calculating it in this example are in one spot, so I don’t have to care about which events cause the state change).

This would allow me to write a property that just checks whether the high score is up to date. I don’t want to recalculate there, but rather just alert. Or, if this can be used in testing, test for it.

The idea is kind of taken from how some formal software verification systems work, where you can write properties that are interpreted and analyzed over a sequence of states. I just want to alert / fail tests if properties fail rather than prove things, though.
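As a rough illustration of what such a property check over a sequence of states could look like, here is a plain Kotlin sketch. Property, Violation, checkAlways and UserState are hypothetical names, and how the list of states would be obtained from Axon is left open.

```kotlin
// A named property over a single aggregate state of type S.
class Property<S>(val name: String, val holds: (S) -> Boolean)

data class Violation<S>(val property: String, val stateIndex: Int, val state: S)

// Checks that every property holds in every state and reports each failure.
fun <S> checkAlways(states: List<S>, properties: List<Property<S>>): List<Violation<S>> =
    states.flatMapIndexed { index, state ->
        properties.filterNot { it.holds(state) }.map { Violation(it.name, index, state) }
    }

// Hypothetical aggregate snapshot for the high-score example.
data class UserState(val highScore: Int, val scores: List<Int>)

fun main() {
    val highScoreUpToDate = Property<UserState>("highScore is max of scores") {
        it.highScore == (it.scores.maxOrNull() ?: 0)
    }
    val states = listOf(
        UserState(0, emptyList()),
        UserState(50, listOf(50)),
        UserState(50, listOf(50, 80)), // a handler forgot to update highScore
    )
    // Reports one violation, for the state at index 2.
    checkAlways(states, listOf(highScoreUpToDate)).forEach { println(it) }
}
```

In a test, asserting that the returned list is empty would fail the build; in production, a non-empty list could trigger an alert instead.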

In this particular example the approach I’m asking for is of course overkill, but in sufficiently large and complex domains it could be a nice way to encode invariants of your successive aggregate states.

I hope this clarified things?

Not really, as I think the recalculate method is a code smell. With proper design, it should not be needed. Yes, that means you need to know which events affect the high score. But how could you have a recalculate method without knowing which events affect the high score?

It’s easy to get all the events belonging to one aggregate. It’s also easy to read all events from the beginning and apply some filter (likely on event class) to get all events of a given kind. But I’m not sure either of those is an answer to your question.