Axon Saga with RabbitMQ

Hello Team,
I am new to Axon Framework and am facing the issue below.

In my Saga, commands and events are not published via RabbitMQ when I call CommandGateway.send(new CreateInvoiceCommand(orderCreatedEvent.orderId)). However, if I publish a message directly from RabbitMQ, the application consumes it. So it seems RabbitMQ is not bound to Axon Framework.

  1. I need to use RabbitMQ as the message broker instead of Axon Server…
  2. I also need to use RabbitMQ as a message broker alongside Axon Server.

Please let me know whether this is feasible and help me bind Axon Framework to RabbitMQ.

I have shared the code below for your reference.


server:
  port: 8080

spring:
  application:
    name: order-service
  datasource:
    driver-class-name: org.postgresql.Driver
    username: postgres
    password: admin
    url: jdbc:postgresql://${DATABASE_HOST}:5432/saga-order-service
  jpa:
    properties:
      '[hibernate.default_schema]': public
    show-sql: true
    generate-ddl: true
    hibernate:
      ddl-auto: update
    database: postgresql

  rabbitmq:
    host: ${RABBITMQ_HOST:localhost}
    username: ${RABBITMQ_USER:retailadmin}
    password: ${RABBITMQ_PASSWORD:retailadmin}
    
axon:
  axonserver:
    enabled: true
  amqp:
    exchange: mysagaexchange
    transaction-mode: transactional
  eventhandling:
    processors:
      amqpEvents:
        source: myQueueMessageSource
        mode: subscribing

logging:
  level:
    org.axonframework: DEBUG

@Configuration
public class OrderServiceConfiguration {

	@Bean
	public SpringAMQPMessageSource myQueueMessageSource(AMQPMessageConverter messageConverter) {
		return new SpringAMQPMessageSource(messageConverter) {

			@RabbitListener(queues = "mysagaqueue")
			@Override
			public void onMessage(Message message, Channel channel) {
				System.out.println("!!!! order service queue:" + message);
				super.onMessage(message, channel);
			}
		};
	}

	@Bean
	public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {
		return DefaultAMQPMessageConverter.builder().serializer(serializer).build();
	}

}

@Saga
public class OrderManagementSaga {

	// TODO: if we remove 'transient', this does not work; only CreateInvoiceCommand
	// keeps getting created
	@Inject
	private transient CommandGateway commandGateway;

	@StartSaga
	@SagaEventHandler(associationProperty = "orderId")
	public void handle(OrderCreatedEvent orderCreatedEvent) {		
		System.out.println("Saga invoked and order id:" + orderCreatedEvent.orderId + " and order status:"
				+ orderCreatedEvent.orderStatus);
		// send the command to create invoice without the paymentId
		commandGateway.send(new CreateInvoiceCommand(orderCreatedEvent.orderId));
	}

	@SagaEventHandler(associationProperty = "orderId")
	public void handle(InvoiceCreatedEvent invoiceCreatedEvent) {
		// send the create shipping command without the shippingId
		commandGateway.send(new CreateShippingCommand(invoiceCreatedEvent.orderId, invoiceCreatedEvent.paymentId));
	}

	@SagaEventHandler(associationProperty = "orderId")
	public void handle(OrderShippedEvent orderShippedEvent) {
		System.out.println("Saga continued for OrderShippedEvent.");
		commandGateway
				.send(new UpdateOrderStatusCommand(orderShippedEvent.orderId, String.valueOf(OrderStatus.SHIPPED)));
	}

	@SagaEventHandler(associationProperty = "orderId")
	public void handle(OrderUpdatedEvent orderUpdatedEvent, OrderCommandService orderCommandService) {
		OrderStatus status = orderCommandService.updateOrder(orderUpdatedEvent.orderId, orderUpdatedEvent.orderStatus);
		System.out.println("Saga ends for OrderUpdatedEvent:" + status.name());
		SagaLifecycle.end();
	}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.2</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.progressivecoder.ordermanagement</groupId>
	<artifactId>order-service</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>order-service</name>
	<description>Order Service for E-Commerce Store</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

		<!-- DB dependency Starts -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
		</dependency>
		<!-- DB dependency Ends -->

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- RabbitMQ -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.axonframework.extensions.amqp</groupId>
			<artifactId>axon-amqp-spring-boot-starter</artifactId>
		</dependency>

		<!-- Axon -->
		<dependency>
			<groupId>org.axonframework</groupId>
			<artifactId>axon-spring-boot-starter</artifactId>
		</dependency>		

		<dependency>
			<groupId>javax.inject</groupId>
			<artifactId>javax.inject</artifactId>
			<version>1</version>
		</dependency>

		<dependency>
			<groupId>com.progressivecoder.saga-pattern</groupId>
			<artifactId>core-apis</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.axonframework</groupId>
				<artifactId>axon-bom</artifactId>
				<version>4.8.0</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>


	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Hi Kumar,

If I compare the bits you shared with the integration test used here, you seem to be missing something like:

@Bean
public Binding eventsBinding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue)
                         .to(exchange)
                         .with("#");
}

Please let us know if adding that helped, or if something else might be the problem.
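
As background on the "#" routing key in the binding above: on an AMQP topic exchange, a routing key is a list of dot-separated words, and in a binding pattern "*" stands for exactly one word while "#" stands for zero or more words, so a "#" binding receives every message sent to the exchange. The sketch below is a simplified, stand-alone model of that matching rule in plain Java. It is not code from RabbitMQ, Spring AMQP, or Axon, and the class name TopicPatternSketch and the sample routing keys are made up for illustration.

```java
// Illustrative only: a simplified model of topic-exchange pattern matching.
// TopicPatternSketch and the routing keys in main() are hypothetical.
final class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(split(pattern), 0, split(routingKey), 0);
    }

    // An empty string has zero words; otherwise split on dots.
    private static String[] split(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.", -1);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "order.created"));            // true
        System.out.println(matches("order.*", "order.created"));      // true
        System.out.println(matches("order.*", "order.item.created")); // false
    }
}
```

With a "#" binding, the queue does not filter anything by routing key, which is why it is a convenient default when every event on the exchange should reach the same queue.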

I used the imports below:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;

and I am unable to build the application because of the issue below:

Description:

Parameter 0 of method eventsBinding in com.progressivecoder.ordermanagement.orderservice.config.OrderServiceConfiguration required a bean of type 'org.springframework.amqp.core.Queue' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.amqp.core.Queue' in your configuration.

I have tried to resolve the build issue, but with no luck so far.

With my existing configuration I am able to consume messages that I publish directly from RabbitMQ, which means the RabbitMQ configuration in application.yml is correct. What I want is for the Saga to publish messages via RabbitMQ instead of the default SagaEventBus, and that is not happening. Reference link: Spring AMQP - Axon Reference Guide (axoniq.io)

So I feel the beans below are unnecessary. Please advise.

	@Bean
	public Binding eventsBinding(Queue queue, TopicExchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with("#");
	}

	@Bean
	public Queue queue() {
		return new Queue("sagaqueue");
	}

	@Bean
	public TopicExchange topicExchange() {
		return new TopicExchange("saga");
	}

However, following your suggestion, I have added the beans below, but I am getting an error in the console.

@Configuration
public class OrderServiceConfiguration {

	@Bean
	public SpringAMQPMessageSource myQueueMessageSource(AMQPMessageConverter messageConverter) {
		return new SpringAMQPMessageSource(messageConverter) {

			@RabbitListener(queues = "sagaqueue")
			@Override
			public void onMessage(Message message, Channel channel) {
				System.out.println("!!!! order service queue:" + message);
				super.onMessage(message, channel);
			}
		};
	}

	@Bean
	public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {
		return DefaultAMQPMessageConverter.builder().serializer(serializer).build();
	}

	@Bean
	public Binding eventsBinding(Queue queue, TopicExchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with("#");
	}

	@Bean
	public Queue queue() {
		return new Queue("sagaqueue");
	}

	@Bean
	public TopicExchange topicExchange() {
		return new TopicExchange("mysagaexchange");
	}
}

and I get the error message below in the console:

2023-07-12 18:43:07.841 ERROR 36996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saga' in vhost '/': received 'topic' but current is 'direct', class-id=40, method-id=10)
2023-07-12 18:43:08.858 ERROR 36996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saga' in vhost '/': received 'topic' but current is 'direct', class-id=40, method-id=10)
2023-07-12 18:43:10.873 ERROR 36996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saga' in vhost '/': received 'topic' but current is 'direct', class-id=40, method-id=10)
2023-07-12 18:43:14.883 ERROR 36996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saga' in vhost '/': received 'topic' but current is 'direct', class-id=40, method-id=10)

I think it should be new TopicExchange("mysagaexchange") instead of new TopicExchange("saga"). I'm not sure that fixes things, but in the tests it's the same value as the one used for the axon.amqp.exchange property.

Yes. I have used the same name, TopicExchange("saga"), in both application.yml (amqp: exchange: saga) and the configuration class. But the issue is with the RabbitMQ exchange type (the framework expects topic, but it was direct) and the queue type (the framework expects durable, but it was transient).
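
The 406 PRECONDITION_FAILED lines in the log come from the broker refusing to redeclare an exchange that already exists with a different type. A toy in-memory model of that check, in plain Java, is sketched below. It is not RabbitMQ's implementation; the class BrokerDeclareSketch and its methods are hypothetical and only mimic the rule that a redeclaration must use the same attributes as the existing entity.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: mimics the broker-side rule that redeclaring an existing
// exchange must use the same attributes. Not RabbitMQ or Spring AMQP code.
final class BrokerDeclareSketch {

    // exchange name -> attributes as declared the first time
    private final Map<String, String> exchanges = new HashMap<>();

    /** Declares an exchange; fails if it already exists with other attributes. */
    void declareExchange(String name, String type, boolean durable) {
        String requested = type + "/durable=" + durable;
        String current = exchanges.putIfAbsent(name, requested);
        if (current != null && !current.equals(requested)) {
            throw new IllegalStateException("PRECONDITION_FAILED: exchange '" + name
                    + "' received '" + requested + "' but current is '" + current + "'");
        }
    }

    /** Removes an exchange so that it can be declared again with new attributes. */
    void deleteExchange(String name) {
        exchanges.remove(name);
    }

    public static void main(String[] args) {
        BrokerDeclareSketch broker = new BrokerDeclareSketch();
        broker.declareExchange("saga", "direct", true);    // exchange that already exists
        try {
            broker.declareExchange("saga", "topic", true); // what the application asks for
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        broker.deleteExchange("saga");
        broker.declareExchange("saga", "topic", true);     // accepted after deletion
    }
}
```

In practice this usually leaves three options: delete the stale exchange or queue on the broker so it can be redeclared, declare it with the attributes it already has, or pick a new name.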

Please clarify the queries below:

  1. Is it possible to implement a Saga without Axon Server? I tried this, but command routing does not happen.

  2. Can the beans below be set up through auto-configuration in application.yml instead of being defined in the configuration class?
    @Bean
    public Binding eventsBinding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("#");
    }

    @Bean
    public Queue queue() {
        return new Queue("sagaqueue");
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("saga");
    }

I should have read the first message better, my apologies. If I’m correct, you want to use RabbitMQ for routing commands? This is not possible.

To send commands, we offer the Spring Cloud and JGroups extensions in addition to Axon Server. There are no other distributed command buses that I’m aware of.
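
One way to picture why event publishing and command routing are treated differently: an event is broadcast to every interested handler, while a command is routed to a single handler and produces a result for the sender. The sketch below models only that difference in plain Java. It is not how Axon, the AMQP extension, or any distributed command bus is implemented, and the class MessageRoutingSketch and its method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Conceptual sketch only: contrasts point-to-point command routing with
// publish/subscribe event delivery. All names here are hypothetical.
final class MessageRoutingSketch {

    // At most one handler per command name; it returns a result to the sender.
    private final Map<String, Function<String, String>> commandHandlers = new HashMap<>();

    // Any number of handlers may observe each event; none returns a result.
    private final List<Consumer<String>> eventHandlers = new ArrayList<>();

    void registerCommandHandler(String commandName, Function<String, String> handler) {
        commandHandlers.put(commandName, handler);
    }

    /** Routes the command to its single handler and returns that handler's result. */
    String send(String commandName, String payload) {
        Function<String, String> handler = commandHandlers.get(commandName);
        if (handler == null) {
            throw new IllegalStateException("No handler for command " + commandName);
        }
        return handler.apply(payload);
    }

    void subscribe(Consumer<String> handler) {
        eventHandlers.add(handler);
    }

    /** Delivers the event to every subscriber and returns how many received it. */
    int publish(String event) {
        for (Consumer<String> handler : eventHandlers) {
            handler.accept(event);
        }
        return eventHandlers.size();
    }

    public static void main(String[] args) {
        MessageRoutingSketch bus = new MessageRoutingSketch();
        bus.registerCommandHandler("CreateInvoiceCommand", orderId -> "invoice-for-" + orderId);
        bus.subscribe(e -> System.out.println("projection saw " + e));
        bus.subscribe(e -> System.out.println("saga saw " + e));

        System.out.println(bus.send("CreateInvoiceCommand", "42"));
        System.out.println(bus.publish("OrderCreatedEvent") + " handlers notified");
    }
}
```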

Thanks Gerard for your prompt response.

So can you share details on how to use Spring Cloud and JGroups for a distributed command bus instead of Axon Server?

Thanks
Kumar

Hi Kumar,

I’ll refer you to the reference guide, specifically the sections on the JGroups extension and the Spring Cloud extension. Please open a new topic if you have any follow-up questions regarding these.