Configure Axon Framework with Spring using Java configuration annotations?

Hi,

Looking through the Axon Trader and Address Book examples, I see that the Axon Spring configuration is done using XML.
Is it currently possible (or planned) to configure Axon Framework with Spring using Java configuration annotations?
If so, are there any examples available?

Thanks

With a little playing around I managed to get a simple example working without any XML configuration for Spring:

https://gist.github.com/MagnusSmith/8938755

Hi Magnus,

Good to hear that you’ve got it working.
I’m not too familiar with the intricate details of Spring Java config, but it’s probably possible to use the AnnotationCommandHandlerBeanPostProcessor (and its event counterpart) to automatically do the “AggregateAnnotationCommandHandler.subscribe” part. In that case, you can use classpath scanning for your command handler and event handler beans.

Cheers,

Allard
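
To illustrate the suggestion above, here is a minimal sketch of such a Java configuration, assuming Axon 2.x and Spring's `@ComponentScan`. The class name `ScanningAxonConfiguration` and the package `com.example.handlers` are hypothetical; the two post processors and their setters are the same ones used in the full configuration posted later in this thread. The sketch only covers handler beans found by scanning. Whether it also replaces the explicit `AggregateAnnotationCommandHandler.subscribe` call for aggregates is not confirmed here, and the later configuration in this thread still makes that call.

```java
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.SimpleEventBus;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
// Hypothetical package: put your @Component classes with
// @CommandHandler / @EventHandler methods here.
@ComponentScan("com.example.handlers")
public class ScanningAxonConfiguration {

    @Bean
    public CommandBus commandBus() {
        return new SimpleCommandBus();
    }

    @Bean
    public EventBus eventBus() {
        return new SimpleEventBus();
    }

    // Subscribes scanned beans that have @CommandHandler methods to the command bus
    @Bean
    public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
        AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
        processor.setCommandBus(commandBus());
        return processor;
    }

    // Subscribes scanned beans that have @EventHandler methods to the event bus
    @Bean
    public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
        AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
        processor.setEventBus(eventBus());
        return processor;
    }
}
```

With a configuration along these lines, handler classes only need a Spring stereotype annotation such as `@Component` to be picked up; no per-handler bean definition is required.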

Thanks Allard, that was the bit I was missing.

https://gist.github.com/MagnusSmith/8938755

It now works perfectly without requiring any XML.

Hi,

Here is a version of Axon Framework with a distributed event bus configured with annotations. I’m posting it for future reference in case somebody needs it. This example uses Axon 2.4 and has been tested against RabbitMQ 3.4.2.

Thanks to Allard for helping me resolve an issue in this configuration.

Regards

`
package com.chambers.group.configuration;

import java.net.UnknownHostException;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.DefaultClusterSelector;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventBusTerminal;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.mongo.DefaultMongoTemplate;
import org.axonframework.eventstore.mongo.MongoEventStore;
import org.axonframework.eventstore.mongo.MongoTemplate;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.support.TaskUtils;

import com.chambers.group.domain.objects.GroupAggregate;
import com.mongodb.MongoClient;

@Configuration
public class AxonConfiguration {

    // MongoDB
    private @Value("${mongo.host}") String mongo;
    private @Value("${mongo.port}") int port;
    private @Value("${mongo.password}") String password;
    private @Value("${mongo.user}") String user;
    private @Value("${mongo.db}") String db;
    private @Value("${mongo.collection}") String collection;
    private @Value("${mongo.snapshot}") String snapshot;

    // Rabbit
    private @Value("${rabbitmq.host}") String rabbitHost;
    private @Value("${rabbitmq.port}") Integer rabbitPort;
    private @Value("${rabbitmq.username}") String rabbitUsername;
    private @Value("${rabbitmq.password}") String rabbitPassword;
    private @Value("${rabbitmq.exchange.name}") String rabbitExchangeName;
    private @Value("${rabbitmq.exchange.autodelete}") boolean rabbitExchangeAutodelete;
    private @Value("${rabbitmq.exchange.durable}") boolean rabbitExchangeDurable;
    private @Value("${rabbitmq.queue.name}") String rabbitQueueName;
    private @Value("${rabbitmq.queue.durable}") Boolean rabbitQueueDurable;
    private @Value("${rabbitmq.queue.exclusive}") Boolean rabbitQueueExclusive;
    private @Value("${rabbitmq.queue.autodelete}") Boolean rabbitQueueAutoDelete;
    private @Value("${rabbitmq.queue-listener.prefetch-count}") Integer rabbitQueueListenerPrefetchCount;
    private @Value("${rabbitmq.queue-listener.recovery-interval}") Long rabbitQueueListenerRecoveryInterval;
    private @Value("${rabbitmq.queue-listener.cluster-transaction-size}") Integer rabbitQueueClusterTransactionSize;

    /**
     * RabbitMQ Subsystem Configuration
     */

    // Serializer
    @Bean
    public XStreamSerializer xstreamSerializer() {
        return new XStreamSerializer();
    }

    // Connection Factory
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
        connectionFactory.setUsername(rabbitUsername);
        connectionFactory.setPassword(rabbitPassword);
        return connectionFactory;
    }

    // Event bus exchange
    @Bean
    public FanoutExchange eventBusExchange() {
        return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
    }

    // Event bus queue
    @Bean
    public Queue eventBusQueue() {
        return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
    }

    // Binding
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(eventBusQueue()).to(eventBusExchange());
    }

    // Rabbit Admin
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * AXON
     */

    // Command Bus
    @Bean
    public CommandBus commandBus() {
        return new SimpleCommandBus();
    }

    // Event bus
    @Bean
    public EventBus eventBus() {
        ClusteringEventBus clusteringEventBus = new ClusteringEventBus(new DefaultClusterSelector(simpleCluster()), terminal());
        return clusteringEventBus;
    }

    // Terminal
    @Bean
    public EventBusTerminal terminal() {
        SpringAMQPTerminal terminal = new SpringAMQPTerminal();
        terminal.setConnectionFactory(connectionFactory());
        terminal.setSerializer(xstreamSerializer());
        terminal.setExchangeName(rabbitExchangeName);
        terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager());
        terminal.setDurable(true);
        terminal.setTransactional(false);
        return terminal;
    }

    // Configuration
    @Bean
    AMQPConsumerConfiguration springAMQPConsumerConfiguration() {
        SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
        springAMQPConsumerConfiguration.setDefaults(null);
        springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
        springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
        springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
        springAMQPConsumerConfiguration.setConcurrentConsumers(1);
        springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
        springAMQPConsumerConfiguration.setExclusive(false);
        springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
        springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
        springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
        return springAMQPConsumerConfiguration;
    }

    // Cluster definition
    @Bean
    SimpleCluster simpleCluster() {
        SimpleCluster simpleCluster = new SimpleCluster(rabbitQueueName);
        return simpleCluster;
    }

    // Message converter
    @Bean
    DefaultAMQPMessageConverter defaultAMQPMessageConverter() {
        return new DefaultAMQPMessageConverter(xstreamSerializer());
    }

    // Message listener configuration
    @Bean
    ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
        ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
        listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
        return listenerContainerLifecycleManager;
    }

    // Event listener
    @Bean
    public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
        AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
        processor.setEventBus(eventBus());
        return processor;
    }

    // Command Handler
    @Bean
    public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
        AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
        processor.setCommandBus(commandBus());
        return processor;
    }

    // Command Gateway
    @Bean
    public CommandGatewayFactoryBean commandGatewayFactoryBean() {
        CommandGatewayFactoryBean factory = new CommandGatewayFactoryBean();
        factory.setCommandBus(commandBus());
        return factory;
    }

    // Event Repository
    @Bean
    public EventSourcingRepository eventSourcingRepository() {
        MongoTemplate mongoTemplate;
        try {
            MongoClient mongoClient = new MongoClient(this.mongo, this.port);
            mongoTemplate = new DefaultMongoTemplate(mongoClient, this.db, this.collection, this.snapshot, (this.user.isEmpty()) ? null : this.user, (this.password.isEmpty()) ? null : this.password.toCharArray());
        } catch (UnknownHostException e) {
            // Fail fast instead of continuing with a null template
            throw new IllegalStateException("Could not resolve MongoDB host " + this.mongo, e);
        }

        MongoEventStore eventStore = new MongoEventStore(mongoTemplate);
        EventSourcingRepository repository = new EventSourcingRepository<>(GroupAggregate.class, eventStore);
        repository.setEventBus(eventBus());
        return repository;
    }

    // Command handler for the aggregate
    @SuppressWarnings("unchecked")
    @Bean
    public AggregateAnnotationCommandHandler groupCommandHandler() {
        AggregateAnnotationCommandHandler commandHandler = AggregateAnnotationCommandHandler.subscribe(GroupAggregate.class, eventSourcingRepository(), commandBus());
        return commandHandler;
    }
}

`

Hi,

Thanks a lot for sharing your Java config.
I had to add a line that registers the cluster with the ListenerContainerLifecycleManager to get the consuming side working.
My bean definition for the listenerContainerLifecycleManager now looks like this:

@Bean
public ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
   ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
   listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
   listenerContainerLifecycleManager.registerCluster(simpleCluster(), springAMQPConsumerConfiguration(), defaultAMQPMessageConverter() );
   return listenerContainerLifecycleManager;
}

Cheers
Job

Hi,

Is there an example for Axon 3.0.x with AMQP?

Thanks,
Szabolcs

Hi,

You could check the Bootiful Axon webinar and have a look at the code created in it.
It might be enough to help you out!

Cheers,

Frank

Hi,

Thanks for the quick answer. Based on your example I have created two modules (two separate JVMs) that communicate with each other using commands and events via AMQP. I have run into another problem, though. I created a saga with the @Saga annotation, but its @SagaEventHandler methods are not triggered by events fired from the other module. If I trigger an event from the same module, the method with the @StartSaga annotation gets called, so that part works fine. I also tried declaring event handlers in other classes with the @Component and @EventHandler annotations, and they do respond to the events from the other module.

Do I have to configure something to make sagas receive events from other modules as well?

Regards,
Szabolcs

`

@Saga
@ProcessingGroup("admin")
public class MySaga {

    private static final Logger LOGGER = Logger.getLogger(MySaga.class);

    @Autowired
    private transient CommandBus commandBus;

    // Plain event handler for the event published by the other module
    @EventHandler
    public void handle(SomeEventFromOtherModule event) {
        LOGGER.info(">>> SomeEventFromOtherModule received");
    }

    // Starts the saga on an event published by this module
    @StartSaga
    @SagaEventHandler(associationProperty = "eventId")
    public void on(SomeEventFromThisModule event) {
        // otherEventId and commandToTheOtherModule are not declared in the posted snippet
        otherEventId = event.otherId();

        SagaLifecycle.associateWith("otherEventId", event.otherId());

        commandBus.dispatch(GenericCommandMessage.asCommandMessage(commandToTheOtherModule));
    }

    // Saga handler for the event published by the other module
    @SagaEventHandler(associationProperty = "otherEventId")
    public void on(SomeEventFromOtherModule event) {
        LOGGER.info(">>> SomeEventFromOtherModule received by SagaEventHandler");
    }
}

`

Meanwhile I found the following thread: https://groups.google.com/forum/#!topic/axonframework/g6yvbTZquh8