Event handler

Hi,

I think a further enhancement is needed to the event processing.

In the current axon implementation:

  1. If there is more than one event handler for event Y,
    there’s no mechanism to ensure the handlers are invoked in a particular order.

In one particular use case, it is desirable that persistence is invoked before anything else.
This ensures that the next query on the same domain runs against the updated data store.
For example, suppose there are:
a. 2 events, JobCreatedEvent and JobAssignedEvent
b. A job listener that listens for the events listed in (a) and sends an alert to the user when a job is assigned.
The alert email includes the result of the top 10 priority jobs query.

If the persistence listener is invoked after the job notifier,
the top 10 priority jobs query might not include the assigned jobs.

  2. During replay, there’s no mechanism to allow only certain event handlers to be invoked.

This is needed so that when the database is recreated (by replaying the event store),
we don’t notify 3rd party systems.

wdyt?
Are the feature requests valid?

Regards,
Edward Yakop

Hi Edward,

  1. I believe this is a very dangerous feature to have. Event Listeners should never be dependent on each other. I could build a mechanism to allow you to configure the order of dispatching, but to guarantee that an event is processed only after another handler is finished, is hard to do. The reason is simple: imagine you’re using a Message Bus, and the event listeners live on remote machines. The only way to guarantee completion is by raising an (Application or System) Event that triggers another event listener.

What can be guaranteed, however, is that when an aggregate generates 2 events (e.g. JobCreated and JobAssigned), that each event listener will receive the JobCreated first, and the JobAssigned after.

If two listeners are really dependent on each other’s work, perhaps they should be merged into a single one…

  2. Replaying events is something that you cannot do at runtime. Maybe such a mechanism will be created in the future. What you should do is create a mini program that reads events from the store and sends them directly (not through the event bus) to the event listener. In Axon, the annotated event listeners will automatically ignore any events they cannot handle. Just open the firehose and let it drink :wink:

Future support might include a way to “reset” a listener. It could then clean its tables/resources and rebuild them using past events. Just ideas.
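
As a rough illustration of the "mini program" idea above, here is a minimal sketch in plain Java. It does not use Axon's API: `ReplayListener`, `JobTableUpdater` and `ReplayTool` are hypothetical names, the event classes are stand-ins for the thread's example, and the event store is assumed to be just an ordered list of events. Only the listeners passed to the replay are invoked, so a listener that notifies third parties is never called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event types, named after the thread's example.
class JobCreatedEvent {
    final String jobId;
    JobCreatedEvent(String jobId) { this.jobId = jobId; }
}

class JobAssignedEvent {
    final String jobId;
    JobAssignedEvent(String jobId) { this.jobId = jobId; }
}

// Hypothetical listener contract; this is not Axon's interface.
interface ReplayListener {
    void handle(Object event);
}

// Rebuilds the query data and ignores any event type it does not know.
class JobTableUpdater implements ReplayListener {
    final List<String> unassignedJobs = new ArrayList<>();

    @Override
    public void handle(Object event) {
        if (event instanceof JobCreatedEvent) {
            unassignedJobs.add(((JobCreatedEvent) event).jobId);
        } else if (event instanceof JobAssignedEvent) {
            unassignedJobs.remove(((JobAssignedEvent) event).jobId);
        }
        // Any other event type falls through and is ignored.
    }
}

class ReplayTool {
    // Feeds stored events, in order, straight to the chosen listeners.
    // The event bus is bypassed, so no other listener sees the events.
    static void replay(List<Object> storedEvents, List<ReplayListener> listeners) {
        for (Object event : storedEvents) {
            for (ReplayListener listener : listeners) {
                listener.handle(event);
            }
        }
    }
}
```

Passing only the `JobTableUpdater` to `ReplayTool.replay` rebuilds the data without touching any notifier.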

Cheers,

Allard

Thanks for the reply,

Reply inline.

Hi Edward,

  1. I believe this is a very dangerous feature to have. Event Listeners should never be dependent on each other. I could build a mechanism to allow you to configure the order of dispatching, but to guarantee that an event is processed only after another handler is finished, is hard to do. The reason is simple: imagine you’re using a Message Bus, and the event listeners live on remote machines. The only way to guarantee completion is by raising an (Application or System) Event that triggers another event listener.

What can be guaranteed, however, is that when an aggregate generates 2 events (e.g. JobCreated and JobAssigned), that each event listener will receive the JobCreated first, and the JobAssigned after.

If two listeners are really dependent on each other’s work, perhaps they should be merged into a single one…

Ok… I understand that it makes sense for that particular use case.
Apologies that my wording was not clear enough.
What I’m really after is that when JobAssignedEvent is dispatched, from the application’s point of view, the job is really assigned.
That means that if somebody invokes JobRepository#findAllUnassignedJob(), the job that I just assigned will not be returned by that finder method.

Due to the nature of the product, external parties might extend the system by executing additional logic when certain events are dispatched.
We have no control over these.

  2. Replaying events is something that you cannot do at runtime. Maybe such a mechanism will be created in the future. What you should do is create a mini program that reads events from the store and sends them directly (not through the event bus) to the event listener. In Axon, the annotated event listeners will automatically ignore any events they cannot handle. Just open the firehose and let it drink :wink:

Future support might include a way to “reset” a listener. It could then clean its tables/resources and rebuild them using past events. Just ideas.

We’re thinking of resetting an aggregate root instance to a particular point in time, for archiving purposes.
This should be possible if the events are implemented correctly.

Regards,
Edward Yakop

Replied inline…

Ok… I understand that it makes sense for that particular use case.
Apologies that my wording was not clear enough.
What I’m really after is that when JobAssignedEvent is dispatched, from the application’s point of view, the job is really assigned.
That means that if somebody invokes JobRepository#findAllUnassignedJob(), the job that I just assigned will not be returned by that finder method.

This is a typical “staleness” problem. All computer systems have that problem. There is no way around it. Eventual consistency does add a little extra staleness to your data, but it’s mere milliseconds, and that cost is earned back by the ability to scale out.

Here is a nice article about this by Udi Dahan: http://www.udidahan.com/wp-content/uploads/Clarified_CQRS.pdf

We’re thinking of resetting an aggregate root instance to a particular point in time, for archiving purposes.
This should be possible if the events are implemented correctly.

Your archiver can easily fetch these events from the event store and apply them to a (copy of the) event listener that creates the database tables. You can apply them to an in-memory model (which should be blazingly fast) and return that model as objects. That should give you “history-on-the-fly” possibilities.
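
The “history-on-the-fly” idea can be sketched as follows, assuming the stored events carry a timestamp and are returned in the order they occurred. `StoredJobEvent` and `JobHistory` are hypothetical names for illustration, not part of Axon.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stored event: what happened to which job, and when (epoch millis).
class StoredJobEvent {
    enum Type { CREATED, ASSIGNED }

    final Type type;
    final String jobId;
    final long timestamp;

    StoredJobEvent(Type type, String jobId, long timestamp) {
        this.type = type;
        this.jobId = jobId;
        this.timestamp = timestamp;
    }
}

class JobHistory {
    // Builds an in-memory view of the unassigned jobs as they were at `asOf`
    // by applying every event up to and including that moment.
    // Assumes `events` is ordered by timestamp, oldest first.
    static Set<String> unassignedJobsAt(List<StoredJobEvent> events, long asOf) {
        Set<String> unassigned = new TreeSet<>();
        for (StoredJobEvent event : events) {
            if (event.timestamp > asOf) {
                break; // everything after this point is "the future"
            }
            if (event.type == StoredJobEvent.Type.CREATED) {
                unassigned.add(event.jobId);
            } else {
                unassigned.remove(event.jobId);
            }
        }
        return unassigned;
    }
}
```

Because nothing is written to the database, the same event list can be queried for any number of points in time.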

Cheers,

Allard

Replied inline :stuck_out_tongue:

This is a typical “staleness” problem. All computer systems have that problem. There is no way around it. Eventual consistency does add a little extra staleness to your data, but it’s mere milliseconds, and that cost is earned back by the ability to scale out.

Here is a nice article about this by Udi Dahan: http://www.udidahan.com/wp-content/uploads/Clarified_CQRS.pdf

That’s a bit odd, isn’t it? At least for this particular use case.
If the persisting event listener is invoked before any other event handler that invokes the repository,
then, from the application’s point of view, a listener can expect the right state at the time the event is dispatched.

If that’s the case, to get my desired behavior:

  1. I’ll dispatch the domain event “after” the domain is persisted;
  2. Add a boolean property to the domain event to indicate the context of the event, e.g.
    whether it is a replay or a standard dispatch; and
  3. Change the domain event handling contract.
    If the event is dispatched for replay purposes:
  • My persistence listener will persist;
  • Other listeners should ignore the event.
    If it’s not a replay:
  • My persistence listener will not persist;
  • Other listeners should perform their business logic as usual.

This should handle both use cases nicely.
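
The contract proposed above could look roughly like this. It is only a sketch under the stated assumptions: the `replay` flag, `JobEventListener`, `PersistenceListener` and `NotificationListener` are hypothetical names, and the lists stand in for a real database and a real mail sender.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying the proposed boolean "replay" flag.
class JobAssignedEvent {
    final String jobId;
    final boolean replay;

    JobAssignedEvent(String jobId, boolean replay) {
        this.jobId = jobId;
        this.replay = replay;
    }
}

// Hypothetical listener contract; this is not Axon's interface.
interface JobEventListener {
    void handle(JobAssignedEvent event);
}

// Persists only during replay. In a standard dispatch the domain
// was already persisted before the event was sent out.
class PersistenceListener implements JobEventListener {
    final List<String> persistedJobIds = new ArrayList<>();

    @Override
    public void handle(JobAssignedEvent event) {
        if (event.replay) {
            persistedJobIds.add(event.jobId);
        }
    }
}

// Runs its business logic only for standard dispatches and ignores replays.
class NotificationListener implements JobEventListener {
    final List<String> notifiedJobIds = new ArrayList<>();

    @Override
    public void handle(JobAssignedEvent event) {
        if (!event.replay) {
            notifiedJobIds.add(event.jobId);
        }
    }
}
```

Note that this relies on every third-party listener checking the flag, which is exactly the kind of cooperation the contract change would have to enforce.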

Your archiver can easily fetch these events from the event store and apply them to a (copy of the) event listener that creates the database tables. You can apply them to an in-memory model (which should be blazingly fast) and return that model as objects. That should give you “history-on-the-fly” possibilities.

Agree.

Regards,
Edward Yakop

Edward,

if you want this guarantee:
“What I’m really after is when JobAssignedEvent is dispatched, from the application point of view, the job is really assigned. That means, if somebody invoke JobRepository#findAllUnassignedJob(), the job that I just assigned, will not be returned by that finder method.”
you’re really saying that you need full consistency. Full consistency means you cannot have either availability or partition tolerance. And still, you will never really have this guarantee 100%.

Even if you were to dispatch the event inside a single JVM in the same thread that handles the command, you cannot give this full guarantee. What if the request to the repo was done 1ms earlier? You would see the job as unassigned. 1ms later, and it might be assigned. By dispatching asynchronously, you add perhaps 10ms latency. A typical web request takes 500ms. So what’s 10ms extra staleness?

We’ve been making ourselves believe that applications run in a fully consistent environment. They’re not! By the time you see data on screen, the application might have already moved along. So our unassigned job may have long been assigned by someone else.

Of course, this only counts when you’re talking about a client invoking the repository. If it’s another event handler invoking the repository, you should ask yourself two questions. Is it really a bad thing if data is a bit stale? Most of the time, it isn’t. If the answer is yes, here is the second question: aren’t we talking about a single event listener accidentally split in 2?

It’s a topic I focus on quite heavily in my workshop. It’s easier to explain face-to-face with a whiteboard :wink:

Cheers,

Allard

Reply inline.

Edward,

if you want this guarantee:

“What I’m really after is when JobAssignedEvent is dispatched, from the application point of view, the job is really assigned. That means, if somebody invoke JobRepository#findAllUnassignedJob(), the job that I just assigned, will not be returned by that finder method.”

you’re really saying that you need full consistency. Full consistency means you cannot have either availability or partition tolerance. And still, you will never really have this guarantee 100%.

Even if you were to dispatch the event inside a single JVM in the same thread that handles the command, you cannot give this full guarantee. What if the request to the repo was done 1ms earlier? You would see the job as unassigned. 1ms later, and it might be assigned. By dispatching asynchronously, you add perhaps 10ms latency. A typical web request takes 500ms. So what’s 10ms extra staleness?

The only thing that I care about is the case where the event is dispatched synchronously within the same thread (everything lives in the same JVM).
I don’t care if the repository is invoked 1 ms earlier, but I do care that while JobAssignedEvent is being handled, when 3rd party code invokes the job repository or queries the db directly, the job is really assigned.

If this is not the case, there will be extra complexity, as event handlers can’t trust the persistence while handling the event.
We will be forced to create APIs that return aggregate ids in some use cases.
For example, the handler now has to do:

List<String> unassignedJobIds = jobRepository.findAllUnassignedPriorityOneJobIds(); // Necessary
unassignedJobIds.remove( event.getDomainId() ); // Performance penalty, creating many String instances, no paging.
int count = unassignedJobIds.size();
List<String> topTenJobIds = unassignedJobIds.subList( 0, Math.min( 10, count ) ); // Top ten jobs, but we only have the ids, not the objects
List jobs = jobRepository.findJobsById( topTenJobIds ); // Second query, to get the job instances

vs.

int count = jobRepository.countAllUnassignedPriorityOneJobs();
List jobs = jobRepository.findUnassignedJobIds( 10 ); // Guarantee that the assigned job will not be there.

We’ve been making ourselves believe that applications run in a fully consistent environment. They’re not! By the time you see data on screen, the application might have already moved along. So our unassigned job may have long been assigned by someone else.

Agree, but a fully consistent environment is not what I’m after.
I only need the application’s persistence to behave as expected while an event is being handled synchronously.

Of course, this only counts when you’re talking about a client invoking the repository. If it’s another event handler invoking the repository, you should ask yourself two questions. Is it really a bad thing if data is a bit stale? Most of the time, it isn’t. If the answer is yes, here is the second question: aren’t we talking about a single event listener accidentally split in 2?

We can’t put the customization code in the same event listener (that code is not written by us).
We have no control over it.

It’s a topic I focus on quite heavily in my workshop. It’s easier to explain face-to-face with a whiteboard :wink:

Agree :slight_smile:

Regards,
Edward Yakop

Ok, I think I get the problem.

You’re sending the event out to a third party. They will (triggered by that event) do a query for the top 10 jobs in your system. Since your system also reacts to that same event, your repository may or may not have been updated.

First of all, let me say that this is not the ideal way to implement integration. Integration on the CQRS level should be done (ideally) purely via events. If an external party wants to maintain a top 10, they should do so based purely on events.

On the other hand, I do see that there is a level of “service” you want to provide to your customers. In that case, I would have the top10 repository trigger an event, and have the external parties react to that event instead.

How do your external consumers listen to these events? Can’t you combine the repository updater and the component that sends the event to external parties into a single one? That would allow you to store changes in the repository before forwarding the events to “external” components.

I’ve been looking at other publish-subscribe mechanisms (e.g. message queues, spring integration channels), and none of them seem to have a concept of “order” in their consumers. I might be overlooking something. In the meantime, I will see if I can find an easy way to influence the order in which the event bus dispatches events.

It shouldn’t be too hard, by the way, to create your own Event Bus implementation that gives precedence to certain Event Listeners.
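
A minimal sketch of such a bus, assuming synchronous dispatch inside a single JVM. `OrderedEventBus` and its `subscribe(priority, listener)` signature are hypothetical and do not mirror Axon's `EventBus` interface. Lower priority values are dispatched first, so a persistence listener can be registered ahead of third-party listeners.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical synchronous event bus that dispatches to listeners in priority order.
class OrderedEventBus {

    private static final class Subscription {
        final int priority;
        final Consumer<Object> listener;

        Subscription(int priority, Consumer<Object> listener) {
            this.priority = priority;
            this.listener = listener;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    // Lower priority values are invoked first. List.sort is stable, so
    // listeners with equal priority keep their subscription order.
    void subscribe(int priority, Consumer<Object> listener) {
        subscriptions.add(new Subscription(priority, listener));
        subscriptions.sort(Comparator.comparingInt(s -> s.priority));
    }

    // Invokes every listener in the calling thread, one after another.
    // Iterates over a copy so a listener may subscribe others while handling.
    void publish(Object event) {
        for (Subscription subscription : new ArrayList<>(subscriptions)) {
            subscription.listener.accept(event);
        }
    }
}
```

The ordering only holds because each listener returns before the next one is called; with asynchronous or remote listeners, as discussed earlier in the thread, it guarantees nothing about completion.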

Hope this provides some pointers for you.

Cheers,

Allard

Another idea could be to build a service that maintains the top ten and make the query model responsible for republishing the domain event after the query model has processed it. That way there would be no problem with the query model being eventually consistent. When the third party system receives the domain event, it can be sure the query will return the latest top ten.
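
A small sketch of this process-and-forward idea, with hypothetical names (`ForwardingQueryModel`, `JobAssignedNotice`) and an in-memory list standing in for the real query store. The query model updates its own state first and only then passes the event on, so any downstream listener that queries it sees the updated data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event forwarded to third parties.
class JobAssignedNotice {
    final String jobId;
    JobAssignedNotice(String jobId) { this.jobId = jobId; }
}

// Hypothetical query model that processes an event, then republishes it.
class ForwardingQueryModel {
    private final List<String> unassignedJobs = new ArrayList<>();
    private final List<Consumer<JobAssignedNotice>> downstream = new ArrayList<>();

    void addJob(String jobId) {
        unassignedJobs.add(jobId);
    }

    void subscribeDownstream(Consumer<JobAssignedNotice> listener) {
        downstream.add(listener);
    }

    void onJobAssigned(JobAssignedNotice event) {
        // Step 1: process, so the query model is up to date.
        unassignedJobs.remove(event.jobId);
        // Step 2: forward, so downstream listeners only run after the update.
        for (Consumer<JobAssignedNotice> listener : downstream) {
            listener.accept(event);
        }
    }

    // Returns at most `limit` unassigned job ids, in insertion order.
    List<String> topUnassigned(int limit) {
        int end = Math.min(limit, unassignedJobs.size());
        return new ArrayList<>(unassignedJobs.subList(0, end));
    }
}
```

Third parties subscribe to the query model instead of the original event source, which makes the dependency on the updated data explicit.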

Reply inline.

Ok, I think I get the problem.

You’re sending the event out to a third party. They will (triggered by that event) do a query for the top 10 jobs in your system. Since your system also reacts to that same event, your repository may or may not have been updated.

Correct

First of all, let me say that this is not the ideal way to implement integration. Integration on the CQRS level should be done (ideally) purely via events. If an external party wants to maintain a top 10, they should do so based purely on events.

Agree.

On the other hand, I do see that there is a level of “service” you want to provide to your customers. In that case, I would have the top10 repository trigger an event, and have the external parties react to that event instead.

Perhaps, what I’m after is this.
That would allow event handlers to declare when they should be invoked:

  • After the transaction succeeds
  • Immediately
  • etc.

How do your external consumers listen to these events?

They use the same @EventHandler.

Can’t you combine the repository updater and the component that sends the event to external parties into a single one?

That would allow you to store changes in the repository, before forwarding the events to “external” components.

Yes, exactly, but I would still need the listener to persist for replay purposes.

I’ve been looking at other publish-subscribe mechanisms (e.g. message queues, spring integration channels), and none of them seem to have a concept of “order” in their consumers. I might be overlooking something. In the meantime, I will see if I can find an easy way to influence the order in which the event bus dispatches events.

The Weld mechanism seems to be the only correct way to handle my cases.
There might be cases where an event handler wants to change the state of the job before it is persisted.
Hmm, it’s getting interesting.

It shouldn’t be too hard, by the way, to create your own Event Bus implementation that gives precedence to certain Event Listeners.

Wdyt about the weld event mechanism?

Hope this provides some pointers for you.

Yup, thanks.

Regards,
Edward Yakop

Weld seems to talk about different types of events than Domain Events. A Domain Event is always “after”; it has happened.

If the fact that a repository persisted changes from a Domain Event is so important in your system, make that importance explicit. In other words, create an ApplicationEvent that notifies components that the top10 has been updated. They should then react on that instead.

Alternatively, Shea Kelly’s approach (process and forward) could be a good solution to your problem.

Weld is a nice principle, but I don’t see a good application for it in this area. (Yet)

Cheers,

Allard