Is there a better way to orchestrate sagas and validate commands than using a dedicated aggregate, and can you guarantee event handling order between separate processing groups?

I am looking for a second opinion on a recent implementation of mine. I am working on an application (really to just get some more experience with Axon and other technologies) that provides a platform for business to easily offer and market loyalty programs to customers. I described this in a little more detail in another post if the context helps.

In summary, I have decided to use command projections to determine if commands are valid inside of command interceptors before commands reach an Aggregate. I recently created a new flow that allowed be to create an account and a loyalty bank in the same api call. I am doing this with a Saga as shown below:

package loyalty.service.command.sagas;

@Saga
@ProcessingGroup("account-and-loyalty-bank-creation-saga-group")
public class AccountAndLoyaltyBankCreationSaga {


    @Autowired
    private transient CommandGateway commandGateway;

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountAndLoyaltyBankCreationSaga.class);

    private String loyaltyBankId = null;
    private String businessId = null;


    @StartSaga
    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountAndLoyaltyBankCreationStartedEvent event) {
        CreateAccountCommand command = CreateAccountCommand.builder()
                .requestId(event.getRequestId())
                .accountId(event.getAccountId())
                .firstName(event.getFirstName())
                .lastName(event.getLastName())
                .email(event.getEmail())
                .build();

        loyaltyBankId = event.getLoyaltyBankId();
        businessId = event.getBusinessId();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);

        commandGateway.sendAndWait(command);
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountCreatedEvent event) {
        CreateLoyaltyBankCommand command = CreateLoyaltyBankCommand.builder()
                .requestId(event.getRequestId())
                .loyaltyBankId(loyaltyBankId)
                .businessId(businessId)
                .accountId(event.getAccountId())
                .build();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);

        commandGateway.sendAndWait(command);
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(LoyaltyBankCreatedEvent event) {
        EndAccountAndLoyaltyBankCreationCommand command = EndAccountAndLoyaltyBankCreationCommand.builder()
                .requestId(event.getRequestId())
                .build();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);

        commandGateway.sendAndWait(command);
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountAndLoyaltyBankCreationEndedEvent event) {
        LOGGER.info(MarkerGenerator.generateMarker(event), "{} event received, ending saga", event.getClass().getSimpleName());
    }
}

The AccountAndLoyaltyBankCreationStartedEvent is issued from what I am calling a SagaOrchestratorAggregate:

package loyalty.service.command.aggregates;

@Aggregate
@NoArgsConstructor
@Getter
public class SagaOrchestratorAggregate {

    private static final Logger LOGGER = LoggerFactory.getLogger(SagaOrchestratorAggregate.class);

    @AggregateIdentifier
    private String requestId;

    @CommandHandler
    public SagaOrchestratorAggregate(StartAccountAndLoyaltyBankCreationCommand command) {
        AccountAndLoyaltyBankCreationStartedEvent event = AccountAndLoyaltyBankCreationStartedEvent.builder()
                .requestId(command.getRequestId())
                .accountId(command.getAccountId())
                .firstName(command.getFirstName())
                .lastName(command.getLastName())
                .email(command.getEmail())
                .loyaltyBankId(command.getLoyaltyBankId())
                .businessId(command.getBusinessId())
                .build();

        LogHelper.logCommandIssuingEvent(LOGGER, command, event);

        AggregateLifecycle.apply(event);
    }

    @CommandHandler
    void handle(EndAccountAndLoyaltyBankCreationCommand command) {
        AccountAndLoyaltyBankCreationEndedEvent event = AccountAndLoyaltyBankCreationEndedEvent.builder()
                .requestId(command.getRequestId())
                .build();

        LogHelper.logCommandIssuingEvent(LOGGER, command, event);

        AggregateLifecycle.apply(event);
    }

    @EventSourcingHandler
    public void on(AccountAndLoyaltyBankCreationStartedEvent event) {
        this.requestId = event.getRequestId();
        LogHelper.logEventProcessed(LOGGER, event);
    }

    @EventSourcingHandler
    public void on(AccountAndLoyaltyBankCreationEndedEvent event) {
        AggregateLifecycle.markDeleted();
        LogHelper.logEventProcessed(LOGGER, event);
    }
}

