Parallel models/what-if analysis

Hi all,

First time poster here, thanks for collating such a great resource.

I’m investigating using Axon Framework on a project where we need to create a kind of parallel model to allow for separate ‘workspaces’. In a workspace, a user can draft a series of changes that may cross various aggregates.

At a high level, we could support this by tagging any events produced in a draft workspace with the workspace name. This could be done by adding a column to the domain event entry table or using metadata somehow. The write model for a draft workspace is then the collection of events in the main workspace (before creating the draft) plus any events in that workspace since.
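For the metadata route, the rough idea would be a dispatch interceptor that stamps every event with the current workspace name before it reaches the event store. A minimal sketch, assuming Axon 4's MessageDispatchInterceptor; the "workspace" key and the WorkspaceContext holder are just placeholder names:

import org.axonframework.eventhandling.EventMessage
import org.axonframework.messaging.MessageDispatchInterceptor
import java.util.function.BiFunction

// Placeholder holder for the active workspace name; a thread local is one option.
object WorkspaceContext {
    private val workspace = ThreadLocal.withInitial { "main" }
    fun current(): String = workspace.get()
    fun use(name: String) = workspace.set(name)
}

// Stamps every dispatched event with the workspace it was produced in.
class WorkspaceTaggingInterceptor : MessageDispatchInterceptor<EventMessage<*>> {
    override fun handle(messages: MutableList<out EventMessage<*>>): BiFunction<Int, EventMessage<*>, EventMessage<*>> =
        BiFunction { _, event ->
            event.andMetaData(mapOf("workspace" to WorkspaceContext.current()))
        }
}

Registering it via eventBus.registerDispatchInterceptor(WorkspaceTaggingInterceptor()) would let the storage engine and read side key off that metadata rather than a dedicated column.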

Read models pose another problem. Some may be able to handle two versions of the universe, while others may need to be rebuilt for each workspace. Applying changes from a workspace back into the main workspace would be a matter of replaying the commands (which might fail where there are conflicts).
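For the read models that can tolerate both versions of the universe, one option is to key the projection's rows on (workspace, id) and pull the workspace out of the event metadata. A sketch, assuming the "workspace" metadata entry from above; the event, view class and in-memory store are made up for the example:

import org.axonframework.eventhandling.EventHandler
import org.axonframework.messaging.annotation.MetaDataValue

data class SomethingCreatedEvent(val id: String, val name: String)
data class SomethingView(val id: String, val workspace: String, val name: String)

// One row per (workspace, id), so the main and draft versions can coexist in a single projection.
class DraftAwareProjection(
    private val views: MutableMap<Pair<String, String>, SomethingView> = mutableMapOf()
) {
    @EventHandler
    fun on(event: SomethingCreatedEvent, @MetaDataValue("workspace") workspace: String?) {
        val ws = workspace ?: "main"
        views[ws to event.id] = SomethingView(event.id, ws, event.name)
    }
}

Read models that can't do this would need their own rebuilt instance per workspace, as mentioned above.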

I’ve done a spike using a custom EventStorageEngine which grabs the current workspace name out of a thread local and persists it along with the event in a separate column. I updated all read queries by adding an extra condition to the WHERE clauses. It seems to work so far, but I’m wondering if there is a better way to implement this functionality (for example, I’m unsure of the implications this would have on any TrackingEventProcessors).

Has anyone done something similar and can tell me how terrible my approach is? :slightly_smiling_face:

Cheers,

Matt

Hi all,

I’ve had some more time to work on the spike, and have managed to avoid the database schema changes by relying only on metadata.

I’ve subclassed the EventSourcingRepository, overriding readEvents as follows:

override fun readEvents(aggregateIdentifier: String?): DomainEventStream {
    val domainEventStream = super.readEvents(aggregateIdentifier)

    // If the creation event was on another branch, bail out now
    if (domainEventStream.hasNext()) {
        val firstMessage = domainEventStream.peek()
        if (!(firstMessage.isForMainModel || firstMessage.isForCurrentModel)) {
            return DomainEventStream.empty()
        }
    }

    var eventSeenForCurrentParallelModel = false

    val mappedEvents = domainEventStream.asStream().map {
        eventSeenForCurrentParallelModel = eventSeenForCurrentParallelModel || it.isForCurrentModel

        // Skip events on the main event stream if we've hit an event on the branch already
        if (it.isForCurrentModel || (it.isForMainModel && !eventSeenForCurrentParallelModel))
            it
        else
            generateNoOpMessage(it)
    }

    return DomainEventStream.of(mappedEvents)
}


Some notes:

- isForMainModel/isForCurrentModel are Kotlin extension properties that read the 'current' model name from a global context and check it against what's in the event metadata (rough sketch of both helpers below)
- generateNoOpMessage builds a GenericDomainEventMessage with a null payload
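Roughly, the helpers look something like this (assuming Axon 4 types and a "workspace" metadata key; the thread-local holder is just how the spike tracks the current model):

import org.axonframework.eventhandling.DomainEventMessage
import org.axonframework.eventhandling.GenericDomainEventMessage

// The 'current' model name, read from a thread local in the spike.
private val currentModel = ThreadLocal.withInitial { "main" }

val DomainEventMessage<*>.isForMainModel: Boolean
    get() = metaData["workspace"] == null || metaData["workspace"] == "main"

val DomainEventMessage<*>.isForCurrentModel: Boolean
    get() = metaData["workspace"] == currentModel.get()

// Swap an event for a no-op, keeping its identifier and sequence number so the
// stream still lines up when the aggregate is sourced.
fun generateNoOpMessage(original: DomainEventMessage<*>): DomainEventMessage<*> =
    GenericDomainEventMessage<Any>(
        original.type,
        original.aggregateIdentifier,
        original.sequenceNumber,
        null, // null payload: nothing for the aggregate to apply
        original.metaData
    )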

I've written a couple of tests that create aggregates in different parallel models, and that make conflicting changes on the main stream of events and on the parallel model, and it seems to be working. There are probably some implications for snapshotting that I still need to think about ...

Hi Matt,

That’s an interesting case you’re working on. It does remind me a bit of another case I heard about.
You could consider modelling each “parallel universe” / model as a separate aggregate. However, instead of them having completely independent histories, they refer to each other’s history. For example, given a main model ‘1’ with history A, B, C, D and E, you could create a new aggregate ‘2’ whose first event references A-C from aggregate ‘1’. You would need a mechanism that replaces this “marker” event with the actual events from the other aggregate, so that its state can be reproduced.

You can implement this “merge into” logic of an event stream in a wrapper around the EventStore (or by extending it, as you currently did). Be aware, though, that you’ll need to “rewrite” the aggregate identifiers and sequence numbers of the events you read, so that they make sense in sequential order. For example, your first event would be “NewParallelUniverseCreatedEvent(originAggregate, originVersion)”, which has sequence 0. All events from “originAggregate” should be presented to the current aggregate under its own aggregate identifier and with sequence 0, so that the next entry receives sequence number 1.
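Very roughly, the idea could look like the sketch below. The event and function names are made up, DomainEventStream.concat/of are used to splice the streams, and the wiring into the actual EventStore is left out:

import org.axonframework.eventhandling.DomainEventMessage
import org.axonframework.eventhandling.DomainEventStream
import org.axonframework.eventhandling.GenericDomainEventMessage
import org.axonframework.eventsourcing.eventstore.EventStore

// Marker event stored as the first event (sequence 0) of the new "parallel universe" aggregate.
data class NewParallelUniverseCreatedEvent(
    val originAggregateId: String,
    val originVersion: Long
)

// When reading the new aggregate, replace the marker with the origin aggregate's events up to
// originVersion, rewritten to the new identifier and presented with sequence number 0, so that
// the aggregate's own first "real" event still lines up with sequence number 1.
fun spliceOrigin(newAggregateId: String, stored: DomainEventStream, eventStore: EventStore): DomainEventStream {
    if (!stored.hasNext()) return stored
    val marker = stored.peek().payload as? NewParallelUniverseCreatedEvent ?: return stored
    stored.next() // drop the marker itself; the origin events take its place

    val originEvents = eventStore.readEvents(marker.originAggregateId)
        .asStream()
        .filter { it.sequenceNumber <= marker.originVersion }
        .map<DomainEventMessage<*>> {
            GenericDomainEventMessage(it.type, newAggregateId, 0L, it.payload, it.metaData)
        }

    return DomainEventStream.concat(DomainEventStream.of(originEvents), stored)
}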

Hope this makes sense.
Cheers,