Commands inside events

Hi again.

I’m having problems in sending commands from inside events. Please remember this is just a prototype for a POC with which I’m messing around, so I’m most probably making many mistakes. In this case I’m doing this:

A controller gets a request from a client that triggers a command:

`
TestStartCommand c1 = new TestStartCommand(id);
commandGateway.sendAndWait(c1);

`

there is a Aggregate accepting this command that triggers a event, and a event handler that handles it:

`

@CommandHandler
public TestService(TestStartCommand command, RestTemplate restTemplate, CommandGateway commandGateway) {
this.restTemplate = restTemplate;
this.commandGateway = commandGateway;
this.id = command.getId();
apply(new TestStartedEvent(command.getId(), command.getData()));
}

@EventHandler
public void on(TestStartedEvent event) {
GetProcessCommand command = new GetProcessCommand(event.getId(),event.getData());
commandGateway.sendAndWait(command); <----------------- EventStreamNotFoundException: Aggregate of type [TestService] with identifier [] cannot be found
apply(new GotProcessEvent(id, result));
}

`

This happens because the second command tries to load the aggregate that was never saved

`
private T loadAggregate(CommandMessage<?> command) {
VersionedAggregateIdentifier iv = commandTargetResolver.resolveTarget(command);
return repository.load(iv.getIdentifier(), iv.getVersion()); <------------------------
}

public DomainEventStream readEvents(String type, Object aggregateIdentifier) {
try {
if (!eventFileResolver.eventFileExists(type, aggregateIdentifier)) {
throw new EventStreamNotFoundException(type, aggregateIdentifier); <------------------------------
}

`

So what am I’m doing wrong here? The problem is the Aggregate not being saved after the aggregate constructor, or this kind of “nesting” should not be used?

I did the search for this, and I found in [1]

The UnitOfWork is nesting aware. That means it is possible to create a single transaction that deals with commands that cause other commands (via events, usually)

but this it seems was in version 1 of Axon (BTW, the forum is hard to search because no posts are tagged with version info)

Note that my “real” intention is to model Sagas like this, but I thought it was better to start with Aggregates (concept that I’m not entirely familiar with).

Thanks for helping.

[1] https://groups.google.com/d/msg/axonframework/md4kvkkqx3E/9qDllswgEjUJ

Hi Antonio,

aggregates shouldn’t fire commands themselves. Especially not commands targeted at the same instance. Since the aggregate is already doing processing, just do a method call on the instance.
Furthermore, the aggregate shouldn’t do anything other than applying state in the EventHandler methods. Be aware that these methods are also invoked for historic events when building up current state. If you load such a historic event, you don’t want to (in this example) send the command again, and again, and again…

If you need to fire a command based on something happening in an aggregate, you have two options: perform the logic immediately in the aggregate itself, as part of the first aggregate, or raise an event and use an external event handler (bean) or saga to generate a command based on that event.

The nesting in UnitOfWork is about a command generating an event, that in turn causes another command, all in the same thread. There is some ‘magic’ involved to make sure events are all published in the correct order.

Hope this helps.
Cheers,

Allard

Hi Allard.

Many thanks for your help, specially considering my doubts are probably more about the concepts than the Axon implementation itself…

But considering a Aggregate as being “a cluster of domain objects that can be treated as a single unit”, let me take a more realistic scenario than just “Test”. Suppose I’m a top-notch auditor and my boss tells me “Toni, audit file 123 from our client xyz”. I, being a top-notch auditor with lots of secretaries, will start by saying to them:

  • you, get me the record of client xyz
  • you, go to the archive and get me file xyz.6346 and file xyz.95684
  • you, get me the paperware for file 123
  • you, phone the client saying we’ve started their audit
  • you, bring me some coffee

Wouldn’t me and my secretaries be a Aggregate, with myself as Root? If so, how can I command then to do things without commands? Please note I do not want a specific secretary to do a specific task, “you” will be just the first one that has the bad luck of crossing my path first, any secretary can do a task (or even more than one) so I don’t want to “perform the logic immediately in the aggregate itself”. If I only use events “and use an external event handler” how can I guarantee the consistence and sequence of the process (basically, orchestration). Note that I only know about file xyz.6346 and file xyz.95684 after I received the record of client xyz, for example.

Again, I understand this questions are more about the concepts and not the Axon implementation, so feel free to not wasting time answering… :slight_smile:

Thanks again.

OK, here’s my second try, more Axion-oriented…

This is my Aggregate:

`

@CommandHandler
public AuditService(AuditStartCommand command) {
this.id = command.getId();
apply(new AuditStartedEvent(command.getId(), command.getData()));
}

@EventHandler
public void on(GotProcessEvent event) {
Object data = event.getData();
if(data==null){
//…
}
}

`

and this is my external event and command handler:

`

@EventHandler
public void on(AuditStartedEvent event) {
GetProcessCommand command = new GetProcessCommand(event.getId(),event.getData());
Object result = commandGateway.sendAndWait(command);
if(result != null){
eventTemplate.publishEvent(new GotProcessEvent(event.getId(), result));
}else{
// apply(new NotProcessEvent(id, result));
eventTemplate.publishEvent(new GotProcessEvent(event.getId(), null));
}
}

@CommandHandler
public Object getProcess(GetProcessCommand command) {
String[] data = (String[]) command.getData();
Object result = restTemplate.getForObject(url, String.class, data);
return result;
}

`

