How to determine what events a given command published?

Hi Everyone,

With reference to the AxonBank example (available at https://github.com/AxonFramework/AxonBank)

Money can be withdrawn from account using the DebitSourceBankAccountCommand command. This is handled by the BankAccountCommandHandler handle method that matches this command, and shown below for completeness.

public class BankAccountCommandHandler {

@CommandHandler

public void handle(DebitSourceBankAccountCommand command) {

try {

Aggregate bankAccountAggregate = repository.load(command.getBankAccountId());

bankAccountAggregate.execute(bankAccount -> bankAccount

.debit(command.getAmount(), command.getBankTransferId()));

} catch (AggregateNotFoundException exception) {

eventBus.publish(asEventMessage(new SourceBankAccountNotFoundEvent(command.getBankTransferId())));

}

}

This command can publish one from the following three events based on the command outcome:

  1. SourceBankAccountNotFoundEvent

  2. SourceBankAccountDebitedEvent

  3. SourceBankAccountDebitRejectedEvent

How can I determine for a given command, which of these events were applied? I would like to know what events were triggered by a given command up to the time when the command is considered complete.

Say that a user invokes a command and based on the published event(s) we want to either show a warning or close a dialog. How can we link the user’s command with the published event(s)?

Hi Everyone,

Following a workaround to the above issue, but I am not certain whether this is correct or not.

Created a unique string (a random UUID v4 instance called it action-reference) and linked the command with the events it generates using the action-reference. The command handler need to pass the metadata to all events it creates for this to work. Following is a more details explanation.

Created a custom callback that only works with objects.

@FunctionalInterface
public interface Callback {
void apply(Object event);
}

Create a map where I can store all callbacks and the action-reference these are linked to.

private final ConcurrentMap<String, Callback> callbacks = new ConcurrentHashMap<>();

Created a special method from where I can send commands to the command gateway. This method takes the callback together with the command.

@Override
public void sendAndListen(final T command, final Callback callback) {
/* Create the random action reference (not checking for collisions) */
final String actionReference = UUID.randomUUID().toString();
callbacks.put(actionReference, callback);

/* Creating the metadata */
final Map<String, Object> metadata = new LinkedHashMap<>();
metadata.put(“action-reference”, actionReference);

/* Wrap the command into a command message and add the metadata (that includes the action-reference) */
final CommandMessage message = GenericCommandMessage.asCommandMessage(command).andMetaData(metadata);

/* Once the command is executed we need to remove the callback and the action-reference from the map of callbacks to prevent memory leaks. There is no point in storing these once the command has finished executing. Create the command callback (from AXON) that removes the callback and the action-reference once ready. Also, in the event of an error, pass the error to the callback so that they can deal with it. /
final CommandCallback<CommandMessage, Object> cc = new CommandCallback<CommandMessage, Object>() {
@Override
public void onFailure(final CommandMessage<? extends CommandMessage> commandMessage, final Throwable cause) {
/
Pass the exception so that the caller can deal with it too */
callback.apply(cause);
callbacks.remove(actionReference);
}

@Override
public void onSuccess(final CommandMessage<? extends CommandMessage> commandMessage, final Object result) {
/* Do not pass the result to the callback as we are only interested in the published events*/
callbacks.remove(actionReference);
}
};

/* Send the command */
commandGateway.send(message, cc);
}

The command handler will work its magic and publish the events. Note that the command handler needs to pass the metadata together with the event. I do not know of a better way to automatically pass the metadata from the command handler to the event.

@CommandHandler
public void handle(final AddNoteToTaskCommand command, final MetaData metaData) {
if (stage == TaskStage.COMPLETE) {
AggregateLifecycle.apply(TaskAlreadyCompletedEvent.of(command), metaData);
} else if (notes.contains(command.getNote())) {
AggregateLifecycle.apply(NoteAlreadyExistsInTaskEvent.of(command), metaData);
} else {
AggregateLifecycle.apply(NoteAddedToTaskEvent.of(command), metaData);
}
}

Finally created an event handler that listens to all events that have an action-reference within their metadata. If the collection of callbacks (the map created before) still contain the action-reference, the this is called and pass the event to the callback.

