do we have now a simple way to retrieve aggregate's queued events somehow like in previous version?

Event’s Queue is now stored as UnitOfWork’s resource, but its key isn’t easily retrievable, at least there’s no public api for this. Shouldn’t we add a method to UnitOfWork to access queued messages and possibly modify queue in, say, onPrepareCommit? Alternative may be to pass it as a second parameter to onPrepareCommit. Though, I afraid it’s still not the same as separate queue per aggregate, as this queue may potentially mix events from several aggregates, participating in unit of work. At least, it is still possible to filter events in queue by aggregate id.

Hi Dmitry,

There are quite a few places to hook into with Axon 3. What would you like to achieve? Probably there is an easy way.

For instance, if you want to modify events before they’re saved and published, simply install a MessageDispatchInterceptor on the EventBus.

Rene

Hello René

Hi Dmitry,

There are quite a few places to hook into with Axon 3. What would you like to achieve? Probably there is an easy way.

For instance, if you want to modify events before they’re saved and published, simply install a MessageDispatchInterceptor on the EventBus.

If I understand right, MessageDispatchInterceptor is designed to process every event in a stateless way, without any connection to other events. Let me explain what I need instead.

My aggregate represents a root entity, holding several collections of child entities. Aggregate represents some state on the cloud provider. Every individual child entity applies domain events, which represents its state change, but I also need application level event, which represents cumulative state change of the entire aggregate root. It’s more efficient to apply cloud operations in batches, so my cloud provider interface logic listens to these aggregated events, rather than to individual atomic events.

I see that now there’s added some support of batch handling, although without documentation/samples it takes time to understand, how it can be utilized. I’m going to have a closer look at BatchingUnitOfWork and @ConcludesBatch parameter, appreciate if you can share some usage example of this.

Do I understand right, that all events, published as part of handling one command constitute a batch?

If I have an event handler, how can I can write it in a stateful way to accumulate all events in some aggregate holder property, then process entire batch at once when ConcludesBatch is true, then reset accumulator, such that this will not interfere with concurrent events, coming from other aggregates of the same type?

Accumulator can be probably stored in a map with an aggregate id as key, or saga used, but both of this is more complex to implement, than just be able to access all uncommitted events at once?

Hi Dimitri,

the BatchingUnitOfWork has nothing to do with batching of events from an aggregate. It is about batching multiple invocations of message handlers into a single unit of work instance (for optimization purposes). In other words, this is only relevant on the handling side of the messaging pipeline. This is also true for the @ConcludesBatch parameter. This is just a way for Event Handlers to become aware that they are running in a batch, so that you can execute certain code (like an EntityManager.flush()) that you want to to the least amount possible. There is no guarantee that a batch comes from a single aggregate.

The dispatchInterceptor is indeed static in itself. However, the dispatch interceptor is always invoked in the Unit of Work handling the message that cause the intercepted messages to be dispatched. This means you can use that Unit of Work to capture any relevant state.

I’m not entirely sure what you’re trying to achieve, so at this point, I can’t help you any further than this.

Cheers,

Allard

Hello Allard

Lets consider simple example, based on Order/OrderLineItem aggregate. Lets also consider a command, which changes several line items at once. From ES perspective, change of state, caused by this command, may be represented as several OrderLineItemAdded/OrderLineItemRemoved/OrderLineItemUpdated events. But in another bounded context this change is better to be handled as one composite OrderChanged event, containing collection of OrderLineItem changes. The question is how to build this composite event, or if there’s different commonly used approach of how to represent composite changes in ES systems.

Hi Dmitry,

the question is really, do you see the changes of the Order and OrderLineItems as a “things that happened”, or really as multiple? That’s how you decide to make it one or multiple events.
Then, if another bounded context likes to think of it differently, don’t bother with that inside your aggregate too much, as it will add complexity. Instead, consider using a pattern like the “Anti-corruption layer”. It would translate between the two “languages” used in the different contexts.

If you do really want to have both the multiple, smaller events and the composite event being raised from your aggregate, I wouldn’t look at any framework support for it. Instead, implement your entities to do that “staging”. Since it all happens inside the scope of a single command (and thus a single method), it is fairly simple to coordinate necessary activity from there.

Cheers,

Allard

Thanks for suggestion, Allard

That’s indeed a way to solve this