Hi,
Here is a version of an Axon Framework configuration with a distributed event bus, set up using annotations (Spring Java config). I'm posting it for future reference in case somebody needs it. This example uses Axon 2.4 and has been tested against RabbitMQ 3.4.2.
Thanks to Allard for helping me resolve an issue in this configuration.
Regards
`
package com.chambers.group.configuration;
import java.net.UnknownHostException;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.DefaultClusterSelector;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventBusTerminal;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.mongo.DefaultMongoTemplate;
import org.axonframework.eventstore.mongo.MongoEventStore;
import org.axonframework.eventstore.mongo.MongoTemplate;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.support.TaskUtils;
import com.chambers.group.domain.objects.GroupAggregate;
import com.mongodb.MongoClient;
@Configuration
public class AxonConfiguration {
private @Value("${mongo.host}") String mongo;
private @Value("${mongo.port}") int port;
private @Value("${mongo.password}") String password;
private @Value("${mongo.user}") String user;
private @Value("${mongo.db}") String db;
private @Value("${mongo.collection}") String collection;
private @Value("${mongo.snapshot}") String snapshot;
// Rabbit
private @Value("${rabbitmq.host}") String rabbitHost;
private @Value("${rabbitmq.port}") Integer rabbitPort;
private @Value("${rabbitmq.username}") String rabbitUsername;
private @Value("${rabbitmq.password}") String rabbitPassword;
private @Value("${rabbitmq.exchange.name}") String rabbitExchangeName;
private @Value("${rabbitmq.exchange.autodelete}") boolean rabbitExchangeAutodelete;
private @Value("${rabbitmq.exchange.durable}") boolean rabbitExchangeDurable;
private @Value("${rabbitmq.queue.name}") String rabbitQueueName;
private @Value("${rabbitmq.queue.durable}") Boolean rabbitQueueDurable;
private @Value("${rabbitmq.queue.exclusive}") Boolean rabbitQueueExclusive;
private @Value("${rabbitmq.queue.autodelete}") Boolean rabbitQueueAutoDelete;
private @Value("${rabbitmq.queue-listener.prefetch-count}") Integer rabbitQueueListenerPrefetchCount;
private @Value("${rabbitmq.queue-listener.recovery-interval}") Long rabbitQueueListenerRecoveryInterval;
private @Value("${rabbitmq.queue-listener.cluster-transaction-size}") Integer rabbitQueueClusterTransactionSize;
/**
- RabbitMQ Subsystem Configuration
*/
// Serializer
@Bean
public XStreamSerializer xstreamSerializer() {
return new XStreamSerializer();
}
// Connection Factory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUsername);
connectionFactory.setPassword(rabbitPassword);
return connectionFactory;
}
// Event bus exchange
@Bean
public FanoutExchange eventBusExchange() {
return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
}
// Event bus queue
@Bean
public Queue eventBusQueue() {
return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
}
// Binding
@Bean
public Binding binding() {
return BindingBuilder.bind(eventBusQueue()).to(eventBusExchange());
}
// Rabit Admin
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
// Command Bus
@Bean
public CommandBus commandBus() {
return new SimpleCommandBus();
}
// Event bus
@Bean
public EventBus eventBus() {
ClusteringEventBus clusteringEventBus = new ClusteringEventBus(new DefaultClusterSelector(simpleCluster()), terminal());
return clusteringEventBus;
}
// Terminal
@Bean
public EventBusTerminal terminal() {
SpringAMQPTerminal terminal = new SpringAMQPTerminal();
terminal.setConnectionFactory(connectionFactory());
terminal.setSerializer(xstreamSerializer());
terminal.setExchangeName(rabbitExchangeName);
terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager());
terminal.setDurable(true);
terminal.setTransactional(false);
return terminal;
}
// Configuration
@Bean
AMQPConsumerConfiguration springAMQPConsumerConfiguration() {
SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
springAMQPConsumerConfiguration.setDefaults(null);
springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
springAMQPConsumerConfiguration.setConcurrentConsumers(1);
springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
springAMQPConsumerConfiguration.setExclusive(false);
springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
return springAMQPConsumerConfiguration;
}
// Cluster definition
@Bean
SimpleCluster simpleCluster() {
SimpleCluster simpleCluster = new SimpleCluster(rabbitQueueName);
return simpleCluster;
}
// Message converter
@Bean
DefaultAMQPMessageConverter defaultAMQPMessageConverter() {
return new DefaultAMQPMessageConverter(xstreamSerializer());
}
// Message listener configuration
@Bean
ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
return listenerContainerLifecycleManager;
}
// Event listener
@Bean
public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
processor.setEventBus(eventBus());
return processor;
}
// Command Handler
@Bean
public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
processor.setCommandBus(commandBus());
return processor;
}
// Command Gateway
@Bean
public CommandGatewayFactoryBean commandGatewayFactoryBean() {
CommandGatewayFactoryBean factory = new CommandGatewayFactoryBean();
factory.setCommandBus(commandBus());
return factory;
}
// Event Repository
@Bean
public EventSourcingRepository eventSourcingRepository() {
MongoClient mongoClient = null;
MongoTemplate mongoTemplate = null;
try {
mongoClient = new MongoClient(this.mongo, this.port);
mongoTemplate = new DefaultMongoTemplate(mongoClient, this.db, this.collection, this.snapshot, (this.user.isEmpty()) ? null : this.user, (this.password.isEmpty()) ? null : this.password.toCharArray());
} catch (UnknownHostException e) {
e.printStackTrace();
}
MongoEventStore eventStore = new MongoEventStore(mongoTemplate);
EventSourcingRepository repository = new EventSourcingRepository<>(GroupAggregate.class, eventStore);
repository.setEventBus(eventBus());
return repository;
}
@SuppressWarnings(“unchecked”)
@Bean
public AggregateAnnotationCommandHandler groupCommandHandler() {
AggregateAnnotationCommandHandler commandHandler = AggregateAnnotationCommandHandler.subscribe(GroupAggregate.class, eventSourcingRepository(), commandBus());
return commandHandler;
}
}
`