Hi,
I’m trying to implement a saga using Axon 4.1. I have three microservices, developed as Spring Boot applications: orders, stocks and payments.
I’m using Consul for distributing the command bus and RabbitMQ for distributing the event bus.
My order service implements a Saga as follows:

    @Saga
    public class ProcessPurchaseSaga implements Serializable {

        private static final Logger logger = LoggerFactory.getLogger(ProcessPurchaseSaga.class);

        private transient CommandGateway commandGateway;
        private ProcessPurchaseStartedEvent purchase;

        /**
         * This is called
         */
        @StartSaga
        @SagaEventHandler(associationProperty = "id")
        public void handle(ProcessPurchaseStartedEvent event) {
            purchase = event;
            SagaLifecycle.associateWith("accountNumber", purchase.accountNumber);

            // TODO: Fetch the unit price from stocks service
            final double unitPrice = 20;

            // Trigger the payment (could we call the REST API instead?)
            commandGateway.send(
                new DebitMoneyCommand(
                    purchase.accountNumber,
                    purchase.quantity * unitPrice,
                    "USD"
                )
            );
        }

        /**
         * Never called!
         */
        @EndSaga
        @SagaEventHandler(associationProperty = "id", keyName = "accountNumber")
        public void handle(MoneyDebitedEvent event) {
            logger.debug("Money {} paid for {}", event.debitAmount, event.id);
            commandGateway.send(
                new CreateInvoiceCommand(
                    UUID.randomUUID().toString(),
                    new java.util.Date(),
                    purchase.accountNumber,
                    purchase.itemNumber,
                    purchase.quantity,
                    event.debitAmount
                )
            );
        }

        @Autowired
        public void setCommandGateway(CommandGateway commandGateway) {
            this.commandGateway = commandGateway;
        }
    }
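
To spell out what I expect the two handlers to do, here is a plain-Java sketch of the association lookup as I understand it from the Axon reference guide. It is not Axon code: `AssociationSketch`, `Association`, `associateWith` and `matches` are made-up names, and the IDs are sample values. The assumption is that `associationProperty` names the event property whose value is read, and `keyName` names the association key that value is compared against (defaulting to the property name when omitted).

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Illustrative model only: mimics how a saga instance is looked up by (key, value) pairs.
public class AssociationSketch {

    /** Hypothetical stand-in for one association: a (key, value) pair. */
    static final class Association {
        final String key;
        final String value;

        Association(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Association)) {
                return false;
            }
            Association other = (Association) o;
            return key.equals(other.key) && value.equals(other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }
    }

    /** Associations registered by a single saga instance. */
    private final Set<Association> associations = new HashSet<>();

    public void associateWith(String key, String value) {
        associations.add(new Association(key, value));
    }

    /**
     * keyName is the association key named on the handler; propertyValue is the
     * value read from the event property named by associationProperty.
     */
    public boolean matches(String keyName, String propertyValue) {
        return associations.contains(new Association(keyName, propertyValue));
    }

    public static void main(String[] args) {
        AssociationSketch saga = new AssociationSketch();

        // Start handler: associationProperty = "id" and no keyName, so the key is "id".
        saga.associateWith("id", "purchase-1");
        // Explicit call in the start handler: associateWith("accountNumber", ...).
        saga.associateWith("accountNumber", "ACC-42");

        // End handler: keyName = "accountNumber", value taken from the event's "id".
        System.out.println(saga.matches("accountNumber", "ACC-42"));    // true
        System.out.println(saga.matches("accountNumber", "payment-7")); // false
    }
}
```

If that reading is right, the second handler can only find the saga when `MoneyDebitedEvent.id` carries the account number; if `id` holds some other identifier there, the lookup finds nothing. That is one thing I still need to rule out on my side.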
Each microservice has the same configuration for AMQP:

    @Configuration
    @Profile("rabbitmq")
    public class AMQPConfig {

        @Bean
        public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {
            return DefaultAMQPMessageConverter.builder().serializer(serializer).build();
        }

        @Bean
        public SpringAMQPMessageSource myQueueMessageSource(AMQPMessageConverter messageConverter) {
            return new SpringAMQPMessageSource(messageConverter) {
                @RabbitListener(queues = "#{eventBusQueue.name}")
                @Override
                public void onMessage(Message message, Channel channel) {