Axon 3: ClusteringEventBus

Hi Guys,

The docs still makes reference to ClusteringEventBus, but i can’t seem to find this class in Axon3. Has it been replaced with something else?

Thanks,
–KD

Does anyone have an example of wiring up RabbitMQ to handle distributed transactions for Axon3?

Here’s some info on what i have so far.

I think i got the basics config going.

My commands are firing correctly, my local events on the service are handled correctly. The events are persisted in the store.
I just can’t seem to get the events flowing through to rabbit.

Can someone point me in the right direction? Seem like i’m almost there… just missing one or two links…
Most of the config i grabbed from this forum. The rest i found samples on github.

I’d really appreciate if someone could point my in the right direction.

Here with my config (Its a bit split up as the configs come from different projects):

`
@Configuration
@EntityScan({
“com.mycomp.jpas”
})
public class AxonEventStorage {
@PersistenceUnit
private EntityManagerFactory entityManagerFactory;

@Bean
public PlatformTransactionManager transactionManager() {
return new JpaTransactionManager(entityManagerFactory);
}

@Bean
public SpringTransactionManager springTransactionManager(PlatformTransactionManager transactionManager) {
return new SpringTransactionManager(transactionManager);
}

@Bean
public TransactionManagerFactoryBean transactionManagerFactoryBean(PlatformTransactionManager transactionManager) throws PropertyVetoException {
TransactionManagerFactoryBean factoryBean = new TransactionManagerFactoryBean();
factoryBean.setTransactionManager(transactionManager);
return factoryBean;
}

@Bean
public EntityManagerProvider entityManagerProvider() {
return new ContainerManagedEntityManagerProvider();
}

@Bean
public EventStore eventStore(EntityManagerProvider entityManagerProvider, DataSource dataSource, SpringTransactionManager springTransactionManager, Serializer serializer) throws SQLException {
EventStorageEngine eventStorageEngine = new PostgresEventStorage(serializer, null, dataSource, springTransactionManager, entityManagerProvider);
return new EmbeddedEventStore(eventStorageEngine);
}

@Bean
public SagaStore postgresJPASagaStore(Serializer serializer, EntityManagerProvider entityManagerProvider) {
return new PostgresJsonSagaStore(serializer, entityManagerProvider);
}
}

`

`
@Configuration
public class AxonGeneral {
/**

  • @return We’re using the JSON serializer
    */
    @Bean
    public Serializer serializer() {
    return new JacksonSerializer();
    }
    }
    `

`
@Configuration
/**

  • RabbitMQ Spring Configuration
    */
    public class RabbitMQConfig {
    @Value("${rabbitmq.host}")
    private String rabbitHost;

@Value("${rabbitmq.port}")
private Integer rabbitPort;

@Value("${rabbitmq.username}")
private String rabbitUsername;

@Value("${rabbitmq.password}")
private String rabbitPassword;

@Value("${rabbitmq.exchange.name}")
private String rabbitExchangeName;

@Value("${rabbitmq.exchange.autodelete}")
private boolean rabbitExchangeAutodelete;

@Value("${rabbitmq.exchange.durable}")
private boolean rabbitExchangeDurable;

@Value("${rabbitmq.queue.name}")
private String rabbitQueueName;

@Value("${rabbitmq.queue.durable}")
private Boolean rabbitQueueDurable;

@Value("${rabbitmq.queue.exclusive}")
private Boolean rabbitQueueExclusive;

@Value("${rabbitmq.queue.autodelete}")
private Boolean rabbitQueueAutoDelete;

@Value("${rabbitmq.queue-listener.prefetch-count}")
private Integer rabbitQueueListenerPrefetchCount;

@Value("${rabbitmq.queue-listener.recovery-interval}")
private Long rabbitQueueListenerRecoveryInterval;

@Value("${rabbitmq.queue-listener.cluster-transaction-size}")
private Integer rabbitQueueClusterTransactionSize;

// Connection Factory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUsername);
connectionFactory.setPassword(rabbitPassword);
return connectionFactory;
}

// Fanout Exchange
@Bean
public FanoutExchange eventBusExchange() {
return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
}

// Rabbit queue
@Bean
public Queue rabbitQueue() {
return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
}

// AMPQ Binding
@Bean
public Binding ampqBinding(Queue eventBusQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(eventBusQueue).to(fanoutExchange);
}

// Rabit Admin
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}

// Spring AMPQ Terminal
@Bean
public SpringAMQPTerminal terminal(EventBus eventBus, Serializer serializer, ConnectionFactory connectionFactory, ListenerContainerLifecycleManager listenerContainerLifecycleManager) {
SpringAMQPTerminal terminal = new SpringAMQPTerminal(eventBus);
terminal.setConnectionFactory(connectionFactory);
terminal.setSerializer(serializer);
terminal.setExchangeName(rabbitExchangeName);
terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager);
terminal.setDurable(true);
terminal.setTransactional(false);
return terminal;
}

// Axon AMQPConsumerConfiguration
@Bean
public AMQPConsumerConfiguration springAMQPConsumerConfiguration() {
SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
springAMQPConsumerConfiguration.setDefaults(null);
springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
springAMQPConsumerConfiguration.setConcurrentConsumers(1);
springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
springAMQPConsumerConfiguration.setExclusive(false);
springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
return springAMQPConsumerConfiguration;
}

// Axon AMPQ Message converter
@Bean
public DefaultAMQPMessageConverter defaultAMQPMessageConverter(Serializer serializer) {
return new DefaultAMQPMessageConverter(serializer);
}

// Axon Rabbit(container) Life cycle manager
@Bean
public ListenerContainerLifecycleManager listenerContainerLifecycleManager(ConnectionFactory connectionFactory, EventProcessor eventProcessor, AMQPConsumerConfiguration aMQPConsumerConfiguration, DefaultAMQPMessageConverter defaultAMQPMessageConverter) {
ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
listenerContainerLifecycleManager.setConnectionFactory(connectionFactory);
// listenerContainerLifecycleManager.registerEventProcessor(eventProcessor, aMQPConsumerConfiguration, defaultAMQPMessageConverter)
return listenerContainerLifecycleManager;
}
}

`