This allows me to create a command and use the same interception pattern I mentioned before:

package loyalty.service.command.interceptors;

import static loyalty.service.core.constants.DomainConstants.REQUEST_ID;
import static loyalty.service.core.constants.LogMessages.*;

@Component
@RequiredArgsConstructor
public class SagaOrchestratorCommandsInterceptor implements MessageDispatchInterceptor<CommandMessage<?>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SagaOrchestratorCommandsInterceptor.class);

    private final AccountLookupRepository accountLookupRepository;
    private final BusinessLookupRepository businessLookupRepository;

    @Nonnull
    @Override
    public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(
            @Nonnull List<? extends CommandMessage<?>> messages) {
        return (index, genericCommand) -> {
            String commandName = genericCommand.getPayloadType().getSimpleName();
            LOGGER.debug(MarkerGenerator.generateMarker(genericCommand.getPayload()), INTERCEPTED_COMMAND, commandName);

            if (StartAccountAndLoyaltyBankCreationCommand.class.equals(genericCommand.getPayloadType())) {
                handleStartAccountAndLoyaltyBankCreationCommand((StartAccountAndLoyaltyBankCreationCommand) genericCommand.getPayload(), commandName);
            }

            return genericCommand;
        };
    }

    private void handleStartAccountAndLoyaltyBankCreationCommand(StartAccountAndLoyaltyBankCreationCommand command, String commandName) {
        throwExceptionIfEmailExists(command.getEmail(), command.getRequestId(), commandName);
        throwExceptionIfBusinessDoesNotExist(command.getBusinessId(), command.getRequestId(), commandName);
    }

    // TODO: move shared lookup and exception handling to common service or util class
    private void throwExceptionIfEmailExists(String email, String requestId, String commandName) {
        AccountLookupEntity accountLookupEntity = accountLookupRepository.findByEmail(email);

        if (accountLookupEntity != null) {
            logAndThrowEmailExistsForAccountException(accountLookupEntity, requestId, commandName);
        }
    }

    private void throwExceptionIfBusinessDoesNotExist(String businessId, String requestId, String commandName) {
        BusinessLookupEntity businessLookupEntity = businessLookupRepository.findByBusinessId(businessId);

        if (businessLookupEntity == null) {
            logAndThrowBusinessNotFoundException(businessId, requestId, commandName);
        }
    }

    private void logAndThrowEmailExistsForAccountException(AccountLookupEntity accountLookupEntity, String requestId, String commandName) {
        Marker marker = MarkerGenerator.generateMarker(accountLookupEntity);
        marker.add(Markers.append(REQUEST_ID, requestId));
        LOGGER.info(
                marker,
                EMAIL_FOUND_ON_ANOTHER_ACCOUNT_CANCELLING_COMMAND,
                accountLookupEntity.getAccountId(),
                commandName
        );

        throw new EmailExistsForAccountException(accountLookupEntity.getEmail());
    }

    private void logAndThrowBusinessNotFoundException(String businessId, String requestId, String commandName) {
        LOGGER.info(
                Markers.append(REQUEST_ID, requestId),
                BUSINESS_NOT_FOUND_CANCELLING_COMMAND, businessId, commandName
        );

        throw new BusinessNotFoundException(businessId);
    }
}

If you’ve made it this far, thank you for wading through all of that context.

Onto my question, as I mentioned before I have these “command projections” that I am using to hold the necessary information I need for determining the validity of a command. How these are created is through event handlers (similar to what you’d see on a query api).

For example here is my AccountLookupEventsHandler class:

package loyalty.service.command.projections;

import static loyalty.service.core.constants.DomainConstants.REQUEST_ID;
import static loyalty.service.core.constants.ExceptionMessages.ACCOUNT_WITH_ID_DOES_NOT_EXIST;
import static loyalty.service.core.constants.LogMessages.*;
import static loyalty.service.core.utils.Helper.throwExceptionIfEntityDoesNotExist;

@Component
@Validated
@ProcessingGroup("account-lookup-group")
public class AccountLookupEventsHandler {

