I am looking for a second opinion on a recent implementation of mine. I am working on an application (really just to get more experience with Axon and other technologies) that provides a platform for businesses to easily offer and market loyalty programs to customers. I described this in a little more detail in another post, if the context helps.
In summary, I have decided to use command projections to determine whether commands are valid inside command interceptors, before the commands reach an Aggregate. I recently created a new flow that allows me to create an account and a loyalty bank in the same API call. I am doing this with a Saga, as shown below:
package loyalty.service.command.sagas;

@Saga
@ProcessingGroup("account-and-loyalty-bank-creation-saga-group")
public class AccountAndLoyaltyBankCreationSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountAndLoyaltyBankCreationSaga.class);

    private String loyaltyBankId = null;
    private String businessId = null;

    @StartSaga
    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountAndLoyaltyBankCreationStartedEvent event) {
        CreateAccountCommand command = CreateAccountCommand.builder()
                .requestId(event.getRequestId())
                .accountId(event.getAccountId())
                .firstName(event.getFirstName())
                .lastName(event.getLastName())
                .email(event.getEmail())
                .build();

        loyaltyBankId = event.getLoyaltyBankId();
        businessId = event.getBusinessId();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);
        commandGateway.sendAndWait(command);
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountCreatedEvent event) {
        CreateLoyaltyBankCommand command = CreateLoyaltyBankCommand.builder()
                .requestId(event.getRequestId())
                .loyaltyBankId(loyaltyBankId)
                .businessId(businessId)
                .accountId(event.getAccountId())
                .build();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);
        commandGateway.sendAndWait(command);
    }

    @SagaEventHandler(associationProperty = "requestId")
    public void handle(LoyaltyBankCreatedEvent event) {
        EndAccountAndLoyaltyBankCreationCommand command = EndAccountAndLoyaltyBankCreationCommand.builder()
                .requestId(event.getRequestId())
                .build();

        LogHelper.logEventIssuingCommand(LOGGER, event, command);
        commandGateway.sendAndWait(command);
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "requestId")
    public void handle(AccountAndLoyaltyBankCreationEndedEvent event) {
        LOGGER.info(MarkerGenerator.generateMarker(event), "{} event received, ending saga", event.getClass().getSimpleName());
    }
}
The AccountAndLoyaltyBankCreationStartedEvent is issued from what I am calling a SagaOrchestratorAggregate:
package loyalty.service.command.aggregates;

@Aggregate
@NoArgsConstructor
@Getter
public class SagaOrchestratorAggregate {

    private static final Logger LOGGER = LoggerFactory.getLogger(SagaOrchestratorAggregate.class);

    @AggregateIdentifier
    private String requestId;

    @CommandHandler
    public SagaOrchestratorAggregate(StartAccountAndLoyaltyBankCreationCommand command) {
        AccountAndLoyaltyBankCreationStartedEvent event = AccountAndLoyaltyBankCreationStartedEvent.builder()
                .requestId(command.getRequestId())
                .accountId(command.getAccountId())
                .firstName(command.getFirstName())
                .lastName(command.getLastName())
                .email(command.getEmail())
                .loyaltyBankId(command.getLoyaltyBankId())
                .businessId(command.getBusinessId())
                .build();

        LogHelper.logCommandIssuingEvent(LOGGER, command, event);
        AggregateLifecycle.apply(event);
    }

    @CommandHandler
    void handle(EndAccountAndLoyaltyBankCreationCommand command) {
        AccountAndLoyaltyBankCreationEndedEvent event = AccountAndLoyaltyBankCreationEndedEvent.builder()
                .requestId(command.getRequestId())
                .build();

        LogHelper.logCommandIssuingEvent(LOGGER, command, event);
        AggregateLifecycle.apply(event);
    }

    @EventSourcingHandler
    public void on(AccountAndLoyaltyBankCreationStartedEvent event) {
        this.requestId = event.getRequestId();
        LogHelper.logEventProcessed(LOGGER, event);
    }

    @EventSourcingHandler
    public void on(AccountAndLoyaltyBankCreationEndedEvent event) {
        AggregateLifecycle.markDeleted();
        LogHelper.logEventProcessed(LOGGER, event);
    }
}
This allows me to create a command and use the same interception pattern I mentioned before:
package loyalty.service.command.interceptors;

import static loyalty.service.core.constants.DomainConstants.REQUEST_ID;
import static loyalty.service.core.constants.LogMessages.*;

@Component
@RequiredArgsConstructor
public class SagaOrchestratorCommandsInterceptor implements MessageDispatchInterceptor<CommandMessage<?>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SagaOrchestratorCommandsInterceptor.class);

    private final AccountLookupRepository accountLookupRepository;
    private final BusinessLookupRepository businessLookupRepository;

    @Nonnull
    @Override
    public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(
            @Nonnull List<? extends CommandMessage<?>> messages) {
        return (index, genericCommand) -> {
            String commandName = genericCommand.getPayloadType().getSimpleName();
            LOGGER.debug(MarkerGenerator.generateMarker(genericCommand.getPayload()), INTERCEPTED_COMMAND, commandName);

            if (StartAccountAndLoyaltyBankCreationCommand.class.equals(genericCommand.getPayloadType())) {
                handleStartAccountAndLoyaltyBankCreationCommand((StartAccountAndLoyaltyBankCreationCommand) genericCommand.getPayload(), commandName);
            }

            return genericCommand;
        };
    }

    private void handleStartAccountAndLoyaltyBankCreationCommand(StartAccountAndLoyaltyBankCreationCommand command, String commandName) {
        throwExceptionIfEmailExists(command.getEmail(), command.getRequestId(), commandName);
        throwExceptionIfBusinessDoesNotExist(command.getBusinessId(), command.getRequestId(), commandName);
    }

    // TODO: move shared lookup and exception handling to common service or util class
    private void throwExceptionIfEmailExists(String email, String requestId, String commandName) {
        AccountLookupEntity accountLookupEntity = accountLookupRepository.findByEmail(email);

        if (accountLookupEntity != null) {
            logAndThrowEmailExistsForAccountException(accountLookupEntity, requestId, commandName);
        }
    }

    private void throwExceptionIfBusinessDoesNotExist(String businessId, String requestId, String commandName) {
        BusinessLookupEntity businessLookupEntity = businessLookupRepository.findByBusinessId(businessId);

        if (businessLookupEntity == null) {
            logAndThrowBusinessNotFoundException(businessId, requestId, commandName);
        }
    }

    private void logAndThrowEmailExistsForAccountException(AccountLookupEntity accountLookupEntity, String requestId, String commandName) {
        Marker marker = MarkerGenerator.generateMarker(accountLookupEntity);
        marker.add(Markers.append(REQUEST_ID, requestId));

        LOGGER.info(
                marker,
                EMAIL_FOUND_ON_ANOTHER_ACCOUNT_CANCELLING_COMMAND,
                accountLookupEntity.getAccountId(),
                commandName
        );

        throw new EmailExistsForAccountException(accountLookupEntity.getEmail());
    }

    private void logAndThrowBusinessNotFoundException(String businessId, String requestId, String commandName) {
        LOGGER.info(
                Markers.append(REQUEST_ID, requestId),
                BUSINESS_NOT_FOUND_CANCELLING_COMMAND, businessId, commandName
        );

        throw new BusinessNotFoundException(businessId);
    }
}
If you’ve made it this far, thank you for wading through all of that context.
On to my questions. As I mentioned before, I use these “command projections” to hold the information I need to determine whether a command is valid. They are built by event handlers (similar to what you’d see on a query API).
For example, here is my AccountLookupEventsHandler class:
package loyalty.service.command.projections;

import static loyalty.service.core.constants.DomainConstants.REQUEST_ID;
import static loyalty.service.core.constants.ExceptionMessages.ACCOUNT_WITH_ID_DOES_NOT_EXIST;
import static loyalty.service.core.constants.LogMessages.*;
import static loyalty.service.core.utils.Helper.throwExceptionIfEntityDoesNotExist;

@Component
@Validated
@ProcessingGroup("account-lookup-group")
public class AccountLookupEventsHandler {

    private final AccountLookupRepository accountLookupRepository;
    private final SmartValidator validator;
    private Marker marker = null;

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountLookupEventsHandler.class);

    public AccountLookupEventsHandler(AccountLookupRepository accountLookupRepository, SmartValidator validator) {
        this.accountLookupRepository = accountLookupRepository;
        this.validator = validator;
    }

    @ExceptionHandler(resultType = IllegalArgumentException.class)
    public void handle(IllegalArgumentException exception) {
        LOGGER.error(marker, exception.getLocalizedMessage());
    }

    @ExceptionHandler(resultType = IllegalProjectionStateException.class)
    public void handle(IllegalProjectionStateException exception) {
        LOGGER.error(marker, exception.getLocalizedMessage());
    }

    @EventHandler
    public void on(AccountCreatedEvent event) {
        marker = Markers.append(REQUEST_ID, event.getRequestId());
        AccountLookupEntity accountLookupEntity = new AccountLookupEntity(event.getAccountId(), event.getEmail());
        marker.add(MarkerGenerator.generateMarker(accountLookupEntity));
        validateEntity(accountLookupEntity);
        accountLookupRepository.save(accountLookupEntity);
        LOGGER.info(marker, ACCOUNT_SAVED_IN_LOOKUP_DB, event.getAccountId());
    }

    @EventHandler
    public void on(AccountUpdatedEvent event) {
        marker = Markers.append(REQUEST_ID, event.getRequestId());
        AccountLookupEntity accountLookupEntity = accountLookupRepository.findByAccountId(event.getAccountId());
        throwExceptionIfEntityDoesNotExist(accountLookupEntity, String.format(ACCOUNT_WITH_ID_DOES_NOT_EXIST, event.getAccountId()));
        BeanUtils.copyProperties(event, accountLookupEntity);
        marker.add(MarkerGenerator.generateMarker(accountLookupEntity));
        validateEntity(accountLookupEntity);
        accountLookupRepository.save(accountLookupEntity);
        LOGGER.info(marker, ACCOUNT_UPDATED_IN_LOOKUP_DB, event.getAccountId());
    }

    @EventHandler
    public void on(AccountDeletedEvent event) {
        marker = Markers.append(REQUEST_ID, event.getRequestId());
        AccountLookupEntity accountLookupEntity = accountLookupRepository.findByAccountId(event.getAccountId());
        throwExceptionIfEntityDoesNotExist(accountLookupEntity, String.format(ACCOUNT_WITH_ID_DOES_NOT_EXIST, event.getAccountId()));
        marker.add(MarkerGenerator.generateMarker(accountLookupEntity));
        accountLookupRepository.delete(accountLookupEntity);
        LOGGER.info(marker, ACCOUNT_DELETED_FROM_LOOKUP_DB, event.getAccountId());
    }

    private void validateEntity(AccountLookupEntity entity) {
        BindingResult bindingResult = new BeanPropertyBindingResult(entity, "accountLookupEntity");
        validator.validate(entity, bindingResult);

        if (bindingResult.hasErrors()) {
            throw new IllegalProjectionStateException(bindingResult.getFieldError().getDefaultMessage());
        }
    }
}
My first question is regarding the order of event handling. Is there a way to guarantee that the AccountLookupEventsHandler will always handle and complete the AccountCreatedEvent before the AccountAndLoyaltyBankCreationSaga?
I feel like I am running the risk of a race condition in which the saga handles the event before the AccountLookupEntity is saved in the database, which would cause the CreateLoyaltyBankCommand to fail its interceptor validation.
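To make the concern concrete, here is a minimal plain-Java sketch of the two possible orderings. It does not use Axon at all: LookupRaceSketch and its methods are hypothetical stand-ins, and it assumes the interceptor for CreateLoyaltyBankCommand (not shown above) rejects the command when the account is missing from the lookup table.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java illustration only: none of these types are Axon classes.
final class LookupRaceSketch {

    // Stand-in for the lookup table maintained by AccountLookupEventsHandler.
    private final Set<String> accountLookup = new HashSet<>();

    // Stand-in for the projection handling AccountCreatedEvent.
    void projectionHandles(String accountId) {
        accountLookup.add(accountId);
    }

    // Stand-in for the saga handling AccountCreatedEvent. It dispatches
    // CreateLoyaltyBankCommand, which the (assumed) interceptor only accepts
    // when the account is already present in the lookup table.
    boolean sagaHandles(String accountId) {
        return accountLookup.contains(accountId);
    }

    // Returns true when the loyalty bank command would pass validation.
    static boolean run(boolean projectionFirst) {
        LookupRaceSketch sketch = new LookupRaceSketch();
        String accountId = "account-1";
        if (projectionFirst) {
            sketch.projectionHandles(accountId);
            return sketch.sagaHandles(accountId);
        }
        boolean accepted = sketch.sagaHandles(accountId);
        sketch.projectionHandles(accountId);
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("projection first: " + run(true));  // prints "projection first: true"
        System.out.println("saga first: " + run(false));       // prints "saga first: false"
    }
}
```

Since the saga and the projection sit in different processing groups, my worry is that nothing prevents the second ordering.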
My next question is: what is the proper mechanism for “rolling back” the saga if it does encounter an error? Let’s say the business is deleted after the account is created but before the loyalty bank is. This would be another scenario in which the CreateLoyaltyBankCommand would fail. Is there a way for me to pass something like a callback function that catches the exception thrown by the interceptor and issues a DeleteAccountCommand, or is there some other, more elegant solution?
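Roughly, the callback idea I have in mind looks like the sketch below. It is self-contained plain Java rather than real Axon code: Gateway and RecordingGateway are hypothetical stand-ins for a command gateway whose send returns a CompletableFuture, commands are reduced to their names, and the IllegalStateException stands in for the BusinessNotFoundException thrown by the interceptor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CompensationSketch {

    // Hypothetical stand-in for a command gateway with an asynchronous send.
    interface Gateway {
        CompletableFuture<Void> send(String commandName);
    }

    // Records every command it receives. When asked to, it fails
    // CreateLoyaltyBankCommand, imitating an interceptor that rejects it.
    static final class RecordingGateway implements Gateway {
        final List<String> sent = new ArrayList<>();
        private final boolean rejectLoyaltyBank;

        RecordingGateway(boolean rejectLoyaltyBank) {
            this.rejectLoyaltyBank = rejectLoyaltyBank;
        }

        @Override
        public CompletableFuture<Void> send(String commandName) {
            sent.add(commandName);
            if (rejectLoyaltyBank && commandName.equals("CreateLoyaltyBankCommand")) {
                return CompletableFuture.failedFuture(new IllegalStateException("business not found"));
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    // What the saga's AccountCreatedEvent handler might do: send the next
    // command and, if it fails, issue a compensating DeleteAccountCommand.
    static void onAccountCreated(Gateway gateway) {
        gateway.send("CreateLoyaltyBankCommand")
                .exceptionally(error -> {
                    gateway.send("DeleteAccountCommand");
                    return null;
                });
    }

    // Runs the handler once and returns the commands that were sent, in order.
    static List<String> run(boolean rejectLoyaltyBank) {
        RecordingGateway gateway = new RecordingGateway(rejectLoyaltyBank);
        onAccountCreated(gateway);
        return gateway.sent;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [CreateLoyaltyBankCommand]
        System.out.println(run(true));  // prints [CreateLoyaltyBankCommand, DeleteAccountCommand]
    }
}
```

In the real saga this would mean replacing sendAndWait with a non-blocking send in the AccountCreatedEvent handler. I am not sure whether that is the idiomatic way to compensate in Axon, which is why I am asking.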
My final question concerns the design choice of using the SagaOrchestratorAggregate. Do people who have worked on large Axon projects have any recommendations against it, or does it present certain pitfalls or anti-patterns that I should try to avoid, and why?
Thank you so much for taking the time to read and give careful consideration to this post. I hope the conversation that follows is valuable not only to me, but to this amazing community as well. If you want to see the code in its entirety, you can find it here.