`
@Configuration
public class AxonCommandHandling {

@Bean
public AnnotationCommandHandlerAdapter annotationCredApplicationCommandHandler(CommandBus commandBus, MyHandler myHandler) {
AnnotationCommandHandlerAdapter annotationCommandHandlerAdapter = new AnnotationCommandHandlerAdapter(myHandler);
annotationCommandHandlerAdapter.subscribe(commandBus);

return annotationCommandHandlerAdapter;
}

@Bean
public CommandBus simpleCommandBus() {
SimpleCommandBus simpleCommandBus = new SimpleCommandBus();
simpleCommandBus.setDispatchInterceptors(Arrays.asList(new BeanValidationInterceptor<>()));
return simpleCommandBus;
}

}

`

`
@Configuration
public class AxonEventHandling {
@Bean
public EventProcessor eventProcessor(EventBus eventBus, MyListener myListener) {
EventProcessor eventProcessor = new SubscribingEventProcessor(“eventProcessor”,
new SimpleEventHandlerInvoker(
myListener),
eventBus);
eventProcessor.start();
return eventProcessor;
}

@Bean
public EventProcessorMessageListener eventProcessorMessageListener(EventProcessor eventProcessor,DefaultAMQPMessageConverter ampqMessageConverter) {
EventProcessorMessageListener messageListener = new EventProcessorMessageListener(ampqMessageConverter);
return messageListener;
}

}

`

Hi,

the missing link might be that the SpringAMQPTerminal needs to be start()-ed.

Note that there is some redesigning and tuning left for the AMQP components in Axon. You may have to change your configuration in the future, but I promise it will become easier :wink:

Cheers,

Allard

@Allard

Awesome! I have data on the rabbit queue on the command side now…!

Final problem:
The service that consumes off the rabbit queue doesn’t seem to be ‘listening’ or processing those commands…
Looking at my ‘AxonEventHandling’ class, is there anything i should be doing…?

I see the ListenerContainerLifecycleManager class has the ability to register an event processor.

I defined one: ‘EventProcessorMessageListener’
How do i plug that in? I’m assuming that is what i am not doing…?

I’m not sure how i transition from ‘EventProcessorMessageListener’ to ‘Consumer<List<? extends EventMessage<?>>> eventProcessor’ when i create the ListenerContainerLifecycleManager bean :frowning:
Thanks for the feedback!

@Allard?

Any advice on how to resolve the query side problem?

Hi Kirk,

as I said, the AMQP API for Axon 3 isn’t finished yet, but you should be able to get it working this way:

create a subclass of SubscribingEventProcessor, and override the start method.

In the start method, register it with the AMQP Terminal (so not with the event bus) using:
amqpTerminal.subscribe(subscribingEventProcessor::process);

We’re planning to create an AMQPEventProcessor in the very near future (only 1 feature has higher priority), that can be registered as a message listener to a MessageListenerContainer.

Cheers,

Allard

Cool, i’ll give it a go. Thanks for the hint!

Thanks Allard, that did the trick.

Side note: I had to overide the subscribe method behaviour… because it was picking up the incorrect queue name.
Other than that, super happy. :slight_smile:

Hi Allard Buijze, today I found that axon-3.0-RC1 released in maven and want to migrate on it, but this workaround not work yet SpringAMQPSubscriber not have method subscribe any more.

Can you tell me how can I configurate amqp for event processor now.

Thanks.

Hi Aleh,

you need to configure the SpringAMQPMessageSource, which implements the MessageListener (ChannelAwareMessageListener to be exact). That means you can register the SpringAMQPMessageSource as a listener in your SimpleMessageListenerContainer.

Another trick, which works nicely in Spring 4, is to return an anonymous subclass of the SpringAMQPMessageSource and annotate an overridden version of the onMessage method, like so:

@Bean
public SpringAMQPMessageSource amqpMessageSource() {
return new SpringAMQPMessageSource() {
@RabbitListener(queues = “queueNameHere”)
@Override
pubic void onMessage(Message message, Channel channel) {
super.onMessage(message, channel);
}
}
}
The @RabbitListener will trigger Spring to create a SimpleMessageListenerContainer that reads from the specified queue and invokes the onMessage method, from where Axon takes over. Then you can register this amqpMessageSource bean as the source for your subscribing processor.

Hope this helps.
Cheers,

Allard

I am very new to Axon and attempting to implement the RabbitMQ AMQP Bus with Axon 3. I just joined the User Group and the above example is extremely helpful. Would you be able to show me how to create the Publisher side as well?

Thanks for the help!
-Matt

Hi Matthew,

the documentation is still in progress, so I can’t point you there.

What you need to do is configure a SpringAMQPPublisher. It will attempt to retrieve the necessary components from the application context, so it will work without any explicit configuration.
The default exchange name is sends to is “Axon.EventBus”. You probably want to configure your own exchange name.

Hope this helps.
Cheers,

Allard

Awesome thanks Allard I’ll try that out!

Regards,
Matt

Hi Aleh,

this sounds like an AMQP misconfiguration. Your SpringAmqpMessageSource instances are probably configured to listen to the same queue. Instead, you should define a separate queue for each SpringAMQPMessageSource instance.

Cheers,

Allard