    private final AccountLookupRepository accountLookupRepository;
    private final SmartValidator validator;
    private Marker marker = null;

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountLookupEventsHandler.class);

    public AccountLookupEventsHandler(AccountLookupRepository accountLookupRepository, SmartValidator validator) {
        this.accountLookupRepository = accountLookupRepository;
        this.validator = validator;
    }

    @ExceptionHandler(resultType = IllegalArgumentException.class)
    public void handle(IllegalArgumentException exception) {
        LOGGER.error(marker, exception.getLocalizedMessage());
    }

    @ExceptionHandler(resultType = IllegalProjectionStateException.class)
    public void handle(IllegalProjectionStateException exception) {
        LOGGER.error(marker, exception.getLocalizedMessage());
    }

    @EventHandler
    public void on(AccountCreatedEvent event) {
        marker = Markers.append(REQUEST_ID, event.getRequestId());

        AccountLookupEntity accountLookupEntity = new AccountLookupEntity(event.getAccountId(), event.getEmail());

        marker.add(MarkerGenerator.generateMarker(accountLookupEntity));

        validateEntity(accountLookupEntity);
        accountLookupRepository.save(accountLookupEntity);

        LOGGER.info(marker, ACCOUNT_SAVED_IN_LOOKUP_DB, event.getAccountId());
    }

    @EventHandler
    public void on(AccountUpdatedEvent event) {
        marker = Markers.append(REQUEST_ID, event.getRequestId());

        AccountLookupEntity accountLookupEntity = accountLookupRepository.findByAccountId(event.getAccountId());
        throwExceptionIfEntityDoesNotExist(accountLookupEntity, String.format(ACCOUNT_WITH_ID_DOES_NOT_EXIST, event.getAccountId()));

        BeanUtils.copyProperties(event, accountLookupEntity);

        marker.add(MarkerGenerator.generateMarker(accountLookupEntity));

        validateEntity(accountLookupEntity);
        accountLookupRepository.save(accountLookupEntity);

        LOGGER.info(marker, ACCOUNT_UPDATED_IN_LOOKUP_DB, event.getAccountId());
    }

    @EventHandler
    public void on(AccountDeletedEvent event) {
        marker = Markers.append(REQUEST_ID, event.getRequestId());

        AccountLookupEntity accountLookupEntity = accountLookupRepository.findByAccountId(event.getAccountId());
        throwExceptionIfEntityDoesNotExist(accountLookupEntity, String.format(ACCOUNT_WITH_ID_DOES_NOT_EXIST, event.getAccountId()));

        marker.add(MarkerGenerator.generateMarker(accountLookupEntity));

        accountLookupRepository.delete(accountLookupEntity);

        LOGGER.info(marker, ACCOUNT_DELETED_FROM_LOOKUP_DB, event.getAccountId());
    }

    private void validateEntity(AccountLookupEntity entity) {
        BindingResult bindingResult = new BeanPropertyBindingResult(entity, "accountLookupEntity");
        validator.validate(entity, bindingResult);

        if (bindingResult.hasErrors()) {
            throw new IllegalProjectionStateException(bindingResult.getFieldError().getDefaultMessage());
        }
    }
}

My first question is regarding the order of event handling. Is there a way to guarantee that the AccountLookupEventsHandler will always handle and complete the AccountCreatedEvent before the AccountAndLoyaltyBankCreationSaga?

I feel like I am running the risk of a race condition occurring where the saga will handle the event before the AccountLookupEntity is saved in the database, which will cause the CreateLoyaltyBankCommand to fail due to its interceptor validation requirements.

The next question I have, is what is the proper mechanism for “rolling back” the saga if it does encounter an error. Let’s say the business is deleted after the account is created but before the loyalty bank. This would be another scenario in which the CreateLoyaltyBankCommand will fail. Is there a way for me to pass something like a callback function that can catch the exception thrown by the interceptor and issue a DeleteAccountCommand, or some other more elegant solution?

My final question is pertaining to the design of using the SagaOrchestratorAggregate. Just wondering if people that have worked in large Axon projects before have any recommendations against it, or if it presents certain pitfalls/anti-patterns that I should try to avoid and why.

Thank you so much for taking the time to read and give careful consideration to this post. I hope the conversation that follows is valuable not only to myself, but this amazing community as well. If you want to see the code in its entirety, you can find it here.

