Async Event handlers

Is there any example of a POJO component with async event handlers running on the AsynchronousCluster in Spring, showing both the Spring config and the POJO annotations?

Thanks and Cheers…

I used an example from another thread to try to get an async cluster to process the @EventHandler methods of one class asynchronously. I am seeing weird behavior: many events that are supposed to be handled synchronously with the aggregate are affected, even though only one EventHandler component is supposed to be async.

I created the following Java component:

@Component
@Named("xxxFactory")
public class AxonAsyncClusterFactory {

    @Bean(name = "xxxAxonCluster")
    public Cluster createAxonCluster() {
        ThreadPoolExecutor clusterExecutor = new ThreadPoolExecutor(2, 10, 60L, TimeUnit.MINUTES,
                new SynchronousQueue<Runnable>(true));
        return new AsynchronousCluster("xxxAxonCluster", clusterExecutor, new NoTransactionManager(),
                new SequentialPerAggregatePolicy());
    }
}
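A side note on the executor settings above: a SynchronousQueue never holds tasks, so once every thread up to the maximum pool size is busy, the next submission is rejected by the default handler with a RejectedExecutionException. The following is a minimal JDK-only sketch of that behavior, not Axon code. The class name SaturationDemo and the method countRejected are hypothetical, and the pool is made smaller than the 2/10 configuration in the post so the effect is easy to see. How the cluster reacts to such a rejection is not covered here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only uses java.util.concurrent.
class SaturationDemo {

    /**
     * Submits `tasks` tasks that all block until released, to a pool capped
     * at `maxThreads` threads, and returns how many submissions were rejected.
     */
    public static int countRejected(int maxThreads, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                Math.min(2, maxThreads), maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(true));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task keeps its thread busy until the latch opens.
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // No idle thread and no room for a new one: the task is refused.
                    rejected++;
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();   // allow the pool threads to exit
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 5 long-running tasks, at most 2 threads: 3 submissions are rejected.
        System.out.println("rejected: " + countRejected(2, 5));
    }
}
```

If bursts of more than the maximum pool size are possible, a bounded queue or a different rejection policy may be worth considering; which one fits depends on how much delay the handlers can tolerate.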

Then in the Spring XML:

<axon:cluster id="defaultAxonCluster" default="true">
    <axon:selectors>
        <axon:class-name-matches pattern="com.fp.server.flow.base.async.FlowMailEventHandlerAndSender"/>
    </axon:selectors>
</axon:cluster>

FlowMailEventHandlerAndSender has an @EventHandler method that sends email asynchronously. However, with default="true" all other events are also processed asynchronously, even for handler classes that are not matched by the selector. I am not sure why, and the application ends up in a corrupt state and stops working.

If I omit default, the application fails to start with load errors:

[WARN] Nested in org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'actionReminderMailEventHandlerAndSender' defined in URL [jar:file:/fp/fp-core.jar!/com/fp/server/action/ActionReminderMailEventHandlerAndSender.class]: Initialization of bean failed; nested exception is org.axonframework.eventhandling.EventListenerSubscriptionFailedException: Unable to subscribe [com.fp.server.action.ActionReminderMailEventHandlerAndSender] to the Event Bus. There is no suitable cluster for it. Make sure the ClusterSelector is configured properly:
org.axonframework.eventhandling.EventListenerSubscriptionFailedException: Unable to subscribe [com.fp.server.action.ActionReminderMailEventHandlerAndSender] to the Event Bus. There is no suitable cluster for it. Make sure the ClusterSelector is configured properly
    at org.axonframework.eventhandling.ClusteringEventBus.clusterFor(ClusteringEventBus.java:111)
    at org.axonframework.eventhandling.ClusteringEventBus.subscribe(ClusteringEventBus.java:96)
    at org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter.subscribe(AnnotationEventListenerAdapter.java:99)
    at org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter.subscribe(AnnotationEventListenerAdapter.java:52)
    at org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor.initializeAdapterFor(AnnotationEventListenerBeanPostProcessor.java:55)
    at org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor.initializeAdapterFor(AnnotationEventListenerBeanPostProcessor.java:36)
    at org.axonframework.common.annotation.AbstractAnnotationHandlerBeanPostProcessor.postProcessAfterInitialization(AbstractAnnotationHandlerBeanPostProcessor.java:71)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:407)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1461)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:519)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:456)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:294)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:225)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:291)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:585)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:913)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:464)
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:385)
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:284)
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:111)

I made more progress by declaring another, empty cluster with default="true" and removing default from the async one. I am not sure this is the correct way; it came out of iterative trial and error.

<axon:cluster id="mainCluster" default="true"/>

Now the async event is handled normally, but I started losing all my database inserts, i.e. nothing is committing. In this case the async event handler sends the email and fires a command to an aggregate, which does its processing and eventually sends an event to an updater that updates the reporting database. There are no errors in the log, but no data in the db.

I could not wire the main jpaTransactionManager into the async cluster because the types are different. I finally landed on SpringTransactionManager, configured it in my Axon infrastructure, and injected it into my ClusterFactory component. Things started saving again, so here is the final modification. Please let me know if this is the correct way.

Added to the Spring XML; the ref below is the master transaction manager used by everything.

Modified factory:

@Component
@Named("xxxFactory")
public class AxonAsyncClusterFactory {

    @Autowired
    @Named("MySpringTxManager")
    SpringTransactionManager transactionManager;

    public Cluster createAxonCluster() {
        ThreadPoolExecutor clusterExecutor = new ThreadPoolExecutor(2, 10, 60L, TimeUnit.MINUTES,
                new SynchronousQueue<Runnable>(true), new NamedThreadFactory("EventPublishing", true));
        return new AsynchronousCluster("xxxAxonCluster", clusterExecutor, transactionManager,
                new SequentialPerAggregatePolicy());
    }
}

Please let me know if this is the correct way, or if there is a better way to do it.

Thanks and cheers…

Hi,

Your approach looks like the right way to do it. In Axon 2, you need to organize event handlers in "clusters". Each cluster has some infrastructural behavior configured on it. By specifying "selectors", you specify how each handler is assigned to a cluster. One cluster may be the default, which means it is selected when no selector explicitly points to another one.
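To make the selector and default rules concrete, here is a small plain-Java model of the lookup as described in this thread. It is only an illustration, not Axon's implementation or API: the class ClusterSelectionSketch and its methods are hypothetical names, and clusters are represented by their names only. It shows why a handler that matches no selector ends up in the default cluster, and why removing the default produces a "no suitable cluster" failure for every unmatched handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model of "selectors plus one optional default cluster".
class ClusterSelectionSketch {

    // One class-name selector: a regex and the cluster it points to.
    private static final class Selector {
        final Pattern pattern;
        final String clusterName;

        Selector(String regex, String clusterName) {
            this.pattern = Pattern.compile(regex);
            this.clusterName = clusterName;
        }
    }

    private final List<Selector> selectors = new ArrayList<>();
    private final String defaultCluster; // null means no default is declared

    public ClusterSelectionSketch(String defaultCluster) {
        this.defaultCluster = defaultCluster;
    }

    public void addClassNameSelector(String regex, String clusterName) {
        selectors.add(new Selector(regex, clusterName));
    }

    /** Returns the cluster name for a handler class, or fails if none applies. */
    public String clusterFor(String handlerClassName) {
        for (Selector selector : selectors) {
            if (selector.pattern.matcher(handlerClassName).matches()) {
                return selector.clusterName;
            }
        }
        if (defaultCluster == null) {
            throw new IllegalStateException(
                    "There is no suitable cluster for " + handlerClassName);
        }
        return defaultCluster;
    }

    public static void main(String[] args) {
        ClusterSelectionSketch sketch = new ClusterSelectionSketch("mainCluster");
        sketch.addClassNameSelector(
                "com\\.fp\\.server\\.flow\\.base\\.async\\.FlowMailEventHandlerAndSender",
                "xxxAxonCluster");
        // The matched handler goes to the async cluster, everything else to the default.
        System.out.println(sketch.clusterFor(
                "com.fp.server.flow.base.async.FlowMailEventHandlerAndSender"));
        System.out.println(sketch.clusterFor(
                "com.fp.server.action.ActionReminderMailEventHandlerAndSender"));
    }
}
```

In this model, marking the async cluster itself as the default sends every unmatched handler to it, which matches the behavior reported earlier in the thread.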

Cheers,

Allard