Distributed command bus

Hi,

I am using Axon Framework and trying out an example that uses the distributed command bus.
I have two aggregates: ToDoItem and AnotherToDoItem.
The ToDoItem aggregate has two commands: CreateToDoItemCommand and MarkCompletedCommand.
The AnotherToDoItem aggregate has one command: CreateAnotherToDoItemCommand.
When I run this code, I get the following output:

Please enter the load factor to join with: [100]
1
Using a load factor 1 when connecting to the cluster.
GossipRouter started at Wed Apr 30 18:11:43 IST 2014
Listening on port 12001 bound on address 0.0.0.0/0.0.0.0
Backlog is 1000, linger timeout is 2000, and read timeout is 0
35 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@64482923: startup date [Wed Apr 30 18:11:44 IST 2014]; root of context hierarchy
72 [main] INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [distributed-config.xml]
406 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)’ of type [class org.axonframework.contextsupport.spring.SpringContextParameterResolverFactoryBuilder$ClasspathParameterResolverFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
410 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)’ of type [class org.axonframework.common.annotation.ClasspathParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
411 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)#1’ of type [class org.axonframework.common.annotation.SpringBeanParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
429 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘__axon-parameter-resolver-factory’ of type [class org.axonframework.contextsupport.spring.ApplicationContextLookupParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
429 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘__axon-parameter-resolver-factory’ of type [class org.axonframework.common.annotation.MultiParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
448 [main] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@28787c16: defining beans [org.springframework.context.support.PropertySourcesPlaceholderConfigurer#0,org.springframework.context.annotation.internalAsyncAnnotationProcessor,org.springframework.context.annotation.internalScheduledAnnotationProcessor,taskExecutor,commandBus,commandGateway,eventStore,eventBus,toDoRepository,__axon-parameter-resolver-factory,org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandlerFactoryBean#0,caseRepository,org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandlerFactoryBean#1,jGroupsConnector,jGroupsConnector1,__axon-annotation-command-handler-bean-post-processor,__axon-annotation-event-listener-bean-post-processor,org.axonframework.quickstart.annotated.ToDoEventHandler#0,org.axonframework.quickstart.annotated.CaseActiveEventHandler#0,org.axonframework.serializer.xml.XStreamSerializer#0]; root of factory hierarchy
453 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService ‘taskExecutor’
Apr 30, 2014 6:11:44 PM org.jgroups.logging.JDKLogImpl info
INFO: UFC is not needed (and can be removed) as we’re running on a TCP transport
Apr 30, 2014 6:11:44 PM org.jgroups.logging.JDKLogImpl info
INFO: UFC is not needed (and can be removed) as we’re running on a TCP transport
927 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 0
930 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647

Hi,

I see you’re choosing 1 as the number of segments. That’s very low and could be the cause of an imbalance. 100 is a good default value. Do all the commands have an @TargetAggregateIdentifier annotation on the field (or getter) that contains the aggregate identifier?
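To illustrate why the number of segments matters, here is a minimal sketch of a consistent-hash ring in plain Java. It is a simplified model and not Axon's actual implementation: the class `HashRingSketch`, its methods, and the node names are hypothetical. Each node claims as many positions on the ring as its load factor, and a command goes to the node that owns the first position at or after the hash of its aggregate identifier. With a single position per node, one node can easily end up owning most of the ring; with 100 positions per node the shares tend to even out.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Simplified consistent-hash ring (hypothetical names, not Axon API). */
final class HashRingSketch {

    // Ring position -> node name. Each node owns `loadFactor` positions.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Adds a node with the given number of ring positions (its load factor). */
    void join(String nodeName, int loadFactor) {
        for (int i = 0; i < loadFactor; i++) {
            ring.put(hash(nodeName + "#" + i), nodeName);
        }
    }

    /** Returns the node owning the first position at or after the key's hash. */
    String nodeFor(String aggregateIdentifier) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes have joined");
        }
        Map.Entry<Long, String> owner = ring.ceilingEntry(hash(aggregateIdentifier));
        // Past the last position: wrap around to the start of the ring.
        return (owner != null ? owner : ring.firstEntry()).getValue();
    }

    /** Folds the first 8 bytes of a SHA-256 digest into a long. */
    private static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long result = 0;
            for (int i = 0; i < 8; i++) {
                result = (result << 8) | (digest[i] & 0xFF);
            }
            return result;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Routing the same identifier twice returns the same node, which is what keeps all commands for one aggregate on one machine. This is also why the identifier has to be discoverable on every command.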

Cheers,

Allard

Yes, all the commands have an @TargetAggregateIdentifier annotation on the fields. I am now using 100 as the load factor, but I get the same result.

I want CreateToDoItemCommand and MarkCompletedCommand to be routed to one segment and CreateAnotherToDoItemCommand to another segment.
How can I achieve this using the distributed command bus?

The only way to achieve that is to make sure that one node only registers command handlers for CreateToDoItemCommand and MarkCompletedCommand, and the other node only for CreateAnotherToDoItemCommand. Axon will distribute load to all servers accepting a specific type of command. If multiple nodes accept the same command type, the distribution is based on the aggregate identifier.
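The two-step selection described above (first by command type, then by aggregate identifier) can be sketched in plain Java as follows. This is a toy model under stated assumptions, not Axon code: `CommandRoutingSketch`, `registerHandler`, and `route` are hypothetical names, and a plain modulo on the identifier's hash stands in for the consistent hashing a real cluster would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of type-then-identifier routing (hypothetical, not Axon API). */
final class CommandRoutingSketch {

    // Command type -> nodes that registered a handler for it, in registration order.
    private final Map<String, List<String>> nodesByCommandType = new HashMap<>();

    /** Records that the given node accepts the given command type. */
    void registerHandler(String nodeName, String commandType) {
        nodesByCommandType
                .computeIfAbsent(commandType, key -> new ArrayList<>())
                .add(nodeName);
    }

    /** Picks a node: only nodes accepting the type are candidates. */
    String route(String commandType, String aggregateIdentifier) {
        List<String> candidates = nodesByCommandType.get(commandType);
        if (candidates == null) {
            throw new IllegalStateException("No node handles " + commandType);
        }
        // With one candidate the index is always 0; with several, the
        // aggregate identifier decides. floorMod keeps the index non-negative.
        int index = Math.floorMod(aggregateIdentifier.hashCode(), candidates.size());
        return candidates.get(index);
    }
}
```

If a hypothetical `node1` registers only the two ToDoItem commands and `node2` registers only CreateAnotherToDoItemCommand, every ToDoItem command lands on `node1` regardless of its identifier, and a command type that no node registered is rejected.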

Cheers,

Allard

Application context

`

<context:property-placeholder />

<task:annotation-driven />









<axon:command-bus id="commandBus1"/>

<axon:filesystem-event-store id="eventStore"
    base-dir="events" />

<axon:event-bus id="eventBus" />

<axon:event-sourcing-repository id="toDoRepository"
    aggregate-type="org.axonframework.quickstart.annotated.ToDoItem" />

<axon:aggregate-command-handler
    aggregate-type="org.axonframework.quickstart.annotated.ToDoItem"
    repository="toDoRepository" command-bus="commandBus1" />

<axon:command-bus id="commandBus2"/>

<axon:event-sourcing-repository id="anotherToDoItemRepository"
    aggregate-type="org.axonframework.quickstart.annotated.AnotherToDoItem" />

<axon:aggregate-command-handler
    aggregate-type="org.axonframework.quickstart.annotated.AnotherToDoItem"
    repository="anotherToDoItemRepository" command-bus="commandBus2" />











<axon:annotation-config />

`

When I run my application, I get the following exception:

`
Please enter the load factor to join with: [100]
100
Using a load factor 100 when connecting to the cluster.
GossipRouter started at Fri May 02 14:42:36 IST 2014
Listening on port 12001 bound on address 0.0.0.0/0.0.0.0
Backlog is 1000, linger timeout is 2000, and read timeout is 0
34 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@64482923: startup date [Fri May 02 14:42:36 IST 2014]; root of context hierarchy
71 [main] INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [distributed-config.xml]
409 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)’ of type [class org.axonframework.contextsupport.spring.SpringContextParameterResolverFactoryBuilder$ClasspathParameterResolverFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
413 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)’ of type [class org.axonframework.common.annotation.ClasspathParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
414 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)#1’ of type [class org.axonframework.common.annotation.SpringBeanParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
432 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘__axon-parameter-resolver-factory’ of type [class org.axonframework.contextsupport.spring.ApplicationContextLookupParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
432 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘__axon-parameter-resolver-factory’ of type [class org.axonframework.common.annotation.MultiParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
451 [main] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@540984b: defining beans [org.springframework.context.support.PropertySourcesPlaceholderConfigurer#0,org.springframework.context.annotation.internalAsyncAnnotationProcessor,org.springframework.context.annotation.internalScheduledAnnotationProcessor,taskExecutor,commandBus,commandGateway,commandBus1,eventStore,eventBus,toDoRepository,__axon-parameter-resolver-factory,org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandlerFactoryBean#0,commandBus2,caseRepository,org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandlerFactoryBean#1,jGroupsConnector,jGroupsConnector1,__axon-annotation-command-handler-bean-post-processor,__axon-annotation-event-listener-bean-post-processor,org.axonframework.quickstart.annotated.ToDoEventHandler#0,org.axonframework.quickstart.annotated.CaseActiveEventHandler#0,org.axonframework.serializer.xml.XStreamSerializer#0]; root of factory hierarchy
456 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService ‘taskExecutor’
May 2, 2014 2:42:36 PM org.jgroups.logging.JDKLogImpl info
INFO: UFC is not needed (and can be removed) as we’re running on a TCP transport
May 2, 2014 2:42:36 PM org.jgroups.logging.JDKLogImpl info
INFO: UFC is not needed (and can be removed) as we’re running on a TCP transport
939 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 0
942 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
`

Apparently, there is no command handler for org.axonframework.quickstart.api.CreateAnotherToDoItemCommand on any of the nodes.
You’ve got 3 command buses configured. “commandBus” is the distributed one, with “commandBus1” as its local segment. However, the command handler for CreateAnotherToDoItemCommand is registered with commandBus2, which is not part of the distributed command bus. Thus, commands of that type sent to “commandBus” cannot be handled.

Try creating two instances of DistributedCommandBus, each with a different connector. If the connectors use the same clusterName, they will find each other.

Cheers,

Allard

Thanks, Allard.

I have created two instances of the distributed command bus and it is working fine, but now events are not getting published.

`





`

Output:

`
Please enter the load factor to join with: [100]
100
Using a load factor 100 when connecting to the cluster.
GossipRouter started at Fri May 02 16:05:09 IST 2014
Listening on port 12001 bound on address 0.0.0.0/0.0.0.0
Backlog is 1000, linger timeout is 2000, and read timeout is 0
34 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@64482923: startup date [Fri May 02 16:05:09 IST 2014]; root of context hierarchy
71 [main] INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [distributed-config.xml]
407 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)’ of type [class org.axonframework.contextsupport.spring.SpringContextParameterResolverFactoryBuilder$ClasspathParameterResolverFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
411 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)’ of type [class org.axonframework.common.annotation.ClasspathParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
412 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘(inner bean)#1’ of type [class org.axonframework.common.annotation.SpringBeanParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
430 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘__axon-parameter-resolver-factory’ of type [class org.axonframework.contextsupport.spring.ApplicationContextLookupParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
430 [main] INFO org.springframework.context.support.ClassPathXmlApplicationContext - Bean ‘__axon-parameter-resolver-factory’ of type [class org.axonframework.common.annotation.MultiParameterResolverFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
449 [main] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@13b33a0e: defining beans [org.springframework.context.support.PropertySourcesPlaceholderConfigurer#0,org.springframework.context.annotation.internalAsyncAnnotationProcessor,org.springframework.context.annotation.internalScheduledAnnotationProcessor,taskExecutor,commandBus,commandBus1,commandGateway,eventStore,eventBus,toDoRepository,__axon-parameter-resolver-factory,org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandlerFactoryBean#0,caseRepository,org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandlerFactoryBean#1,jGroupsConnector,jGroupsConnector1,__axon-annotation-command-handler-bean-post-processor,__axon-annotation-event-listener-bean-post-processor,org.axonframework.serializer.xml.XStreamSerializer#0]; root of factory hierarchy
454 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService ‘taskExecutor’
May 2, 2014 4:05:10 PM org.jgroups.logging.JDKLogImpl info
INFO: UFC is not needed (and can be removed) as we’re running on a TCP transport
May 2, 2014 4:05:10 PM org.jgroups.logging.JDKLogImpl info
INFO: UFC is not needed (and can be removed) as we’re running on a TCP transport
859 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 0
859 [main] INFO org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
`

In your configuration, it seems that you register the command handlers (the aggregate-command-handler) with the command bus instances that you use as local segments. You should register them with the appropriate distributed command bus instance instead. In fact, if the local segment is just a default command bus, you don’t even need to configure it at all.
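A small plain-Java model may help show why the registration target matters. It is only a sketch under assumptions and does not use Axon's classes: `LocalBus` and `DistributedBus` are hypothetical stand-ins, and the model assumes that subscribing through the distributed bus both installs the handler on the local segment and announces the command name to the cluster, whereas subscribing directly on the local segment does only the former.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Toy model of a distributed bus wrapping a local segment (not Axon API). */
final class LocalSegmentSketch {

    /** Plain in-process bus: command name -> handler. */
    static final class LocalBus {
        private final Map<String, Function<Object, String>> handlers = new HashMap<>();

        void subscribe(String commandName, Function<Object, String> handler) {
            handlers.put(commandName, handler);
        }

        String dispatch(String commandName, Object command) {
            Function<Object, String> handler = handlers.get(commandName);
            if (handler == null) {
                throw new IllegalStateException("No handler for " + commandName);
            }
            return handler.apply(command);
        }
    }

    /** Routes only command names that were announced through it. */
    static final class DistributedBus {
        private final LocalBus localSegment;
        private final Set<String> announced = new HashSet<>();

        DistributedBus(LocalBus localSegment) {
            this.localSegment = localSegment;
        }

        void subscribe(String commandName, Function<Object, String> handler) {
            localSegment.subscribe(commandName, handler); // handler still runs locally
            announced.add(commandName);                   // and the cluster learns about it
        }

        String dispatch(String commandName, Object command) {
            if (!announced.contains(commandName)) {
                throw new IllegalStateException("No node known to accept " + commandName);
            }
            return localSegment.dispatch(commandName, command);
        }
    }
}
```

In this model, a handler subscribed directly on the `LocalBus` exists but is never announced, so dispatching through the `DistributedBus` fails even though the handler is present on the same machine.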

Check out the QuickStart samples to see how the wiring is done:
https://github.com/AxonFramework/AxonFramework/blob/master/quickstart/src/main/java/org/axonframework/quickstart/RunDistributedCommandBusWithSpring.java

and
https://github.com/AxonFramework/AxonFramework/blob/master/quickstart/src/main/resources/distributed-config.xml

Cheers,

Allard

My commands are now getting dispatched to different segments, but events are not getting published.

`

<context:property-placeholder />

<task:annotation-driven />











<axon:filesystem-event-store id="eventStore"
    base-dir="events" />

<axon:event-bus id="eventBus" />

<axon:event-sourcing-repository id="toDoRepository"
    aggregate-type="org.axonframework.quickstart.annotated.ToDoItem" />

<axon:aggregate-command-handler
    aggregate-type="org.axonframework.quickstart.annotated.ToDoItem"
    repository="toDoRepository" command-bus="commandBus" />

<axon:event-sourcing-repository id="caseRepository"
    aggregate-type="org.axonframework.quickstart.annotated.AnotherToDoItem" />

<axon:aggregate-command-handler
    aggregate-type="org.axonframework.quickstart.annotated.AnotherToDoItem"
    repository="caseRepository" command-bus="commandBus1" />







<axon:annotation-config />

`

Did you have the EventHandler beans commented out when you ran the test?

Yes, it was my mistake. :) Now my code is working as expected.

Thanks, Allard, for your input.