TLDR: In this case having a separate aggregate to orchestrate sagas introduces more overhead than it’s worth and you can only specify order within an event processing group (which makes perfect sense when you think about it :man_facepalming:)

For those that are viewing this, I ended up deciding not to use an aggregate for saga orchestration. I decided to do this because I am already validating each of the following commands issued by the saga with command interceptors. As such there is very little value gained by having another aggregate to validate a command to start a saga. So I’ve opted for publishing the event directly.

That being said, I still need to work through how to properly send errors back to the client if a request fails. Currently, I am just returning a 201 Created response no matter what happens in the saga. I think I will start looking into some way for the client to poll the status of the saga as it works towards completion so I can update the front end dynamically based on the result. Having the Saga Orchestration Aggregate would have helped slightly with this, as I could validate the command and send something back immediately if it was invalid, but that doesn’t stop something going wrong during the saga and notifying the user.

I will probably just create a validation class for these StartSaga events to do the same thing that would have been done by the command interceptor before I publish the event to the eventGateway. Giving the same benefits of the aggregate without all the overhead.

If anyone has any suggestions for how to handle updates regarding state of a saga to a client, it would be really appreciated.

Here is the latest example of my controller’s method for starting the saga:

    @PostMapping(path = "/loyaltyBank")
    @ResponseBody
    @ResponseStatus(HttpStatus.CREATED)
    @Operation(summary = "Create account and loyaltyBank")
    public AccountAndLoyaltyBankCreatedResponseModel createAccountAndLoyaltyBank(@Valid @RequestBody CreateAccountAndLoyaltyBankRequestModel request) {
        String accountId = UUID.randomUUID().toString();
        String loyaltyBankId = UUID.randomUUID().toString();
        AccountAndLoyaltyBankCreationStartedEvent event = AccountAndLoyaltyBankCreationStartedEvent.builder()
                .requestId(UUID.randomUUID().toString())
                .accountId(accountId)
                .loyaltyBankId(loyaltyBankId)
                .firstName(request.getFirstName())
                .lastName(request.getLastName())
                .email(request.getEmail())
                .businessId(request.getBusinessId())
                .build();

        LOGGER.info(
                MarkerGenerator.generateMarker(event),
                PUBLISHING_EVENT_FOR_ACCOUNT, event.getClass().getSimpleName(), event.getAccountId()
        );

        // TODO: figure out how to make sure the account and bank are actually created before sending the response
        //  - Idea: create a service/validation class that will check the lookup tables to make sure the event is valid before publishing
        eventGateway.publish(event);

        return AccountAndLoyaltyBankCreatedResponseModel.builder()
                .accountId(accountId)
                .loyaltyBankId(loyaltyBankId)
                .build();
} 

And here is the saga:

@Saga
@ProcessingGroup(COMMAND_PROJECTION_GROUP)
@Order(4)
public class AccountAndLoyaltyBankCreationSaga {


