Hi Everyone,
The following is a workaround for the above issue, but I am not certain whether it is correct.
Created a unique string (a random UUID v4, which I call the action-reference) and used it to link the command with the events it generates. For this to work, the command handler needs to pass the metadata to every event it creates. A more detailed explanation follows.
Created a custom callback that accepts any object, so it can receive every kind of event.
@FunctionalInterface
public interface Callback {
    void apply(Object event);
}
Created a map that stores all callbacks, keyed by the action-reference they are linked to.
private final ConcurrentMap<String, Callback> callbacks = new ConcurrentHashMap<>();
Created a method that sends commands to the command gateway. It takes the callback together with the command.
@Override
public void sendAndListen(final T command, final Callback callback) {
    /* Create the random action reference (not checking for collisions) */
    final String actionReference = UUID.randomUUID().toString();
    callbacks.put(actionReference, callback);
    /* Creating the metadata */
    final Map<String, Object> metadata = new LinkedHashMap<>();
    metadata.put("action-reference", actionReference);
    /* Wrap the command into a command message and add the metadata (that includes the action-reference) */
    final CommandMessage message = GenericCommandMessage.asCommandMessage(command).andMetaData(metadata);
    /*
     * Once the command is executed we need to remove the callback and the action-reference from the map
     * of callbacks to prevent memory leaks. There is no point in storing these once the command has
     * finished executing. Create the command callback (from AXON) that removes the callback and the
     * action-reference once ready. Also, in the event of an error, pass the error to the callback so
     * that they can deal with it.
     */
    final CommandCallback<CommandMessage, Object> cc = new CommandCallback<CommandMessage, Object>() {
        @Override
        public void onFailure(final CommandMessage<? extends CommandMessage> commandMessage, final Throwable cause) {
            /* Pass the exception so that the caller can deal with it too */
            callback.apply(cause);
            callbacks.remove(actionReference);
        }
        @Override
        public void onSuccess(final CommandMessage<? extends CommandMessage> commandMessage, final Object result) {
            /* Do not pass the result to the callback as we are only interested in the published events */
            callbacks.remove(actionReference);
        }
    };
    /* Send the command */
    commandGateway.send(message, cc);
}
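The bookkeeping around the map can be looked at in isolation. The following is only a minimal sketch in plain Java with no AXON types; `CallbackRegistry`, `register`, `dispatch` and `complete` are hypothetical names that do not appear in the code above. It shows the lifecycle of an action-reference: it is registered before the command is sent, it is used to look up the callback while the command is running, and it is removed once the command has finished.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the map-based bookkeeping; not part of AXON. */
class CallbackRegistry {

    /** Same shape as the Callback interface shown earlier. */
    interface Callback {
        void apply(Object event);
    }

    private final ConcurrentMap<String, Callback> callbacks = new ConcurrentHashMap<>();

    /** Stores the callback under a fresh random action-reference and returns that reference. */
    String register(final Callback callback) {
        final String actionReference = UUID.randomUUID().toString();
        callbacks.put(actionReference, callback);
        return actionReference;
    }

    /** Forwards the event if the reference is still registered; returns whether it was delivered. */
    boolean dispatch(final String actionReference, final Object event) {
        final Callback callback = callbacks.get(actionReference);
        if (callback == null) {
            return false;
        }
        callback.apply(event);
        return true;
    }

    /** Removes the callback once the command has finished, mirroring onSuccess/onFailure. */
    void complete(final String actionReference) {
        callbacks.remove(actionReference);
    }

    /** Number of callbacks still waiting; useful for spotting leaks. */
    int size() {
        return callbacks.size();
    }
}
```

One consequence of this design is visible in the sketch: any event dispatched after `complete` has been called is silently dropped. That is fine as long as the events are delivered before the command callback fires, but it would lose events if they were delivered later (for example, by an event processor running on another thread).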
The command handler will work its magic and publish the events. Note that the command handler needs to pass the metadata together with the event. I do not know of a better way to automatically pass the metadata from the command handler to the event.
@CommandHandler
public void handle(final AddNoteToTaskCommand command, final MetaData metaData) {
    if (stage == TaskStage.COMPLETE) {
        AggregateLifecycle.apply(TaskAlreadyCompletedEvent.of(command), metaData);
    } else if (notes.contains(command.getNote())) {
        AggregateLifecycle.apply(NoteAlreadyExistsInTaskEvent.of(command), metaData);
    } else {
        AggregateLifecycle.apply(NoteAddedToTaskEvent.of(command), metaData);
    }
}
Finally, created an event handler that listens to all events that have an action-reference in their metadata. If the map of callbacks created earlier still contains the action-reference, the handler looks up the matching callback and passes the event to it.
@EventHandler
public void apply(final Object event, @MetaDataValue(value = "action-reference", required = true) final String actionReference) {
    final Callback callback = callbacks.get(actionReference);
    if (callback != null) {
        try {
            callback.apply(event);
        } catch (final Throwable e) {
            /* TODO: Deal with errors */
        }
    }
}
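One possible way to fill in the TODO above is to hand the exception to a separate error handler, so that a misbehaving callback cannot break the event handler itself. This is only a sketch under that assumption, written in plain Java; `SafeDispatch`, `safeApply` and the `onError` parameter are hypothetical names, and `Consumer<Object>` stands in for the `Callback` interface because it has the same shape.

```java
import java.util.function.Consumer;

/** Hypothetical helper that isolates callback failures; not part of AXON. */
final class SafeDispatch {

    private SafeDispatch() {
    }

    /**
     * Invokes the callback with the event. Runtime exceptions thrown by the callback are
     * passed to onError instead of propagating. Returns true if the callback completed normally.
     */
    static boolean safeApply(final Consumer<Object> callback,
                             final Object event,
                             final Consumer<RuntimeException> onError) {
        try {
            callback.accept(event);
            return true;
        } catch (final RuntimeException e) {
            onError.accept(e);
            return false;
        }
    }
}
```

The sketch catches `RuntimeException` rather than `Throwable`, so that errors such as `OutOfMemoryError` still propagate. Whether that is the right trade-off depends on how the event handler is expected to behave.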
This is how we can use the above:
sendAndListen(AddNoteToTaskCommand.of(id, TaskNote.of("Note 1")), e -> { /* Deal with the events */ });
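Because the same callback receives both the published events and, when the command fails, the `Throwable`, the caller has to tell the two apart. The following is a minimal sketch of one way to do that in plain Java; `OutcomeCallbacks` and `split` are hypothetical names, and `Consumer<Object>` is used in place of the `Callback` interface because it has the same shape.

```java
import java.util.function.Consumer;

/** Hypothetical helper that separates failures from events; not part of AXON. */
final class OutcomeCallbacks {

    private OutcomeCallbacks() {
    }

    /**
     * Builds a single-argument callback that routes Throwables to onError
     * and every other object to onEvent.
     */
    static Consumer<Object> split(final Consumer<Object> onEvent, final Consumer<Throwable> onError) {
        return received -> {
            if (received instanceof Throwable) {
                onError.accept((Throwable) received);
            } else {
                onEvent.accept(received);
            }
        };
    }
}
```

Assuming the `sendAndListen` method above, the result could be passed as `OutcomeCallbacks.split(onEvent, onError)::accept`. Note that an event class that itself extends `Throwable` would be routed to `onError` by this sketch.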
This is a basic implementation that can be improved further. I am not certain whether this is a good approach to the problem. If it is, I will wrap it in a demo project and put it on GitHub. However, I would prefer to get some feedback from the AXON team first, as I do not want to steer anyone in the wrong direction. Any feedback is highly appreciated.