Infinite sagas for bulk command dispatching

Hello, I have a small problem in my current application.

I have an event that cannot be associated in any way with anything inside a bounded context; however, when this event occurs I want to send a command to every aggregate.

```kotlin
DownstreamEvent(id: DownstreamId)
AggregateCreatedEvent(id: AggregateId)
ReceiveDownstream(id: AggregateId, downstream: DownstreamId)

// When a DownstreamEvent occurs, every aggregate should receive the command 'ReceiveDownstream'.
// To put it simply, when it occurs:
// for (aggregate in allAggregates) gateway.send(ReceiveDownstream(aggregate, downstream))
```

The problem with the above is that the state is not persisted. It could be kept in memory if I ensure that the events are always replayed from the beginning, but that would be a waste of memory, I guess.

Another approach is a write-side model that would save every AggregateId and DownstreamId ever introduced into the system in Mongo, but having a write side just for this seems like a waste too.

The third option is to use sagas with an 'always accept' property resolver; I would need two sagas:
One that handles downstream events that happened before the aggregate was created, so every subsequent creation receives the event.
A second saga that handles downstream events received after the aggregate was created, meaning the saga starts when the aggregate is created and then dispatches a command on every subsequent downstream event.

Sagas do the persisting for me automatically (Mongo extension), so I do not need to replay the events. Every saga is associated with one id, either the aggregate id or the downstream id;
the problem here is that I need to introduce an 'always true' property for the downstream/aggregate created event, so that I am always accepting it.

My question is: is that a good approach, and could someone show me another way to do this kind of 'bulk' command dispatching?

Many thanks

Excuse me, I couldn't publish with the code sample; here it is:

For event handler approach:

```kotlin
class EventHandler(private val gateway: CommandGateway) {
    private var aggregates = setOf<AggregateId>()
    private var downstreams = setOf<DownstreamId>()

    @EventHandler
    fun on(evt: DownstreamEvent) {
        downstreams += evt.id
        // Downstream received: fan out to every aggregate created so far
        for (id in aggregates)
            gateway.send(ReceiveDownstream(id, evt.id))
    }

    @EventHandler
    fun on(evt: AggregateCreatedEvent) {
        // Aggregate created after earlier downstream events: catch it up
        for (id in downstreams)
            gateway.send(ReceiveDownstream(evt.id, id))
        aggregates += evt.id
    }
}
```

Saga:

```kotlin
@Saga
class RegistrationProcessForOldUpstreams {
    @Autowired
    @Transient
    private lateinit var commandGateway: CommandGateway

    private lateinit var downstream: DownstreamId

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    fun start(evt: DownstreamCreated) {
        this.downstream = evt.id
        associateWith(AlwaysTruePropertyResolver.PROPERTY_NAME, AlwaysTruePropertyResolver.PROPERTY_VALUE)
    }

    @SagaEventHandler(associationProperty = AlwaysTruePropertyResolver.PROPERTY_NAME, associationResolver = AlwaysTruePropertyResolver::class)
    fun on(evt: UpstreamCreated) {
        this.commandGateway.send(RegisterDownstream(evt.id, this.downstream))
    }
}

@Saga
class RegistrationProcessForNewDownstreams {
    @Autowired
    @Transient
    private lateinit var commandGateway: CommandGateway

    private lateinit var id: UpstreamId

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    fun start(evt: UpstreamCreated) {
        this.id = evt.id
        associateWith(AlwaysTruePropertyResolver.PROPERTY_NAME, AlwaysTruePropertyResolver.PROPERTY_VALUE)
    }

    @SagaEventHandler(associationProperty = AlwaysTruePropertyResolver.PROPERTY_NAME, associationResolver = AlwaysTruePropertyResolver::class)
    fun on(evt: DownstreamCreated) {
        this.commandGateway.send(RegisterDownstream(this.id, evt.id))
    }
}
```

Hi Robert,

To be honest, I think I'd personally go for a component that handles the AggregateCreatedEvent and the DownstreamEvent.
The AggregateCreatedEvent handler will store the AggregateIds, and the DownstreamEvent handler will use this query model to publish new commands.

I understand that you feel having this model is a waste, but moving this into a Saga is a similar kind of waste; it's just hidden from you by the framework.
The reason I am steering away from a Saga is that the intent of Sagas is to model a complex business transaction which has a start and an end to its lifecycle.
What you are describing is a never-ending Saga, which feels like a bigger drawback than having a dedicated AggregateId query model.

To conclude, I wouldn't be too scared of, or put off by, having a small query model that only contains the Aggregate Ids.
The purpose of segregating the Command and Query Models is that you can tailor each one towards its exact requirements.
Apparently, your system has the requirement to store a set of all the Aggregate Ids.
As such, it is completely fine to have such a model somewhere!
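To make the idea concrete, here is a framework-agnostic sketch of such a component. All names are hypothetical: an in-memory class stands in for the Mongo-backed query model, and a recording stub stands in for the CommandGateway, so the fan-out logic can be shown without any Axon dependency:

```kotlin
// Hypothetical sketch of the query-model approach: one small projection
// keeps the set of known AggregateIds and DownstreamIds, and the
// DownstreamEvent handler uses it to fan out commands.

data class AggregateId(val value: String)
data class DownstreamId(val value: String)
data class ReceiveDownstream(val aggregate: AggregateId, val downstream: DownstreamId)

// Stands in for the CommandGateway; records what would be dispatched.
class RecordingGateway {
    val sent = mutableListOf<ReceiveDownstream>()
    fun send(cmd: ReceiveDownstream) { sent += cmd }
}

// Stands in for the Mongo-backed lookup table: a tiny persisted query
// model holding only the ids we need.
class IdLookup(private val gateway: RecordingGateway) {
    private val aggregates = linkedSetOf<AggregateId>()
    private val downstreams = linkedSetOf<DownstreamId>()

    fun onAggregateCreated(id: AggregateId) {
        // Aggregate created after earlier downstream events: catch it up.
        downstreams.forEach { gateway.send(ReceiveDownstream(id, it)) }
        aggregates += id
    }

    fun onDownstreamEvent(id: DownstreamId) {
        downstreams += id
        // Fan the new downstream out to every known aggregate.
        aggregates.forEach { gateway.send(ReceiveDownstream(it, id)) }
    }
}
```

In the real application the two `on…` methods would be `@EventHandler` methods and the sets would live in a collection in Mongo, but the shape of the logic stays the same.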

This is my 2 cents to the situation, hope this helps you out Robert!

Cheers,
Steven

Hey Steven,

Ah yes, the query model. The query model has been a nightmare for me in terms of deciding where I should use it.

The issue is that the query model is explicitly eventually consistent, so if I decide to make any decision based on what it tells me, I would also need to ensure replays in case the query model is not consistent. Unless you mean the write-model query, meaning creating a write-model repository and querying that in the command handler; if that's the idea, then yes, that was my plan B, but it still feels weird to create an additional write-model repository (in Java it's just pain) when everything could be handled by cherry-picking the events from the streams of some aggregates. I will still think about a solution that would not force me to create an additional repository, but thank you for the response; it's always good to read that I'm on a semi-right track :slight_smile: