Event handler processing an event message from RabbitMQ twice

Hi,

Due to our project requirements, we decided to change the event bus from the Simple Event Bus to the Distributed Event Bus, configured as shown at the end of this email. The configuration covers both the event producer and the consumer, since both currently live in the same application.

After applying this configuration, the application sends only one event to the event bus, but the event handler receives the same event twice. As I can see in the logs and in the RabbitMQ management UI, the message is sent to the queue once and received only once by the BlockingQueueConsumer, as the following log traces show:

2015-04-22 18:18:37.726 DEBUG 80024 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'[B@75dc8732(byte[537])'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=Axon.EventBus, receivedRoutingKey=com.chambers.group.domain.events, deliveryTag=1, messageCount=0])

Event 384eb99e-35ee-79e9-fff9-3929279dcf8c received

Event 384eb99e-35ee-79e9-fff9-3929279dcf8c received

So I understand that something is wrong in my configuration. Can anybody please help me?

Thanks in advance for your help.

Antonio

Configuration:

`
package com.chambers.group.configuration;

import java.net.UnknownHostException;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.DefaultClusterSelector;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.mongo.DefaultMongoTemplate;
import org.axonframework.eventstore.mongo.MongoEventStore;
import org.axonframework.eventstore.mongo.MongoTemplate;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.support.TaskUtils;

import com.chambers.group.domain.objects.GroupAggregate;
import com.mongodb.MongoClient;

@Configuration
public class AxonConfiguration {

    private @Value("${mongo.host}") String mongo;
    private @Value("${mongo.port}") int port;
    private @Value("${mongo.password}") String password;
    private @Value("${mongo.user}") String user;
    private @Value("${mongo.db}") String db;
    private @Value("${mongo.collection}") String collection;
    private @Value("${mongo.snapshot}") String snapshot;

    // Rabbit
    private @Value("${rabbitmq.host}") String rabbitHost;
    private @Value("${rabbitmq.port}") Integer rabbitPort;
    private @Value("${rabbitmq.username}") String rabbitUsername;
    private @Value("${rabbitmq.password}") String rabbitPassword;
    private @Value("${rabbitmq.exchange.name}") String rabbitExchangeName;
    private @Value("${rabbitmq.exchange.autodelete}") boolean rabbitExchangeAutodelete;
    private @Value("${rabbitmq.exchange.durable}") boolean rabbitExchangeDurable;
    private @Value("${rabbitmq.queue.name}") String rabbitQueueName;
    private @Value("${rabbitmq.queue.durable}") Boolean rabbitQueueDurable;
    private @Value("${rabbitmq.queue.exclusive}") Boolean rabbitQueueExclusive;
    private @Value("${rabbitmq.queue.autodelete}") Boolean rabbitQueueAutoDelete;
    private @Value("${rabbitmq.queue-listener.prefetch-count}") Integer rabbitQueueListenerPrefetchCount;
    private @Value("${rabbitmq.queue-listener.recovery-interval}") Long rabbitQueueListenerRecoveryInterval;
    private @Value("${rabbitmq.queue-listener.cluster-transaction-size}") Integer rabbitQueueClusterTransactionSize;

    /**
     * RabbitMQ Subsystem Configuration
     */

    // Serializer
    @Bean
    public XStreamSerializer xstreamSerializer() {
        return new XStreamSerializer();
    }

    // Connection Factory
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
        connectionFactory.setUsername(rabbitUsername);
        connectionFactory.setPassword(rabbitPassword);
        return connectionFactory;
    }

    // Event bus exchange
    @Bean
    public FanoutExchange eventBusExchange() {
        return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
    }

    // Event bus queue
    @Bean
    public Queue eventBusQueue() {
        return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
    }

    // Binding
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(eventBusQueue()).to(eventBusExchange());
    }

    // Rabbit Admin
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * AXON
     */

    // Command Bus
    @Bean
    public CommandBus commandBus() {
        return new SimpleCommandBus();
    }

    // Event bus
    @Bean
    public ClusteringEventBus eventBus() {
        ClusteringEventBus clusteringEventBus = new ClusteringEventBus(new DefaultClusterSelector(simpleCluster()), terminal());
        return clusteringEventBus;
    }

    // Terminal
    @Bean
    public SpringAMQPTerminal terminal() {
        SpringAMQPTerminal terminal = new SpringAMQPTerminal();
        terminal.setConnectionFactory(connectionFactory());
        terminal.setSerializer(xstreamSerializer());
        terminal.setExchangeName(rabbitExchangeName);
        terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager());
        terminal.setDurable(true);
        terminal.setTransactional(false);
        return terminal;
    }

    // Configuration
    @Bean
    SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration() {
        SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
        springAMQPConsumerConfiguration.setDefaults(null);
        springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
        springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
        springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
        springAMQPConsumerConfiguration.setConcurrentConsumers(1);
        springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
        springAMQPConsumerConfiguration.setExclusive(false);
        springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
        springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
        springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
        return springAMQPConsumerConfiguration;
    }

    // Cluster definition
    @Bean
    SimpleCluster simpleCluster() {
        SimpleCluster simpleCluster = new SimpleCluster(rabbitQueueName);
        return simpleCluster;
    }

    // Message converter
    @Bean
    DefaultAMQPMessageConverter defaultAMQPMessageConverter() {
        return new DefaultAMQPMessageConverter(xstreamSerializer());
    }

    // Message listener configuration
    @Bean
    ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
        SimpleCluster simpleCluster = simpleCluster();
        DefaultAMQPMessageConverter defaultAMQPMessageConverter = defaultAMQPMessageConverter();
        ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
        listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
        listenerContainerLifecycleManager.registerCluster(simpleCluster, springAMQPConsumerConfiguration(), defaultAMQPMessageConverter);
        return listenerContainerLifecycleManager;
    }

    // Event listener
    @Bean
    public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
        AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
        processor.setEventBus(eventBus());
        return processor;
    }

    // Command Handler
    @Bean
    public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
        AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
        processor.setCommandBus(commandBus());
        return processor;
    }

    // Command Gateway
    @Bean
    public CommandGatewayFactoryBean commandGatewayFactoryBean() {
        CommandGatewayFactoryBean factory = new CommandGatewayFactoryBean();
        factory.setCommandBus(commandBus());
        return factory;
    }

    // Event Repository
    @Bean
    public EventSourcingRepository eventSourcingRepository() {
        MongoClient mongoClient = null;
        MongoTemplate mongoTemplate = null;
        try {
            mongoClient = new MongoClient(this.mongo, this.port);
            mongoTemplate = new DefaultMongoTemplate(mongoClient, this.db, this.collection, this.snapshot, (this.user.isEmpty()) ? null : this.user, (this.password.isEmpty()) ? null : this.password.toCharArray());
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        MongoEventStore eventStore = new MongoEventStore(mongoTemplate);
        EventSourcingRepository repository = new EventSourcingRepository<>(GroupAggregate.class, eventStore);
        repository.setEventBus(eventBus());
        return repository;
    }

    @SuppressWarnings("unchecked")
    @Bean
    public AggregateAnnotationCommandHandler groupCommandHandler() {
        AggregateAnnotationCommandHandler commandHandler = AggregateAnnotationCommandHandler.subscribe(GroupAggregate.class, eventSourcingRepository(), commandBus());
        return commandHandler;
    }
}

`

Hi Antonio,

In your configuration, in the ListenerContainerLifecycleManager config, you’re calling “registerCluster”. Since the terminal will also register the cluster, your cluster ends up registered with a ListenerContainer twice. Axon will reuse the same container, so each message is downloaded only once. But since the Cluster is subscribed twice, it will receive each message twice.

Removing the call to registerCluster will resolve the issue.

Cheers,

Allard
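
To make the explanation above concrete, here is a minimal plain-Java sketch of the mechanism as described: one container, one downloaded message, and a subscriber list that contains the same cluster twice. The `Container` class and all names in it are hypothetical stand-ins for illustration, not Axon or Spring AMQP classes, and the real internals may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation only; none of these types come from Axon.
final class DuplicateSubscriptionDemo {

    /** Stand-in for a listener container that fans a message out to its subscribers. */
    static final class Container {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            // A List keeps duplicate registrations of the same subscriber.
            subscribers.add(subscriber);
        }

        void onMessage(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /** Returns how often the handler runs for a single downloaded message. */
    static int deliveriesFor(int registrations) {
        List<String> handled = new ArrayList<>();
        Consumer<String> cluster = handled::add;
        Container container = new Container();
        for (int i = 0; i < registrations; i++) {
            container.subscribe(cluster);
        }
        container.onMessage("event-1"); // the message arrives from the queue once
        return handled.size();
    }

    public static void main(String[] args) {
        System.out.println("registered twice: " + deliveriesFor(2) + " deliveries");
        System.out.println("registered once:  " + deliveriesFor(1) + " deliveries");
    }
}
```

With two registrations the handler runs twice for the single message, which matches the symptom in the logs; with one registration it runs once.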

Hi Allard,

Yes, you are right: after removing this line, the event handler receives the event only once.

Thanks a lot for your help!

Your code must handle duplicate messages anyway (for rare cases), since RabbitMQ cannot guarantee “exactly once” delivery. I’m interested: what kind of “idempotent consumer” strategy are you using?

https://www.rabbitmq.com/reliability.html#consumer

-Peter
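
For readers wondering what an "idempotent consumer" could look like, one common approach is to remember the identifiers of events already handled and skip repeats. The sketch below is only an illustration under assumptions: the class names are hypothetical, the event identifier is assumed to be available as a string, and the memory is a bounded in-process map, so it would not survive a restart or work across several application instances.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of an idempotent consumer; not part of Axon or Spring AMQP.
final class IdempotentHandlerDemo {

    static final class DeduplicatingHandler {
        private final Map<String, Boolean> seen;
        private int processed;

        DeduplicatingHandler(final int maxRemembered) {
            // Insertion-ordered map that evicts the oldest identifier once full.
            this.seen = new LinkedHashMap<String, Boolean>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > maxRemembered;
                }
            };
        }

        /** Returns true if the event was processed, false if it was a known duplicate. */
        synchronized boolean handle(String eventId) {
            if (seen.putIfAbsent(eventId, Boolean.TRUE) != null) {
                return false; // already handled, skip the side effects
            }
            processed++; // the real business logic would run here
            return true;
        }

        synchronized int processedCount() {
            return processed;
        }
    }

    public static void main(String[] args) {
        DeduplicatingHandler handler = new DeduplicatingHandler(1000);
        String id = "384eb99e-35ee-79e9-fff9-3929279dcf8c";
        System.out.println(handler.handle(id)); // first delivery
        System.out.println(handler.handle(id)); // redelivery of the same event
        System.out.println(handler.processedCount());
    }
}
```

An alternative that needs no bookkeeping is to make the handler's effect naturally idempotent, for example by writing state keyed on the event or aggregate identifier so that a repeated delivery overwrites with the same values.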