    @Autowired
    private transient CommandGateway commandGateway;
    @Autowired
    private transient EventGateway eventGateway;

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountAndLoyaltyBankCreationSaga.class);

    private String loyaltyBankId = null;
    private String businessId = null;


    @StartSaga
    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountAndLoyaltyBankCreationStartedEvent event) {
        CreateAccountCommand command = CreateAccountCommand.builder()
                .requestId(event.getRequestId())
                .accountId(event.getAccountId())
                .firstName(event.getFirstName())
                .lastName(event.getLastName())
                .email(event.getEmail())
                .build();

        loyaltyBankId = event.getLoyaltyBankId();
        businessId = event.getBusinessId();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);

        try {
            commandGateway.send(command);
        } catch (Exception e) {
            Marker marker = MarkerGenerator.generateMarker(command);
            marker.add(Markers.append("exceptionMessage", e.getLocalizedMessage()));
            LOGGER.error(marker, "CreateAccountCommand failed for account {}", command.getAccountId());
            issueEventToEndSagaOnError(command.getRequestId());

        }
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountCreatedEvent event) {
        CreateLoyaltyBankCommand command = CreateLoyaltyBankCommand.builder()
                .requestId(event.getRequestId())
                .loyaltyBankId(loyaltyBankId)
                .businessId(businessId)
                .accountId(event.getAccountId())
                .build();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);

        try {
            commandGateway.send(command);
        } catch (Exception e) {
                Marker marker = MarkerGenerator.generateMarker(command);
                marker.add(Markers.append("exceptionMessage", e.getLocalizedMessage()));
                LOGGER.error(marker, "CreateLoyaltyBankCommand failed for account {} and loyaltyBank {}", command.getAccountId(), command.getLoyaltyBankId());
                rollbackAccountCreation(event.getRequestId(), event.getAccountId());
        }
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(LoyaltyBankCreatedEvent event) {
        Marker marker = Markers.append(REQUEST_ID, event.getRequestId());

        AccountAndLoyaltyBankCreationEndedEvent endEvent = AccountAndLoyaltyBankCreationEndedEvent.builder()
                .requestId(event.getRequestId())
                .build();

        LOGGER.info(marker, "{} received, issuing {}", event.getClass().getSimpleName(), endEvent.getClass().getSimpleName());

        try {
            eventGateway.publish(endEvent);
        } catch (Exception e) {
            marker.add(Markers.append("exceptionMessage", e.getLocalizedMessage()));
            LOGGER.error(marker, "AccountAndLoyaltyBankCreationEndedEvent failed to process, rolling back loyalty bank creation");
            rollbackLoyaltyBankCreation(event.getRequestId(), event.getLoyaltyBankId());
        }
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(LoyaltyBankDeletedEvent event) {
       rollbackAccountCreation(event.getRequestId(), event.getAccountId());
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountDeletedEvent event) {
        issueEventToEndSagaOnError(event.getRequestId());
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountAndLoyaltyBankCreationEndedEvent event) {
        LOGGER.info(MarkerGenerator.generateMarker(event), "{} event received, ending saga", event.getClass().getSimpleName());
    }

    private void rollbackAccountCreation(String requestId, String accountId) {
        RollbackAccountCreationCommand command = RollbackAccountCreationCommand.builder()
                .requestId(requestId)
                .accountId(accountId)
                .build();

        try {
            commandGateway.send(command);
        } catch (Exception e) {
            Marker marker = Markers.append(REQUEST_ID, command.getRequestId());
            marker.add(Markers.append("exceptionMessage", e.getLocalizedMessage()));
            LOGGER.error(marker, "RollbackAccountCreationCommand failed to process, manual cleanup of saga and aggregate required");
        }
    }

    private void rollbackLoyaltyBankCreation(String requestId, String loyaltyBankId) {
        RollbackLoyaltyBankCreationCommand command = RollbackLoyaltyBankCreationCommand.builder()
                .requestId(requestId)
                .loyaltyBankId(loyaltyBankId)
                .build();

        try {
            commandGateway.send(command);
        } catch (Exception e) {
            Marker marker = MarkerGenerator.generateMarker(command);
            marker.add(Markers.append("exceptionMessage", e.getLocalizedMessage()));
            LOGGER.error(marker, "RollbackLoyaltyBankCreationCommand failed to process, manual cleanup of saga and aggregate required");
        }
    }

    private void issueEventToEndSagaOnError(String requestId) {
        Marker marker = Markers.append(REQUEST_ID, requestId);
        LOGGER.info(marker, "Attempting to end saga due to error");

        AccountAndLoyaltyBankCreationEndedEvent event = AccountAndLoyaltyBankCreationEndedEvent.builder()
                .requestId(requestId)
                .build();

        try {
            eventGateway.publish(event);
        } catch (Exception e) {
            marker.add(Markers.append("exceptionMessage", e.getLocalizedMessage()));
            LOGGER.error(marker, "AccountAndLoyaltyBankCreationEndedEvent failed to process, manual cleanup of saga and aggregate required");
        }
    }

EDIT:
I just realized I never addressed the “can you guarantee event handling order between separate processing groups” portion of my question. After further research I’ve realized it was a really silly question to ask, as the whole point of having separate processing groups is to allow you to process events simultaneously and independently between the groups. This ensures that there isn’t any tight coupling between them. What I needed to do was make sure that my command projection event handlers were in the same processing group as the saga and that they had priority to make sure they updated the projections to the current state before the saga queried the repository. You can use spring’s @Order annotation to specify the order at the class level between classes that have event handlers in the same processing group. (As shown in the annotations of the Saga above).