Sending events from event handlers

Hi folks,

My question is somewhat similar to the one by Sara Pellegrini but goes in a slightly different direction.

In general I am trying to understand the design principles of a CQRS/ES application.
In doing so, my question is how to combine the logic of Command Handlers and Event Listeners (and which combinations are allowed and which are not):

If I got it right:

  • Aggregate’s Command Handler MAY emit Events but MUST NOT change the state (otherwise the state change cannot be restored for the aggregate from the stored event stream).
  • Aggregate’s Event Sourcing Listener MAY change state, but MUST NOT emit events (otherwise, these events will be saved in the event stream and will be duplicated when the aggregate is re-created).
  • Aggregate’s Event Sourcing Listener and View’s Tracking Event Listeners MUST NOT send Commands, because a replay will cause them to resend the commands, unless you construct the replay in the way Allard proposes in the post above.
  • Saga’s Event Listeners MAY create new Commands. It is important that Saga association to some business entity detects/prevents duplication.
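
To make the first two rules concrete, here is a minimal plain-Java sketch. It is not Axon code: the class AccountAggregate and the methods handleDeposit, on, apply and rehydrate are hypothetical names chosen for illustration, and an event is reduced to the deposited amount. The command handler only validates and emits; the event sourcing handler is the only place that changes state, so the state can be rebuilt from the stored stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of an event-sourced aggregate; not Axon API.
final class AccountAggregate {
    private int balance; // state, only ever changed in on(...)
    private final List<Integer> uncommitted = new ArrayList<>();

    // "Command handler": validates and emits an event, never touches balance.
    void handleDeposit(int amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        apply(amount);
    }

    // "Event sourcing handler": the only place where state changes.
    private void on(int depositedAmount) {
        balance += depositedAmount;
    }

    // Runs the handler and records the event for storage.
    private void apply(int depositedAmount) {
        on(depositedAmount);
        uncommitted.add(depositedAmount);
    }

    // Rebuilds the state from a stored stream by running only the handlers.
    static AccountAggregate rehydrate(List<Integer> stream) {
        AccountAggregate aggregate = new AccountAggregate();
        for (int amount : stream) {
            aggregate.on(amount);
        }
        return aggregate;
    }

    int balance() { return balance; }

    List<Integer> uncommitted() { return uncommitted; }
}
```

If handleDeposit changed balance directly, rehydrate would have no way to reproduce that change, which is the reason behind the first rule.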

So thinking about interactions of several bounded contexts in more complex scenarios:

  • Aggregate is the place to put connection from Commands to Events.
  • Saga is the place to put the connection between Events to Commands.

Is that correct?

I can provide some examples for discussion if it is required,
but I think this is somehow fundamental and should not depend on examples.

Kind regards

Simon

Concurring mostly with this, also the importance of getting this right and why, nice summary!

Nitpick: a command handler can emit zero, one or more events. You don't always need a Saga to emit multiple events in response to a command.

Jorg

Hi Simon, Jorg,

let’s go through the statements one-by-one. While doing so, I assume we’re Event Sourcing Aggregates.

  • Aggregate’s Command Handler MAY emit Events but MUST NOT change the state (otherwise the state change cannot be restored for the aggregate from the stored event stream).

Absolutely correct!

  • Aggregate’s Event Sourcing Listener MAY change state, but MUST NOT emit events (otherwise, these events will be saved in the event stream and will be duplicated when the aggregate is re-created).

Actually, since some distant Axon version (somewhere in 2.x), this is allowed. Axon will automatically ignore applied events when it is rehydrating an instance from its past events. In some cases, especially in more complex aggregate structures, it’s more feasible to have one part of the aggregate react to events from other parts, possibly with a new event. It is important to realize that any event applied this way will only be applied after the previous event (the triggering one) has been handled by all entities. If logic needs to be executed after the apply(), one can use apply().andThen(…) to make more decisions based on the state after the event has been applied.
Is it best practice? Definitely not! But it isn’t forbidden either.
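
The behaviour described above can be imitated in plain Java. The sketch below is not Axon's implementation; FooAggregate, the live flag and the pending queue are assumptions made for illustration, and apply().andThen(…) is not modelled. It shows two things: an event applied from inside a handler is only handled after the triggering event has been handled, and the same apply call does nothing while the aggregate is being rebuilt from its stream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java simulation; not Axon's actual implementation.
final class FooAggregate {
    private boolean live = true;      // false while rebuilding from the stream
    private boolean applying = false; // true while an apply loop is running
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> handled = new ArrayList<>();

    // "Command handler" for DoFoo.
    void handleDoFoo() {
        apply("FooDone");
    }

    // "Event sourcing handler": reacts to FooDone with a follow-up event.
    private void on(String event) {
        handled.add(event);
        if (event.equals("FooDone")) {
            apply("BarDone"); // ignored while rehydrating
        }
    }

    private void apply(String event) {
        if (!live) {
            return; // replaying: the event is already in the stream
        }
        pending.add(event);
        if (applying) {
            return; // nested call: handled after the triggering event
        }
        applying = true;
        try {
            while (!pending.isEmpty()) {
                String next = pending.poll();
                uncommitted.add(next);
                on(next);
            }
        } finally {
            applying = false;
        }
    }

    static FooAggregate rehydrate(List<String> stream) {
        FooAggregate aggregate = new FooAggregate();
        aggregate.live = false;
        for (String event : stream) {
            aggregate.on(event);
        }
        aggregate.live = true;
        return aggregate;
    }

    List<String> uncommitted() { return uncommitted; }

    List<String> handled() { return handled; }
}
```

In this sketch the stream after handleDoFoo() contains FooDone followed by BarDone, and rebuilding from that stream handles both events without producing any new ones.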

  • Aggregate’s Event Sourcing Listener and View’s Tracking Event Listeners MUST NOT send Commands, because a replay will cause them to resend the commands, unless you construct the replay in the way Allard proposes in the post above.

Conceptually, it would be a smell to have a projector send a command. It seems like there is a mix of responsibilities inside a single class. You’re correct in that it impacts (i.e. removes) the replayability of such a handler. There may be a workaround for this in future versions, as handlers become aware of the fact that they are replaying events that have been previously published. Even with this feature, an error is easily made, and a component is likely to act on past events as if they occurred in the present.
Sending commands from an Aggregate is something I avoid. Sending them from an Event Sourcing Listener is dangerous, unless the “isLive” status is explicitly checked. But I find it conceptually ‘off’ to send commands from an aggregate.

  • Saga’s Event Listeners MAY create new Commands. It is important that Saga association to some business entity detects/prevents duplication.

Usually, it’s exactly what Saga Listeners do. They coordinate activity, typically by listening to events and sending out commands (or invoking services). They should maintain state that reflects the state of the transaction they coordinate.
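
As a rough illustration of that Event-to-Command direction, here is a plain-Java sketch. It does not use Axon's Saga support; OrderSaga, onOrderPlaced and the "ReserveStock" command string are hypothetical, and the Consumer stands in for whatever gateway actually dispatches commands. The saga keeps state about what it has already started, so a redelivered event does not produce a second command.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical plain-Java stand-in for a saga; not Axon's Saga API.
final class OrderSaga {
    private final Consumer<String> commandGateway; // stand-in for a command bus
    private final Set<String> startedOrders = new HashSet<>(); // saga state

    OrderSaga(Consumer<String> commandGateway) {
        this.commandGateway = commandGateway;
    }

    // "Saga event listener": turns an event into a command, at most once
    // per order, by consulting the state the saga maintains.
    void onOrderPlaced(String orderId) {
        if (startedOrders.add(orderId)) {
            commandGateway.accept("ReserveStock:" + orderId);
        }
    }
}
```

Delivering the same OrderPlaced event twice sends only one command here, because the saga's own state records that the order is already being handled.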

  • Aggregate is the place to put connection from Commands to Events.
  • Saga is the place to put the connection between Events to Commands.

Is that correct?

Yes, that boils it down to the essence…

Cheers,

Allard

Thanks, Allard,

I think it is worth blogging about. I think I’ll do so.

Kind regards,

Simon

Hi,

What about the case where an AR emits multiple events? Imagine a CreateDocumentCommand is processed by the AR, and it emits apply(DocumentCreated) and apply(DefaultUserAssigned). Is this acceptable, or is it recommended to have a CreateDocumentSaga that sends an AssignDefaultUser command in response to the DocumentCreated event?

Jorg

The way I get it, this would be acceptable if you apply both events in the command handler, but if you do so in the EventSourcingHandler, you will have duplicate events on replay.

Hi Jan,

The latter is actually not the case. Axon takes this into account and will not re-apply events when it is event sourcing.

Cheers,

Allard

Hi Allard,

If I got you right, AxonFramework will take care that an event emitted from an EventSourcingHandler of an event-sourced Aggregate will not get duplicated during Aggregate restore?
Is there some special logic preventing the duplication of events?

Still the question: is it intended or not to send events from event handlers? From your first post I thought it is an anti-pattern?

P.S. To give an example: imagine an AR “Foo” listening to the command “DoFoo”, which will cause a “FooDone” and a “BarDone” event.
My natural thinking was to put both AggregateLifecycle.apply(new FooDone()) and AggregateLifecycle.apply(new BarDone()) inside the method body of the DoFoo command handler.
If there is some state change for the aggregate, I could additionally put two EventSourcingHandlers inside the aggregate to react to both events.

What I considered an antipattern is to create an EventSourcingHandler for “FooDone” and call AggregateLifecycle.apply(new BarDone()) from it, instead of from the CommandHandler.

Kind regards,

Simon

Hi Simon,

Axon uses the sequence numbers in the events and the snapshot to load events since the last snapshot. However, someone recently reported an issue that might be related: https://github.com/AxonFramework/AxonFramework/issues/455.

It is not by definition a bad thing to apply events in an event handler. Especially when aggregates grow more complex, it sometimes makes sense to publish an additional event as a reaction to an event. However, it is better to apply both events one at a time from the command handler.

Imagine two entities that are part of a single aggregate: Job and Task. A Job consists of different Tasks. Imagine a rule that the Job is done when all Tasks are completed. The "CompleteTaskCommand"s could be routed directly to the Task instances, and raise a “TaskCompletedEvent”. In your Job, you could listen to this event. When it sees all tasks are completed, it could apply a “JobDoneEvent”. Doing this in a command handler would require you to move the handler from Task to Job. That may, or may not, feel awkward. And “awkward” is the main thing to prevent in a model.
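
A plain-Java sketch of the Job/Task idea follows. It is not Axon code: JobAggregate, its string-encoded events and the live flag are assumptions for illustration, the Task entities are collapsed into a set of open task ids, and the follow-up event is handled inline rather than deferred as Axon would do. The point is only that the handler for the task event is the place that notices "all tasks completed" and applies the job event, and that it does so only for live events.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java simulation of the Job/Task aggregate; not Axon API.
final class JobAggregate {
    private final Set<String> openTasks = new HashSet<>();
    private boolean done;
    private boolean live = true; // false while rebuilding from the stream
    private final List<String> emitted = new ArrayList<>();

    JobAggregate(List<String> taskIds) {
        openTasks.addAll(taskIds);
    }

    // "Command handler" that conceptually lives on the Task entity.
    void completeTask(String taskId) {
        if (!openTasks.contains(taskId)) {
            throw new IllegalStateException("unknown or completed task: " + taskId);
        }
        apply("TaskCompleted:" + taskId);
    }

    // "Event sourcing handler" on the Job: updates state, then reacts.
    private void on(String event) {
        if (event.startsWith("TaskCompleted:")) {
            openTasks.remove(event.substring("TaskCompleted:".length()));
            if (openTasks.isEmpty()) {
                apply("JobDone"); // no-op while replaying
            }
        } else if (event.equals("JobDone")) {
            done = true;
        }
    }

    private void apply(String event) {
        if (!live) {
            return;
        }
        emitted.add(event);
        on(event);
    }

    static JobAggregate rehydrate(List<String> taskIds, List<String> stream) {
        JobAggregate job = new JobAggregate(taskIds);
        job.live = false;
        for (String event : stream) {
            job.on(event);
        }
        job.live = true;
        return job;
    }

    boolean isDone() { return done; }

    List<String> emitted() { return emitted; }
}
```

With two tasks, completing the second one yields the stream TaskCompleted, TaskCompleted, JobDone; rebuilding from that stream marks the job as done without emitting JobDone a second time.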

In upcoming versions, we want to support in-aggregate command interceptors. That could also be a place where this logic could be executed.

Hope this clarifies things.
Cheers,

Allard

Ok,

I understand this. I thought the way around this was a Saga. Look how we implemented this:

https://github.com/holisticon/ranked/tree/master/backend/command/src/main/kotlin

Kind regards.

A Saga would be the solution of choice if Job and Task were two separate aggregates. Using a Saga to coordinate activity for a single aggregate is a smell as well. Such logic should be encapsulated within the aggregate as much as possible.

Hi Allard,

"- Aggregate’s Command Handler MAY emit Events but MUST NOT change the state (otherwise the state change cannot be restored for the aggregate from the stored event stream).

Absolutely correct!"

But an event is a thing of the past; it tells you that the change has already happened. So if an event is emitted, the change in state of the AR it describes has taken place already. That is why we change the AR state in the command handler and emit the event, which is then handled by event handlers in other parts of the system (not in the AR), e.g. to update read models.

The event handlers in the AR should only react to events being replayed. Otherwise these event handlers are triggered twice by Axon, which is the problem we are experiencing currently.

Would it be a good idea to make Axon distinguish between “replaying event handlers” and “live event handlers”? E.g. do you plan to introduce an “isLive” status flag? Or, by default, to prevent an event handler in an AR from being called for an event that has been thrown from the very same AR?

Your proposed PartialResetToken in https://groups.google.com/d/msg/axonframework/DuxjesdvU00/EHvH-HCLCQAJ does not solve our issue, as we have a fully consistent system with immediate event handling and thus we cannot use tracking event processors at all.

br
Marek

Hi,

Now I have realized there is AggregateLifecycle.isLive() to solve this issue. If there are events which originate only in a certain aggregate class, it seems natural that the event handlers for them located in the very same aggregate class should only be invoked during event replaying. I can wrap the whole body of them in if (!isLive()) to achieve that.

This is not a very elegant solution though, because these event handlers should not have been invoked in the first place, except when replaying is taking place. Is there a better solution? E.g. to assign all such "replay-only" event handlers to a separate ProcessingGroup and to define an Event Handler Interceptor in the Event Processor for that group. But could I use isLive() within that Interceptor?

br
Marek

Hi Marek,

I think there is a big misunderstanding on how event sourcing works. When using event sourcing, you should never apply state changes in a command handler. All state changes must be done in an Event(Sourcing)Handler. The Command Handlers perform validation and decide which Events are published. The apply() function will invoke the Event(Sourcing)Handlers on the aggregate immediately, and schedule the event for publication. Note the “schedule”; they are not published to the “outside world” just yet.

Generally, you don’t need to distinguish between “live” and “replaying” state. If you apply() a new event in an Event Handler, Axon will only really apply that event when the Event is a “live” one. Not when replaying events.

Event Handlers outside of the aggregate are only invoked for “live” events anyway, after the command has finished processing the aggregate.
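
The "apply now, publish later" flow can be sketched in plain Java as follows. This is only a simulation under assumptions: CounterAggregate, commit and the externalHandler parameter are hypothetical names, and the real framework ties publication to its own unit of work rather than to an explicit commit call. The sketch shows that the aggregate's own handler runs immediately inside apply, while the event store and handlers outside the aggregate only see the event once command processing has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java simulation of "apply now, publish later"; not Axon API.
final class CounterAggregate {
    private int count;
    private final List<String> scheduled = new ArrayList<>();

    // "Command handler": decides which event to publish.
    void handleIncrement() {
        apply("Incremented");
    }

    private void apply(String event) {
        on(event);            // the aggregate's own handler runs immediately
        scheduled.add(event); // publication to the outside is only scheduled
    }

    // "Event sourcing handler": the state change happens here.
    private void on(String event) {
        if (event.equals("Incremented")) {
            count++;
        }
    }

    // Called once command handling has finished successfully: only now is
    // the event stored and handed to handlers outside the aggregate.
    void commit(List<String> eventStore, Consumer<String> externalHandler) {
        for (String event : scheduled) {
            eventStore.add(event);
            externalHandler.accept(event);
        }
        scheduled.clear();
    }

    int count() { return count; }
}
```

Between handleIncrement() and commit(...), the aggregate already reports the new count while the store and the external handler have seen nothing; if commit never happens, reloading from the store simply does not show the change.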

To me, it sounds like the approach you’re taking makes things unnecessarily complex. The concept of “isLive” lives in the context of an aggregate. The ProcessingGroup is a concept from event handling outside of the aggregates. These are distinct processes that have no direct relation.

Hope this clarifies things.
Cheers,

Allard

Hi Allard,

thanks a lot!

1) Now, after studying some of the blogs and posts on the ES flow, I have to admit I was not aware that your understanding is the prevalent one. ;-( Still, I think there is at least a naming inconsistency issue, if not a conceptual issue:

“The apply() function will invoke the Event(Sourcing)Handlers on the aggregate immediately, and schedule the event for publication. Note the “schedule”; they are not published to the “outside world” just yet.”

That means, when an event is applied in the command handler its event handler in this AR is used just to execute certain state change - “applyStateChange” in AR even before publishing the event. The event might not actually be published at all eventually e.g. if a problem occurs in that event handler. So the event has not happened yet, but its event handler has been called anyway…

On the other hand, the same event handler is used to react to the same event if that event has been published in a different AR (which is probably not a usual case due to Single Responsibility Principle). Also, the read model event handlers are always used just for “already happened” events, cause read models never publish events of course…

See? Imho, a “handler of something” handles that “something” after it has occurred. A command has been invoked (e.g. by GUI) -> command handler. The same should apply for event handlers - consistently.

“Generally, you don’t need to distinguish between “live” and “replaying” state. If you apply() a new event in an Event Handler, Axon will only really apply that event when the Event is a “live” one. Not when replaying events.”

In a fully consistent system, where events are handled synchronously, does this guarantee the same sequence of processing during replay? Example: the event B is applied (live) in the middle of an event handler for event A. Event store contains {A, B }, right? During replay from event store the execution of event handlers A and B is not interleaved anymore, is it?

Allard: “You should organize your event handlers so that the ones that can be replayed are separated from the ones that can’t.”
Steven van Beelen: “For now we’ve introduced an application property denoting whether the application is in replay or not”

br
M.

Hi Marek,

Gonna add some extra background here on your points of interest.

1):
The issue you describe that ‘the state is already changed, whilst the event could not be stored’ shouldn’t be an issue in an Event Sourcing system.

As the Events are the Source of your aggregate, any events which cannot be stored will thus not prevail as state changes for that aggregate.

So the behavior Allard describes, where the events are scheduled to be stored so that the framework will first call all Event Sourcing Handlers to adjust the state (and potentially apply additional events), is safe.

It might be that the state change was in vain if the storing of the event failed, but you will not see that state change when you’re ‘event sourcing your aggregate’ from the Event Store.

For your second point under 1):
An event handler in the Aggregate Root will only ever handle events which it itself and any entities living under it have published.

Hence it will never handle events from other aggregates.

This distinction is the main reason Axon Framework has both an ‘@EventHandler’ and an ‘@EventSourcingHandler’ annotation, where the latter is used in aggregates.

The @EventSourcingHandler is thus used to source your command model / aggregate based on the events it has published.

2):

Events will be handled in a replay in the order they have been inserted in the Event Store. So yes, your assumption is correct here.

Hope this helps!

Cheers,

Steven