Async cluster not working as expected in a clustering event bus setup

Hi,
We have configured Axon Framework to use a clustering event bus. We have two async clusters and one synchronous default cluster. The async clusters are configured with a FullConcurrencyPolicy and a ThreadPoolExecutorFactoryBean. The synchronous cluster works fine. The async cluster, however, processes one event at a time instead of processing events concurrently. We have tried different configurations and things got worse: the async cluster processed the first event and then stopped processing anything afterwards.

Here is our configuration. Please let me know what we are doing wrong.

<!-- Axon Clusters -->
<!-- Create servicing async cluster -->
<axon:cluster id="asyncServicing">
  <!-- the inner bean defines the actual cluster implementation to use. Here, it is an asynchronous cluster -->
  <bean class="org.axonframework.eventhandling.async.AsynchronousCluster">
    <constructor-arg value="asyncServicing"/>
    <constructor-arg ref="asyncExecutor"/>
    <constructor-arg>
      <bean class="org.axonframework.eventhandling.async.FullConcurrencyPolicy"/>
    </constructor-arg>
  </bean>
  <!-- Define servicing package we want to be handled by this cluster -->
  <axon:selectors>
    <axon:package prefix="com.xyz.servicing"/>
  </axon:selectors>
</axon:cluster>

<!-- Create event store async cluster -->
<axon:cluster id="asyncEventStore">
  <bean class="org.axonframework.eventhandling.async.AsynchronousCluster">
    <constructor-arg value="asyncEventStore"/>
    <constructor-arg ref="asyncExecutor"/>
    <constructor-arg>
      <bean class="org.axonframework.eventhandling.async.FullConcurrencyPolicy"/>
    </constructor-arg>
  </bean>
  <!-- Define event store package we want to be handled by this cluster -->
  <axon:selectors>
    <axon:package prefix="com.xyz.eventstore"/>
  </axon:selectors>
</axon:cluster>

<!-- We also create a simple cluster, and we define it as default, meaning it will be selected when no other
       selectors (or clusters) match -->
<axon:cluster id="simple" default="true"/>

<!-- We need a thread pool to execute tasks -->
<bean id="asyncExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
  <property name="corePoolSize" value="1"/>
  <property name="waitForTasksToCompleteOnShutdown" value="true"/>
</bean>

Thanks,
MZ

Can somebody please help?

Hi,

sorry for the delay. Some unexpected events required my full attention for the last few days.
I don’t have the code here right now to verify, but I believe you need to specify the number of threads to use in the async cluster. It defaults to 1.
I think there is also namespace support for the async cluster. Inside the cluster element, use to make the cluster asynchronous.

Hope that helps.
Cheers,

Allard

Allard,
Thank you for your reply. I looked into what you suggested, but I could not find what you are referring to. Can you please give me some more detail?
What I have found is that if the cluster is processing a long-running task, it doesn’t process anything else until that task is done or fails. Is that the expected behavior?

MZ

Hi,

I see the issue now. Your executor is a pool of 1 thread. If you want concurrent execution, it needs at least 2 threads. If you have long-running processes that shouldn’t block others, you should probably have more.

Note that Java’s thread pools only create threads beyond the core pool size when the queue is full. By default, the queue is unbounded, meaning that your core thread count is effectively the maximum number of threads.
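Following that advice, a resized executor might look like the fragment below. It is only a sketch: the pool size of 4 and the thread name prefix are hypothetical values, not taken from this thread. `corePoolSize`, `threadNamePrefix` and `waitForTasksToCompleteOnShutdown` are standard properties of Spring's `ThreadPoolExecutorFactoryBean`.

```xml
<!-- Thread pool shared by the async clusters; the size of 4 is a hypothetical example value -->
<bean id="asyncExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
  <!-- With the default (unbounded) queue, corePoolSize is effectively the maximum thread count -->
  <property name="corePoolSize" value="4"/>
  <!-- Hypothetical prefix so these threads are easy to recognize in thread dumps and logs -->
  <property name="threadNamePrefix" value="axon-async-"/>
  <property name="waitForTasksToCompleteOnShutdown" value="true"/>
</bean>
```

Because the default `queueCapacity` is unbounded, raising `maxPoolSize` on its own has no effect. It only comes into play once `queueCapacity` is set to a bounded value and that queue fills up.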

Cheers,

Allard

Allard,

I have tried increasing the corePoolSize to something other than 1, and the minute I do that the cluster stops processing events. Do you think that has something to do with my setup or a missing configuration?

Thanks,
MZ

Hi Mouad,

at first glance, your configuration seems fine and I have tests in Axon to prove that the Async Cluster will process events normally.

You say it stops processing events. Does it hang? Do events get discarded? One thing you can do to debug the situation is to take two thread dumps a few seconds apart. Comparing them should show whether processing hangs. Also make sure the event handlers in the com.xyz.eventstore package are thread safe.

If you have some example code you can share to show the issue, it would make it much easier to help.

Cheers,

Allard

Allard,
I did some more testing and looked into the thread dumps and here is what I have so far:

  • If corePoolSize in the ThreadPoolExecutorFactoryBean is greater than 1, the async event cluster stays in a waiting state and doesn’t receive the event.
  • If corePoolSize in the ThreadPoolExecutorFactoryBean is 1, events are processed by the async event cluster appropriately.
  • If corePoolSize and maxPoolSize are both set to a value greater than 1, it gets even weirder: if you fire two events of the same type, only one of them is processed and the other never comes through.

Is there a way to turn on debug logging for Axon Framework to see what’s going on?

Thanks,
Mouad Z.
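Axon Framework 2.x logs through SLF4J, so debug output is controlled by whichever SLF4J backend the application uses. Assuming that backend is Logback (the thread does not say), a `logback.xml` along these lines would raise Axon's log level to DEBUG. The appender name and log pattern are illustrative choices.

```xml
<configuration>
  <!-- Illustrative console appender; %thread shows which pool thread logged each line -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Assumption: Logback is the SLF4J backend; DEBUG for everything under org.axonframework -->
  <logger name="org.axonframework" level="DEBUG"/>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```

Including the thread name in each log line complements the thread dumps Allard suggested, since it shows which executor threads actually pick up events.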

Allard,
Any updates?

Thanks,
Mouad Z.

Hi Mouad,

honestly, I have no clue what’s going on. The behavior you’re describing makes very little sense to me, and I am not able to reproduce any of it myself.
What Axon does, when using the full concurrency policy, is simply schedule one task per event. There is nothing fancy going on. It is just weird that whether events get processed at all depends on the number of threads in the pool.

Unless you can provide me with a small runnable code sample that reproduces the problem, I’m afraid there is not much I can do. Alternatively, you could provide me with a Chronon recording.

Cheers,

Allard

Hi Allard,
Once I put a high priority on the async executor, the async cluster processes events appropriately; however, the saga doesn’t get processed. Any ideas?

I will work on getting you a Chronon recording.

Thanks,
Mouad Z.

Can you share your saga configuration?
Note that Sagas are only processed when an association value matches for a specific Saga instance, or when the Event Handler method on the Saga is annotated with @StartSaga.

Allard,
Prior to updating the priority, the saga was working. We are using the association value for the saga to start processing.
Here is the saga configuration:

<axon:saga-manager id="sagaManager" event-bus="eventBus" saga-repository="sagaRepository">
  <axon:types>
    com.xyz.core.command.domain.OnboardAutoActivatedProductsSaga
    com.xyz.servicing.retry.ProfileUpdateRetrySaga,
    com.xyz.servicing.retry.AccountDeactivationRetrySaga
  </axon:types>
</axon:saga-manager>

<bean id="axonMongoSagaTemplate"
      class="org.axonframework.saga.repository.mongo.DefaultMongoTemplate">
  <constructor-arg index="0" ref="mongo"/>
  <constructor-arg index="1" value="xyz"/>
  <constructor-arg index="2" value="sagas"/>
  <constructor-arg index="3">
    <null/>
  </constructor-arg>
  <constructor-arg index="4">
    <null/>
  </constructor-arg>
</bean>

<bean id="sagaRepository" class="org.axonframework.saga.repository.mongo.MongoSagaRepository">
  <constructor-arg ref="axonMongoSagaTemplate"/>
  <property name="resourceInjector">
    <bean class="org.axonframework.saga.spring.SpringResourceInjector"/>
  </property>
</bean>

This sounds really weird.
If you use the full concurrency policy, events may overtake each other. As a result, an event that should update a saga can arrive before that saga has been created, and the event is then ignored.
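One option not tried in this thread is to replace the `FullConcurrencyPolicy` with Axon's `SequentialPerAggregatePolicy` (same `org.axonframework.eventhandling.async` package), which keeps events from the same aggregate in order while still letting events from different aggregates run in parallel. The fragment below sketches that for the `asyncServicing` cluster. Whether it prevents the overtaking described above depends on the related events coming from the same aggregate, which is an assumption here.

```xml
<axon:cluster id="asyncServicing">
  <bean class="org.axonframework.eventhandling.async.AsynchronousCluster">
    <constructor-arg value="asyncServicing"/>
    <constructor-arg ref="asyncExecutor"/>
    <constructor-arg>
      <!-- Events with the same aggregate identifier are handled one after another, in order.
           Events from different aggregates (and events without an aggregate) may still run concurrently. -->
      <bean class="org.axonframework.eventhandling.async.SequentialPerAggregatePolicy"/>
    </constructor-arg>
  </bean>
  <axon:selectors>
    <axon:package prefix="com.xyz.servicing"/>
  </axon:selectors>
</axon:cluster>
```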

You can also make SagaManagers asynchronous and add them to a normal cluster. That lets the saga manager decide how to process incoming events, so that invocation is guaranteed in these situations.

Looking forward to the Chronon recording.
Cheers,

Allard

Allard,

After some trial and error, we found the issue. It turns out that we had to split our saga manager configuration into two saga managers: one for synchronous sagas and one for asynchronous sagas. That did the trick.
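The thread doesn’t show the final configuration, so the fragment below is only a sketch of what such a split could look like. Which sagas go into which manager is a hypothetical choice, the `sagaExecutor` and `transactionManager` bean names are hypothetical, and the `<axon:async>` child element with its `executor` and `transaction-manager` attributes is assumed from the Axon 2.x namespace. Check it against the schema of the Axon version in use.

```xml
<!-- Synchronous saga manager (hypothetical assignment of saga types) -->
<axon:saga-manager id="syncSagaManager" event-bus="eventBus" saga-repository="sagaRepository">
  <axon:types>
    com.xyz.core.command.domain.OnboardAutoActivatedProductsSaga
  </axon:types>
</axon:saga-manager>

<!-- Asynchronous saga manager; <axon:async> and its attributes are an assumption to verify,
     and sagaExecutor / transactionManager are hypothetical bean names -->
<axon:saga-manager id="asyncSagaManager" event-bus="eventBus" saga-repository="sagaRepository">
  <axon:async executor="sagaExecutor" transaction-manager="transactionManager"/>
  <axon:types>
    com.xyz.servicing.retry.ProfileUpdateRetrySaga,
    com.xyz.servicing.retry.AccountDeactivationRetrySaga
  </axon:types>
</axon:saga-manager>
```

Keeping the two managers separate means the sagas in the first manager are still handled synchronously, while only the sagas listed in the second manager are processed asynchronously.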

Thank you very much for your help and support.

Thanks,
Mouad Z.