Hi,
Due to our project requirements we decided to change the Event Bus from the Simple Event Bus to the Distributed Event Bus configured as indicated at the end of this email. This configuration contains the producer of the event and the consumer configuration as currently both are in the same application.
After set this configuration for some reason the application sends only one event to the event bus but the event handler is receiving twice the same event. In fact, as I can see in the logs and in the RabbitMq management UI the message is being sent once to the queue and received only once by the BlockingQueueConsumer, as can be seen in the following log traces:
2015-04-22 18:18:37.726 DEBUG 80024 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:’[B@75dc8732(byte[537])'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=Axon.EventBus, receivedRoutingKey=com.chambers.group.domain.events, deliveryTag=1, messageCount=0])
Event 384eb99e-35ee-79e9-fff9-3929279dcf8c received
Event 384eb99e-35ee-79e9-fff9-3929279dcf8c received
So, I understand I have something wrong on my configuration. Can anybody please help me?
Thanks in advance for your help.
Antonio
Configuration:
`
package com.chambers.group.configuration;
import java.net.UnknownHostException;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.DefaultClusterSelector;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.mongo.DefaultMongoTemplate;
import org.axonframework.eventstore.mongo.MongoEventStore;
import org.axonframework.eventstore.mongo.MongoTemplate;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.support.TaskUtils;
import com.chambers.group.domain.objects.GroupAggregate;
import com.mongodb.MongoClient;
@Configuration
public class AxonConfiguration {
private @Value("${mongo.host}") String mongo;
private @Value("${mongo.port}") int port;
private @Value("${mongo.password}") String password;
private @Value("${mongo.user}") String user;
private @Value("${mongo.db}") String db;
private @Value("${mongo.collection}") String collection;
private @Value("${mongo.snapshot}") String snapshot;
// Rabbit
private @Value("${rabbitmq.host}") String rabbitHost;
private @Value("${rabbitmq.port}") Integer rabbitPort;
private @Value("${rabbitmq.username}") String rabbitUsername;
private @Value("${rabbitmq.password}") String rabbitPassword;
private @Value("${rabbitmq.exchange.name}") String rabbitExchangeName;
private @Value("${rabbitmq.exchange.autodelete}") boolean rabbitExchangeAutodelete;
private @Value("${rabbitmq.exchange.durable}") boolean rabbitExchangeDurable;
private @Value("${rabbitmq.queue.name}") String rabbitQueueName;
private @Value("${rabbitmq.queue.durable}") Boolean rabbitQueueDurable;
private @Value("${rabbitmq.queue.exclusive}") Boolean rabbitQueueExclusive;
private @Value("${rabbitmq.queue.autodelete}") Boolean rabbitQueueAutoDelete;
private @Value("${rabbitmq.queue-listener.prefetch-count}") Integer rabbitQueueListenerPrefetchCount;
private @Value("${rabbitmq.queue-listener.recovery-interval}") Long rabbitQueueListenerRecoveryInterval;
private @Value("${rabbitmq.queue-listener.cluster-transaction-size}") Integer rabbitQueueClusterTransactionSize;
/**
- RabbitMQ Subsystem Configuration
*/
// Serializer
@Bean
public XStreamSerializer xstreamSerializer() {
return new XStreamSerializer();
}
// Connection Factory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUsername);
connectionFactory.setPassword(rabbitPassword);
return connectionFactory;
}
// Event bus exchange
@Bean
public FanoutExchange eventBusExchange() {
return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
}
// Event bus queue
@Bean
public Queue eventBusQueue() {
return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
}
// Binding
@Bean
public Binding binding() {
return BindingBuilder.bind(eventBusQueue()).to(eventBusExchange());
}
// Rabit Admin
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
- AXON
*/
// Command Bus
@Bean
public CommandBus commandBus() {
return new SimpleCommandBus();
}
// Event bus
@Bean
public ClusteringEventBus eventBus() {
ClusteringEventBus clusteringEventBus = new ClusteringEventBus(new DefaultClusterSelector(simpleCluster()), terminal());
return clusteringEventBus;
}
// Terminal
@Bean
public SpringAMQPTerminal terminal() {
SpringAMQPTerminal terminal = new SpringAMQPTerminal();
terminal.setConnectionFactory(connectionFactory());
terminal.setSerializer(xstreamSerializer());
terminal.setExchangeName(rabbitExchangeName);
terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager());
terminal.setDurable(true);
terminal.setTransactional(false);
return terminal;
}
// Configuration
@Bean
SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration() {
SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
springAMQPConsumerConfiguration.setDefaults(null);
springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
springAMQPConsumerConfiguration.setConcurrentConsumers(1);
springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
springAMQPConsumerConfiguration.setExclusive(false);
springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
return springAMQPConsumerConfiguration;
}
// Cluster definition
@Bean
SimpleCluster simpleCluster() {
SimpleCluster simpleCluster = new SimpleCluster(rabbitQueueName);
return simpleCluster;
}
// Message converter
@Bean
DefaultAMQPMessageConverter defaultAMQPMessageConverter() {
return new DefaultAMQPMessageConverter(xstreamSerializer());
}
// Message listener configuration
@Bean
ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
SimpleCluster simpleCluster = simpleCluster();
DefaultAMQPMessageConverter defaultAMQPMessageConverter = defaultAMQPMessageConverter();
ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
listenerContainerLifecycleManager.registerCluster(simpleCluster, springAMQPConsumerConfiguration(), defaultAMQPMessageConverter);
return listenerContainerLifecycleManager;
}
// Event listener
@Bean
public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
processor.setEventBus(eventBus());
return processor;
}
// Command Handler
@Bean
public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
processor.setCommandBus(commandBus());
return processor;
}
// Command Gateway
@Bean
public CommandGatewayFactoryBean commandGatewayFactoryBean() {
CommandGatewayFactoryBean factory = new CommandGatewayFactoryBean();
factory.setCommandBus(commandBus());
return factory;
}
// Event Repository
@Bean
public EventSourcingRepository eventSourcingRepository() {
MongoClient mongoClient = null;
MongoTemplate mongoTemplate = null;
try {
mongoClient = new MongoClient(this.mongo, this.port);
mongoTemplate = new DefaultMongoTemplate(mongoClient, this.db, this.collection, this.snapshot, (this.user.isEmpty()) ? null : this.user, (this.password.isEmpty()) ? null : this.password.toCharArray());
} catch (UnknownHostException e) {
e.printStackTrace();
}
MongoEventStore eventStore = new MongoEventStore(mongoTemplate);
EventSourcingRepository repository = new EventSourcingRepository<>(GroupAggregate.class, eventStore);
repository.setEventBus(eventBus());
return repository;
}
@SuppressWarnings(“unchecked”)
@Bean
public AggregateAnnotationCommandHandler groupCommandHandler() {
AggregateAnnotationCommandHandler commandHandler = AggregateAnnotationCommandHandler.subscribe(GroupAggregate.class, eventSourcingRepository(), commandBus());
return commandHandler;
}
}
`