and what happens is that the handler handles the even and the command OK, but the on(GotProcessEvent event) in the aggregate never executes.

Stepping thru the code I see the ``eventTemplate.publishEvent does not actually publishes the event but just adds it to a eventsToPublishOn

`
@Override
public void registerForPublication(EventMessage<?> event, EventBus eventBus, boolean notifyRegistrationHandlers) {
if (logger.isDebugEnabled()) {
logger.debug(“Staging event for publishing: [{}] on [{}]”,
event.getPayloadType().getName(),
eventBus.getClass().getName());
}
if (notifyRegistrationHandlers) {
event = invokeEventRegistrationListeners(event);
}
eventsToPublishOn(eventBus).add(event);
}

`

and that’s it, I’m stuck again…

I’ll keep on digging.

Cheers.

Hi Antonio,

your example with the secretaries is an example of procedural programming. It’s very directive: “you, do this, you do that”. In Axon, things are turned around. Things become reactive. It’s more like you shout through the hallway: “HEY! We’re starting a new audit for xyz”. As a result, your secretaries will know that they need to bring you information. When you have all the information, you shout, ehm… whatever you need to shout.

Typically, you should prevent the need for aggregates to fetch information from another component. That’s not always possible or feasible, though. Sometimes, aggregates need access to domain services (as defined in DDD by Evans) to help them make sense of information.

Regarding your last email, with the code examples, I noticed a few things. First, you use a GetProcessCommand to retrieve information. Since the whole thing is not about changing any state in the application, it a query, not a command. Queries can be implemented using a command-like structure, but generally, they’re just repositories.

EventHandlers inside aggregates only fire for events that are generated by that same aggregate. That’s because aggregates don’t (and shouldn’t) listen to the event bus. Since Axon 2.2, it is recommended to use the @EventSourcingHandler on methods, to make that difference clear. Events generated by aggregates are sent to the Event Bus by the repository when the Unit of Work that loaded the aggregate commits. Normally, this is done by the Command Bus.

Unlike you, I am not an auditing expert with 6 secretaries at my service (sigh), so I don’t really know what should be happening. I can imagine that you need to go through each file and make sure that the entries in one file match the entries in another. Perhaps the aggregate can be seen as a stream processor. Just feed each entry as a command, and the aggregate will start shouting down the hallway (Event bus) when something interesting happens. Alternatively, you could pass a “token” with which the aggregate can stream information from some sort of repository itself. The latter is less efficient, as aggregates tend to work best when they execute short actions.

That brings me to a last remark: not all processes are suitable to model using aggregates and entities. Sometimes, something is just a batch process. The batch process can still be triggered using a command, and spit out events. So you can still use the Command Bus and Event Bus, but leave the Aggregates for what they are…

Hope this helps.
Cheers,

Allard

Hi, thanks for your reply.

In fact I think I’m misusing Aggregates here, probably because of my relative ignorance about the concepts (to know that " An aggregate is a group (a cluster) of objects that work together and are treated as a unit, to provide a specific functionality" it’s probably not all there is to it…). I actually was thinking of Aggregates as a kind of “small-Sagas”… So I’ll go and try to model my use case with Sagas.

Regarding my GetProcessCommand, it’s not about a query in the sense of the Q in CQRS, it’s a request for ancillary info needed for my main workflow, so I guess is a case of " aggregates need access to domain services"… How to better handle those situations?

Regarding the Events and EventHandlers on Aggregates, your explanation did made me understand it, thanks for your clear explanation.

Well, I’m going to make some coffee now, I wish I did have a secretary for that… :slight_smile:

Thanks again for your help.

Cheers.

Hi Antonio,

it’s getting very hard for me to help you out on these details. Building a good model relies on the details of problem you’re trying to solve. I only have a very limited view on what problem at hand is.

Remember that it doesn’t have to be the aggregate retrieving its own information. Sometimes it makes more sense to have an external component, such as a saga, register information with the aggregate.

Cheers,

Allard

Well, I can describe what will be my first “real” scenario that I’ll try to solve, but don’t waste time with it if you have better things to do… :slight_smile:

I described the start of a audit process, but my real scenario will be the end of a audit process, when data is to be (sent|stored|saved) in a number of places both local and remote, some of which we have some control and othes don’t. For instance, a report sent to the customer by FTP, a update to a external partner by REST, to another by SOAP, to “our” database and to remote databases by JDBC, eventually to somewhere in the Cloud.

So until now I’m thinking in

  • a Command (initiated by a user action) that creates and starts a Aggregate
  • a Aggregate that aggregates all those service providers (FTP, REST, SOAP, JDBC)
  • a Saga that executes all the requests and controls the success/failure and compensations

The connection between the Aggregate and the Saga is that somehow puzzles me… The Aggregate receives Commands that send Events listened by the Saga, that does the work and send Commands back to the Aggregate? Should the Saga “do the work” itself or should delegate to other components via Commands? Who coordinates all this, the Saga or the Aggregate? How can I control that several commands finished with success/failure for me to know what to do next?

I know my doubts are because I do have little experience with the concepts, it’s not about the platform, so as I said don’t waste too much time with it…

Nevertheless your help is very welcomed, of course.

Cheers.