Integrating Aggregates and Sagas

Hi all.

As I mentioned before, I’m writing a POC with Axon that is now close to done. But looking at what I have, I can’t help but see too much complexity and code…

I read this in a reply to one of my earlier posts [1]:

“Sometimes it makes more sense to have an external component, such as a saga, register information with the aggregate.”

I also found this use case, which is somewhat similar to what I want [2]. To put things in perspective, the POC simulates a process with these stages:

Create-Start-{Finish/Cancel}-Close

All stages except Finish are handled in an Aggregate: each CommandHandler calls an injected service bean and then raises an Event.

`
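@CommandHandler
public Object doSomething(SomeCommand command) {
    // getSomethingElse() is a placeholder for whatever input the service needs.
    Object something = service.getSomething(command.getSomethingElse());
    apply(new SomethingHappened(command.getId(), something));
    return something;
}

@EventSourcingHandler
public void on(SomethingHappened event) {
    // State is only changed here, so it can be rebuilt from stored events.
    this.thing = event.getSomething();
}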

`

However, Finish has to interact with other services, repositories, etc., so it’s modeled as a Saga. The Aggregate only hands its data over through an event:

`
@CommandHandler
public void finishSomething(FinishCommand command) {
    // Pass the current state to the Saga through an event.
    FinishEvent event = new FinishEvent(command);
    event.setData(this.thing);
    eventTemplate.publishEvent(event);
}

`

In the Saga, the FinishEvent handler does lots of things and invokes other things, but in the end it ends up in one of two places:

`
@SagaEventHandler(associationProperty = "id")
public void on(SomethingSucceeded event) throws Exception {
    end();
    // NOTIFY AGGREGATE OF SUCCESS
}

@SagaEventHandler(associationProperty = "id")
public void on(SomethingFailed event) throws Exception {
    end();
    // NOTIFY AGGREGATE OF FAILURE
}

`

So now the question: how do I send those notifications? I tried several ways:

  • passing the Aggregate in the event and invoking the corresponding CommandHandler method
  • passing the Aggregate in the event and invoking the corresponding EventSourcingHandler method
  • loading the Aggregate from the store and invoking the corresponding CommandHandler method
  • loading the Aggregate from the store and invoking the corresponding EventSourcingHandler method
  • creating more commands, events, command handlers and event handlers, and sending those commands to the commandGateway

Using the Aggregate directly did kind of work, but then I noticed that the triggered events were not stored in the EventStore, so I ended up adding this to the Aggregate (which seems like a big ugly hack):

`
// BIG UGLY HACK???
@CommandHandler
public void publish(SuperCommand command) {
    // The command just carries the event the Saga wants applied.
    apply(command.getEvent());
}

`

This way I use the commandGateway in the Saga in a kind of “abstract” way to notify the Aggregate of a multitude of events without having to multiply the number of Commands and Events.

Even so, just to follow this Create-Start-{Finish/Cancel}-Close pattern, I now have:

6 Commands and CommandHandlers
9 Events and EventHandlers

And that feels like too much for such a simple use case. I can only imagine what it will look like in a real-world scenario with dozens of business processes to model…

Am I doing things wrong? Are there better practices?

Any help appreciated.

Cheers.

[1] https://groups.google.com/d/msg/axonframework/-bTpzy0gI6M/ctwvZujXHDMJ
[2] https://groups.google.com/d/topic/axonframework/mikwKBdeRVI/discussion

Hi Antonio,

if a saga needs to notify the aggregate of something, the only way to do so is using a command. Aggregate instances should never be accessed directly. So, your saga would send a NotifyProcessCompletedCommand or something of that sort.
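
To make that concrete, here is a minimal plain-Java sketch of the idea. It does not use Axon's API: SketchCommandGateway, ProcessAggregate and FinishSaga are made-up stand-ins, and the fields on NotifyProcessCompletedCommand are assumptions. The only point is that the saga holds a gateway, never an aggregate reference.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java sketch; none of these types are Axon classes.

/** Command named in the reply above; its fields are assumptions. */
final class NotifyProcessCompletedCommand {
    final String processId;
    final boolean succeeded;

    NotifyProcessCompletedCommand(String processId, boolean succeeded) {
        this.processId = processId;
        this.succeeded = succeeded;
    }
}

/** Hypothetical stand-in for a command gateway: one handler per command type. */
final class SketchCommandGateway {
    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    <C> void register(Class<C> type, Consumer<C> handler) {
        handlers.put(type, command -> handler.accept(type.cast(command)));
    }

    void send(Object command) {
        Consumer<Object> handler = handlers.get(command.getClass());
        if (handler == null) {
            throw new IllegalStateException("No handler for " + command.getClass().getSimpleName());
        }
        handler.accept(command);
    }
}

/** Hypothetical aggregate: only its own command handler changes its state. */
final class ProcessAggregate {
    private String state = "STARTED";

    void handle(NotifyProcessCompletedCommand command) {
        state = command.succeeded ? "SUCCESS" : "ERROR";
    }

    String state() {
        return state;
    }
}

/** Hypothetical saga: talks to the aggregate through commands only. */
final class FinishSaga {
    private final SketchCommandGateway gateway;

    FinishSaga(SketchCommandGateway gateway) {
        this.gateway = gateway;
    }

    void onSomethingSucceeded(String processId) {
        gateway.send(new NotifyProcessCompletedCommand(processId, true));
    }

    void onSomethingFailed(String processId) {
        gateway.send(new NotifyProcessCompletedCommand(processId, false));
    }
}
```

Wiring it up would be something like gateway.register(NotifyProcessCompletedCommand.class, aggregate::handle); in a real Axon application the framework does the routing and loads the aggregate for you.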

The reason your sample seems very complex is that CQRS-based architectures have a slightly higher initial ‘complexity’. However, the complexity doesn’t increase as fast afterwards as it would with a more traditional layered architecture. Also note that not all types of applications benefit from this architecture. Maybe you’re better off modeling your aggregates using a BPM mechanism.

The seeming complexity of the extra command and event objects has a big benefit: decoupling. You might end up with a few more objects, but your system is easier to maintain through decoupling of components. If you want to refactor your aggregate, you can safely do so.

Cheers,

Allard

OK, got it, and I’ll stick to that.

What about my big ugly hack (basically, having one command dealing with a multitude of events), do you think it’s a bad practice? Do you see any caveats?

Cheers.

Hi,

calling it a big ugly hack is a pretty good indication of what to expect :-)
Commands and events should be expressed in domain language (the so-called ubiquitous language). SuperCommand isn’t a term I expect a product owner to use.
Furthermore, commands should express the user’s intent. Expressing the events you expect the aggregate to publish isn’t really user-intent, if you ask me.

So that’s definitely a no-go area for me.
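
As a rough illustration of the difference, here is a plain-Java sketch (not Axon code; FinishProcessCommand, ProcessFinishedEvent and IntentDrivenProcess are hypothetical names). The command only states what the caller wants, and the aggregate decides whether that is allowed and which event results from it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; the names below are illustrative, not Axon types.

/** Intent-revealing command: says what the caller wants, not which event to emit. */
final class FinishProcessCommand {
    final String processId;

    FinishProcessCommand(String processId) {
        this.processId = processId;
    }
}

/** Event that the aggregate itself decides to raise. */
final class ProcessFinishedEvent {
    final String processId;

    ProcessFinishedEvent(String processId) {
        this.processId = processId;
    }
}

final class IntentDrivenProcess {
    private boolean started;
    private boolean finished;
    private final List<Object> appliedEvents = new ArrayList<>();

    void start() {
        started = true;
    }

    /** The aggregate validates the request and chooses the resulting event. */
    void handle(FinishProcessCommand command) {
        if (!started || finished) {
            throw new IllegalStateException("Process cannot be finished in its current state");
        }
        finished = true;
        appliedEvents.add(new ProcessFinishedEvent(command.processId));
    }

    List<Object> appliedEvents() {
        return appliedEvents;
    }
}
```

With a SuperCommand that carries a ready-made event, the check in handle() has nowhere to live, because the sender has already decided the outcome.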
Cheers,

Allard

Yeah, I ran into some unexpected problems because of it, so I’ll just drop the idea.

Cheers.

BTW, I ran into another problem with this. When everything was about 98% done, I thought it would be nice if {Finish/Cancel} returned something from the command. So, after sending the events to the Saga, I made the command handler block on a CountDownLatch with a count of 1, and had the command handlers that handle the Saga’s success/failure count the latch down to 0.

But that doesn’t work with the SimpleCommandBus because the thread gets blocked, I think. Then I tried the DisruptorCommandBus, only to find out that it wouldn’t inject the dependencies into the aggregate, so that was a no-go. Then I found out (!) that there is an AsynchronousCommandBus, but the command that the Saga sends to the Aggregate (the success/failure) is still only handled after the first command returns.

Then I also found out (!) that there is a Locking Strategy, but I was kicked out of work before I could try it.

Do you have any pointers here? Basically, I want a command to be handled by the Aggregate while another command is still being handled (in this case blocked). Is that possible?

Thanks again.

I don’t think you really want to go there. Your saga is processing an event. If you want it to be able to process events caused by a command generated as a result of that first event, you want the future to catch up with history.
Your saga can wait for the result of a command, if it has to, and that will actually work with the SimpleCommandBus. Waiting for ‘future’ events to pass by while holding on to the present isn’t something I’d recommend. And that’s an understatement.

Cheers,

Allard

I’m not sure I understand… The Saga already uses the time out mechanism to throw events that will be handled by itself… So the flow will be something like:

Aggregate:

  • set State = Started
  • handle 1st command
  • raise event E to Saga
  • blocks on latch=1
  • send back State

Saga (in handling E):

  • schedule time out event of type E0
  • calls a service and waits for response
  • if successful raise E1
  • otherwise raise E0

Saga (in handling E0/E1):

  • send Fail/Finish commands

Aggregate (on Fail/Finish handling)

  • set State = Error/Success
  • set latch to 0

Aggregate 1st command returns State.

Does this make sense?

Cheers.

Hi Antonio,

when an aggregate publishes events (using apply()), they are not published immediately. There is a Unit of Work that coordinates activity. Only when all activity by an aggregate (or a command handler) has completed are the events actually published. So no matter what you do, your aggregate’s latch can never be released. Hacky solutions excluded, of course.
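
A tiny plain-Java sketch of that buffering, for illustration only. SketchUnitOfWork is a made-up class and not Axon's UnitOfWork API; it just shows that a listener sees nothing while the command handler is still running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the buffering idea; this is not Axon's UnitOfWork API.

final class SketchUnitOfWork {
    private final List<Object> pending = new ArrayList<>();
    private final Consumer<Object> eventBus;

    SketchUnitOfWork(Consumer<Object> eventBus) {
        this.eventBus = eventBus;
    }

    /** Called while the command handler is still running: only buffers the event. */
    void apply(Object event) {
        pending.add(event);
    }

    /** Called after the command handler has returned: hands events to listeners. */
    void commit() {
        List<Object> toPublish = new ArrayList<>(pending);
        pending.clear();
        toPublish.forEach(eventBus);
    }
}

final class UnitOfWorkDemo {
    /** Returns how many events the listener had seen {during the handler, after commit}. */
    static int[] run() {
        List<Object> seenBySaga = new ArrayList<>();
        SketchUnitOfWork unitOfWork = new SketchUnitOfWork(seenBySaga::add);

        // "Command handler": applies an event, then checks whether anyone saw it.
        unitOfWork.apply("FinishEvent");
        int seenDuringHandler = seenBySaga.size(); // a latch awaited here would never open

        // Only once the handler is done does the saga get the event.
        unitOfWork.commit();
        int seenAfterCommit = seenBySaga.size();
        return new int[] {seenDuringHandler, seenAfterCommit};
    }
}
```

If the handler blocked on a latch at the marked line, commit() would never be reached, so the saga would never receive the event that is supposed to lead to the latch being released.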

From an API standpoint, there is no way for the publisher of a command to wait for the completion of another command that was indirectly caused by the results of the first. However, if you use the SimpleCommandBus and SimpleEventBus, that does happen to be the case, since everything is executed in the same thread.

If you need absolute consistency (and a response from the first command), you’d have to do the service call that the Saga currently does inside the aggregate itself. Another solution would be for the dispatcher of the command to subscribe to the event bus for events about the success or failure of that specific task. Be sure to use timeouts, otherwise a thread might wait indefinitely.
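
The second option could look roughly like the plain-Java sketch below. SketchEventBus, ProcessOutcome and OutcomeAwaiter are hypothetical names, not Axon types; in a real application you would subscribe a listener to Axon's event bus instead. The parts that matter are subscribing before the command is sent, filtering on the task's identifier, and waiting with a timeout.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Plain-Java sketch; SketchEventBus and ProcessOutcome are hypothetical, not Axon types.

final class ProcessOutcome {
    final String processId;
    final boolean succeeded;

    ProcessOutcome(String processId, boolean succeeded) {
        this.processId = processId;
        this.succeeded = succeeded;
    }
}

final class SketchEventBus {
    private final List<Consumer<ProcessOutcome>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener and returns a handle that unsubscribes it again. */
    Runnable subscribe(Consumer<ProcessOutcome> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    void publish(ProcessOutcome event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}

final class OutcomeAwaiter {
    /**
     * Subscribes first, then runs sendCommand, then waits for the matching outcome.
     * Returns "SUCCESS", "ERROR" or "TIMEOUT".
     */
    static String dispatchAndAwait(SketchEventBus bus, String processId,
                                   Runnable sendCommand, long timeoutMillis) {
        CompletableFuture<ProcessOutcome> outcome = new CompletableFuture<>();
        Runnable unsubscribe = bus.subscribe(event -> {
            if (event.processId.equals(processId)) {
                outcome.complete(event); // ignore outcomes of other processes
            }
        });
        try {
            sendCommand.run();
            ProcessOutcome result = outcome.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return result.succeeded ? "SUCCESS" : "ERROR";
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "TIMEOUT";
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            unsubscribe.run(); // never leave the listener behind
        }
    }
}
```

Here the waiting happens in the caller's thread, outside the aggregate, so the aggregate's command handler can return and its events get published as usual.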

Cheers,

Allard