@EventHandler
public void apply(final Object event, @MetaDataValue(value = “action-reference”, required = true) final String actionReference) {
final Callback callback = callbacks.get(actionReference);
if (callback != null) {
try {
callback.apply(event);
} catch (final Throwable e) {
/*TODO: Deal with errors */
}
}
}

This is how we can use the above

sendAndListen(AddNoteToTaskCommand.of(id, TaskNote.of(“Note 1”)), e-> { /Deal with the events/ });

This is a basic implementation which can be improved further. I am not certain whether this is a good approach to this problem or not. In the event this is a good approach, I will wrap this in a demo project and put it on github. But prefer to get some feedback from the AXON team first as I do not want to steer anyone in the wrong direction. Any feedback is highly appreciate.

Hi Albert,

you say “I want to know what events were published”, but later you mention the example of “having to show the user a warning or close dialog”. These are two very distinct requirements. If the second is what you really need, you don’t need the events.

What I think you’re looking for, is that you can simply return an object from your @CommandHandler method, which describes whether the action was successful. That return value will then be passed to any callbacks (or CompletableFutures) on the dispatching side.

While your approach above will work (as long as the @EventHandler component lives on the same machine and preferably in a SubscribingProcessor), you can also use the “correlation-id” header which is automatically added when using Spring Boot Autoconfiguration. The correlation-id of an event points to the message identifier of the command being sent.

But using a return value in a case like this is to be favored above relying on the events.

Cheers,

Allard

Thank you very much Allard.

This opens a new dimension to the Axon framework. I never thought that we can return things from a command handler and use it from the command callback. As you pointed out I’ve replaced my solution with the simpler option. I am including the new solution for completeness.

@Aggregate
public class Task {

public static enum AddNoteResult {
ADDED, ALREADY_COMPLETE, ALREADY_EXISTS;
}

@CommandHandler
public AddNoteResult handle(final AddNoteToTaskCommand command) {
if (stage == TaskStage.COMPLETE) {
AggregateLifecycle.apply(TaskAlreadyCompletedEvent.of(command));
return AddNoteResult.ALREADY_COMPLETE;
}

if (notes.contains(command.getNote())) {
AggregateLifecycle.apply(NoteAlreadyExistsInTaskEvent.of(command));
return AddNoteResult.ALREADY_EXISTS;
}

AggregateLifecycle.apply(NoteAddedToTaskEvent.of(command));
return AddNoteResult.ADDED;
}
/* Some fields and methods were removed for brevity */
}

The result returned by the handler was used by the CommandCallback as shown next.

gateway.send(AddNoteToTaskCommand.of(id, TaskNote.of(“Note 1”)),
new CommandCallback<AddNoteToTaskCommand, AddNoteResult>() {
@Override
public void onSuccess(CommandMessage<? extends AddNoteToTaskCommand> commandMessage, AddNoteResult result) {
LOGGER.debug(“Result: {}”, result);
}

@Override
public void onFailure(CommandMessage<? extends AddNoteToTaskCommand> commandMessage, Throwable cause) {}
});

I have read the section titled Command Model (found at https://docs.axonframework.org/v/3.1/part2/command-model.html) and I do not believe I met an example where a command handler returns something. May be I missed it or I was looking in the wrong place. I have attached a small project that demonstrated just this, should you or anyone wants to have a look at the full implementation.

Thanks again for your support

command-event-axon-example.zip (17.5 KB)

Hi Albert,

you’re actually right about the reference guide: it doesn’t mention anything about return values. I will add a section to it, also explaining why/that the Constructor CommandHandler returns the identifier of the aggregate that was created.

Thanks for the feedback.
Cheers,

Allard

Thanks a lot for all your contributions Allard.

I would like to clarify something to make sure I am on the correct page. While it is perfectly correct for a command handler to return something or throw something, event handlers, on the other hand, are not expected to throw anything or return anything.

Regards,
Albert Attard

Hi Albert,

you’re welcome.

That’s correct!

Cheers,

Allard

Hi Allard,

Can we make the aggregate’s methods @EventSourcingHandler private? This ensures that at language level these methods are not to be called.

Regards,
Albert

Yes, you can, as long as the JVM’s security settings allow Axon to grant itself access (which is the case, by default).

Cheers,

Allard

Thank you for your prompt reply.

Albert