Deadlocks with asynchronous event bus

Hey all,
We are using the Axon asynchronous event bus and are experiencing problems with deadlocks. We upgraded from 2.0.2 to the latest release, 2.0.5, but the problem still exists. Axon is able to lock threads from the internal thread pool used by the asynchronous event bus:

“axon-1” Id=366 TIMED_WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@1067add3 owned by “axon-0” Id=365
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.locks.ReentrantLock$NonfairSync@1067add3
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireNanos(AbstractQueuedSynchronizer.java:929)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1245)
at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:445)
at org.axonframework.common.lock.IdentifierBasedLock$DisposableLock.lock(IdentifierBasedLock.java:165)
at org.axonframework.common.lock.IdentifierBasedLock$DisposableLock.access$100(IdentifierBasedLock.java:138)
at org.axonframework.common.lock.IdentifierBasedLock.obtainLock(IdentifierBasedLock.java:102)

“axon-0” Id=365 RUNNABLE
at java.lang.Class.getClassLoader0(Native Method)
at java.lang.Class.getClassLoader(Class.java:609)
at org.osgi.framework.FrameworkUtil$1.run(FrameworkUtil.java:210)
at java.security.AccessController.doPrivileged(Native Method)
at org.osgi.framework.FrameworkUtil.getBundle(FrameworkUtil.java:208)
at com.axonframework.osgi.BundleRevisionResolver.revisionOf(BundleRevisionResolver.java:17) << yup, it’s our own revision provider class
at org.axonframework.serializer.JavaSerializer.revisionOf(JavaSerializer.java:109)
at org.axonframework.serializer.JavaSerializer.typeForClass(JavaSerializer.java:100)

Due to this issue we had ~250 Jetty threads locked. We use the event bus as a central eventing hub, so we publish data from a couple of places: the scheduler, JMS receivers, and web service calls made over HTTP. What is the purpose of this IdentifierBasedLock?

Cheers,
Lukasz

Here is also a graph of the locked threads: http://grab.by/r2mw (be careful, it’s wide). All of them were locked by IdentifierBasedLock.

Lukasz

On Friday, 11 October 2013 at 17:00:16 UTC+2, Łukasz Dywicki wrote:

Hi Lukasz,

Deadlocks typically occur because different threads acquire the same locks in a different order. If you use an asynchronous event bus with a simple command bus, this is very likely to happen. If I recall correctly, the async event bus uses a thread per event (pooled, obviously). That means that if you use Sagas or event handlers that dispatch commands, they will acquire locks for each aggregate and only release them when handling of the event has finished.

The purpose of IdentifierBasedLock is to use a single object that allows you to acquire a lock based on an identifier. If thread a does lock(A), and another uses lock(B), they will not need to wait for each other.
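
The idea of locking per identifier can be illustrated with a minimal sketch. This is not Axon's IdentifierBasedLock (which also does deadlock detection and cleanup); the class PerIdentifierLock and its methods are hypothetical names, and Java 8 or later is assumed. It only shows that a thread holding lock(A) does not block another thread asking for lock(B).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; this is NOT Axon's IdentifierBasedLock.
class PerIdentifierLock {
    // One ReentrantLock per identifier. Entries are never removed here,
    // which a real implementation would have to take care of.
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(String identifier) {
        return locks.computeIfAbsent(identifier, id -> new ReentrantLock());
    }

    // Blocks until the lock for this identifier is free.
    void obtainLock(String identifier) {
        lockFor(identifier).lock();
    }

    // Returns immediately: true if acquired, false if another thread holds it.
    boolean tryObtainLock(String identifier) {
        return lockFor(identifier).tryLock();
    }

    void releaseLock(String identifier) {
        lockFor(identifier).unlock();
    }

    // Demo: the calling thread holds "A"; a second thread can take "B" but not "A".
    // Returns {gotB, gotA} as observed by the second thread.
    static boolean[] demo() {
        PerIdentifierLock lock = new PerIdentifierLock();
        boolean[] seen = new boolean[2];
        lock.obtainLock("A");
        try {
            Thread other = new Thread(() -> {
                seen[0] = lock.tryObtainLock("B");
                if (seen[0]) {
                    lock.releaseLock("B");
                }
                seen[1] = lock.tryObtainLock("A");
                if (seen[1]) {
                    lock.releaseLock("A");
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.releaseLock("A");
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = demo();
        System.out.println("lock(B) acquired: " + seen[0] + ", lock(A) acquired: " + seen[1]);
    }
}
```

In the thread dump above, axon-1 is in the second situation: it wants an identifier whose lock axon-0 currently holds.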

If you configure the same PessimisticLockManager instance for each of your repositories and SagaManagers, Axon will be able to detect deadlocks and resolve them (by appointing a deadlock loser).
I’m thinking about adding timeouts to prevent the scenario where threads are locked indefinitely, waiting for each other.

Cheers,

Allard

We have only one saga repository instance for all saga managers (about 30 instances). Maybe that is our mistake? Is there any pointer on how to set up a PessimisticLockManager? I would like to check whether this helps. In general our workflows are not too complicated and each event type is always handled by only one saga/event handler instance, but it might still be executed in different threads. The thread pool used for the async event bus is configured with “CallerRunsPolicy” because the scheduler publishes messages faster than we can process them.

Best regards,
Lukasz

On Sunday, 13 October 2013 at 11:50:24 UTC+2, Allard Buijze wrote:

Hi Lukasz,

Having a single Repository doesn’t really matter. One way that will certainly prevent the deadlocks is to dispatch the commands asynchronously (e.g. by using an AsynchronousCommandBus or a thread pool for dispatching the commands).

The problem is in the fact that Sagas will normally hold a lock on the Saga instance and will then attempt to acquire a lock on an aggregate. When a single Saga needs to lock multiple aggregates and another saga does the same in another order, your deadlock is there.

The element allows you to configure a LockManager. If you create a single PessimisticLockManager instance (no configuration needed) and configure it in each of the repositories, you’re more likely to detect the deadlocks when they occur.
But again, it’s better to solve them by using an AsynchronousCommandBus. Then, a single (task in a) thread will always perform 1 action, requiring 1 lock, making it impossible to get deadlocks.

Cheers,

Allard

Allard,
We use neither commands nor aggregates, just POJO events and Saga classes. I used the wrong term to describe our configuration at the beginning when I said that we have an async event bus. In fact, to make event dispatching really asynchronous we have a ClusteringEventBus which uses an AsynchronousCluster.

Cheers,
Lukasz

On Monday, 14 October 2013 at 09:28:37 UTC+2, Allard Buijze wrote:

I see. And do you use a single SagaManager for all 30 Saga types?
How many Saga instances is a single event likely to be handled by?

There is also an AsyncAnnotatedSagaManager, which uses a lockless algorithm. It’s also asynchronous, so it can be safely placed in a synchronous cluster. It uses a different way to prevent concurrent access to a single Saga instance, removing the need for locks. In some cases, it also performs a lot better.

What I don’t understand is how the deadlock has not been detected, assuming you use a single SagaManager. Might it be possible that a Saga instance was hanging, causing the other instances to wait for the hanging process to finish?

Cheers,

Allard

Allard,
We have only one saga repository but we have 30 saga types, and each has its own manager instance. We may switch to a single manager if this is an issue. Can you say whether it’s fine to use ClusteringEventBus and AsynchronousCluster with saga managers based on AbstractAnnotatedSagaManager rather than AsyncAnnotatedSagaManager?

Best regards,
Lukasz

On Monday, 14 October 2013 at 11:17:17 UTC+2, Allard Buijze wrote:

Hi Lukasz,

The configuration should be fine. However, I wouldn’t place too many SagaManagers together in a single AsynchronousCluster. The cluster uses a task/thread per event, invoking all handlers on that one thread. By splitting the activity of the SagaManagers over different thread groups, you’re much less likely to have deadlocks.

In addition, you can also consider using the “SequentialPolicy” SequencingPolicy on your AsynchronousClusters, which causes all events to be handled sequentially. Since your deadlocks are most likely caused by two threads acquiring locks in a different order, this will also prevent the problem. It comes at a performance cost, however, so use it in combination with assigning the SagaManager instances to different Cluster instances.

Cheers,

Allard

Hi Lukasz,

On my way home from work, I suddenly realized that if you don’t send commands from your sagas, you cannot get a deadlock that way. So disregard my previous message.
But that still leaves the issue open. After some more thinking, the only thing I can come up with is that the saga manager doesn’t always acquire the locks in the same order. For one event, the SagaManager may lock saga B and then A, while for another event they may be locked in the opposite order. In a highly concurrent system, a deadlock is then bound to happen at some point.

I’m going to try to fix it in the 2.0.6 branch (currently snapshot) tomorrow. Would you have time to see if you can reproduce it in that version? The fix I’m thinking about is to order the identifiers of the sagas that need to be invoked. That way, all instances acquire the locks in the same order. That should make it impossible for a deadlock to happen.
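
The ordering idea can be sketched roughly as follows. This is not the actual 2.0.6 change; OrderedSagaLocks, lockOrder and withLocks are hypothetical names, and plain String identifiers with natural ordering are an assumption. Each caller sorts the identifiers before locking, so two threads asking for {A, B} and {B, A} both lock A first and cannot end up waiting on each other in a cycle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; names are hypothetical, not Axon API.
class OrderedSagaLocks {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Sorted, de-duplicated identifiers: every caller derives the same lock order.
    static List<String> lockOrder(Collection<String> sagaIds) {
        return new ArrayList<>(new TreeSet<>(sagaIds));
    }

    // Acquires all locks in sorted order, runs the action, releases in reverse order.
    void withLocks(Collection<String> sagaIds, Runnable action) {
        List<ReentrantLock> held = new ArrayList<>();
        try {
            for (String id : lockOrder(sagaIds)) {
                ReentrantLock lock = locks.computeIfAbsent(id, k -> new ReentrantLock());
                lock.lock();
                held.add(lock);
            }
            action.run();
        } finally {
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock();
            }
        }
    }

    // Demo: two threads request the same two sagas in opposite order, 1000 times each.
    // Returns the number of completed actions (the counter is guarded by both locks).
    static int demo() {
        OrderedSagaLocks sagaLocks = new OrderedSagaLocks();
        int[] completed = new int[1];
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                sagaLocks.withLocks(Arrays.asList("A", "B"), () -> completed[0]++);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                sagaLocks.withLocks(Arrays.asList("B", "A"), () -> completed[0]++);
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed[0];
    }

    public static void main(String[] args) {
        System.out.println("completed actions: " + demo());
    }
}
```

If the sorting step were removed and locks were taken in the order given, the same demo could hang, which is the scenario described above.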

Cheers,

Allard

Hey Allard,
Today we have set up a build of our project with AsyncAnnotatedSagaManager for an overnight run to verify that it works well. We might do the same thing with a snapshot of 2.0.6 tomorrow evening to see whether AnnotatedSagaManagers are fine as well.

Cheers,
Lukasz

On Monday, 14 October 2013 at 22:41:16 UTC+2, Allard Buijze wrote:

Hi Lukasz,

After reviewing the code, I noticed that Saga IDs are already returned in the same order each time. That means a deadlock is (at least theoretically) impossible. One exception is the InMemoryRepository, which uses a HashSet, which could cause different orders on different requests. I will fix that one. Which repository implementation do you use?

Next, I checked your stack trace again, as well as the graph you sent. What I noticed there is that you actually don’t have a deadlock, you just have very high contention. Obviously the 5 threads you assigned to the EventBus cannot deal with all incoming events fast enough. Using a CallerRunsPolicy would then mean that the caller (probably a Jetty thread) joins in on the contended lock, causing it to be slowed down as well.
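
To make the “caller runs” behaviour concrete, here is a small sketch using only java.util.concurrent. The pool size of 1 and queue capacity of 1 are arbitrary assumptions chosen to saturate the pool quickly; CallerRunsDemo is a hypothetical name. Once the worker is busy and the queue is full, the next task runs on the submitting thread, which in this thread's scenario would be a Jetty thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; sizes are deliberately tiny to saturate the pool.
class CallerRunsDemo {
    // Returns the name of the thread that ran the task submitted while the pool was saturated.
    static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // A task that keeps its thread busy until the latch is released.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the queue (capacity 1)
            // Worker busy and queue full: the policy runs this task on the submitting thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("submitted from: " + Thread.currentThread().getName());
        System.out.println("overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

If the overflow task needs a contended lock, the submitting thread waits for that lock itself instead of returning to its own pool.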

A first step toward a solution would be to use the AsyncAnnotatedSagaManager. It uses the Disruptor underneath to manage the processing queue and handling. This is a lockless mechanism that uses a bounded queue, causing producers to be delayed when the SagaManager is contended. This slows the input down more gradually (causing slower reading from the JMS queue), allowing the Sagas to keep up. Effectively, some flow control needs to be implemented.
Additionally, you could also configure a CachingRepository to improve Saga processing speed.
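
The effect of a bounded queue on producers can be sketched with a plain ArrayBlockingQueue. This is only an analogy: it is not how the Disruptor or the AsyncAnnotatedSagaManager is implemented, and the capacity of 2, the 10 ms timeout and the name BoundedQueueDemo are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; a plain bounded queue stands in for the ring buffer.
class BoundedQueueDemo {
    // Returns whether each of four publish attempts was accepted.
    static boolean[] demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        boolean[] accepted = new boolean[4];
        try {
            accepted[0] = queue.offer("event-1", 10, TimeUnit.MILLISECONDS);
            accepted[1] = queue.offer("event-2", 10, TimeUnit.MILLISECONDS);
            // Queue is full: the producer waits up to 10 ms and then gives up.
            accepted[2] = queue.offer("event-3", 10, TimeUnit.MILLISECONDS);
            // The consumer catches up by taking one event...
            queue.take();
            // ...so the producer can publish again.
            accepted[3] = queue.offer("event-3", 10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

With put() instead of a timed offer(), the producer would simply block until a slot is free, which slows the source down to the consumer's pace.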

Cheers,

Allard

Hey Allard,
Sorry for the delay in replying. We use our own saga repository implementation, which stores saga instances in a Cassandra column family. We don’t have any locking mechanism there.

I do agree it is not a regular deadlock. However, the problem is that threads which are blocked waiting to execute a saga never get freed, or at least only after long delays. From my understanding, after a saga has executed the Jetty thread should be returned to the Jetty pool; however, this does not happen because it waits for the lock to execute the saga, like the axon-1 thread shown in the thread dump in my first post.

We have switched to AsyncAnnotatedSagaManager and got mixed results. On better hardware we had no problems with dummy data, but on a second server, where we have only 8 processing flows and used real data, event handling was much slower. We will set up a test run on the better machine with real data this afternoon. Also, I’m not sure whether we still need the ClusteringEventBus and thread pool executor, since the Disruptor is capable of queueing events/handlers in its ring buffer. On the slower machine we again have a Jetty thread leak. Overnight we got plenty of them blocked in the same way. From 3 PM to 9 AM the number of threads went from 290 up to 510:

“qtp1150409357-19282” Id=19282 TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:349)
at com.lmax.disruptor.AbstractMultithreadedClaimStrategy.waitForFreeSlotAt(AbstractMultithreadedClaimStrategy.java:99)
at com.lmax.disruptor.AbstractMultithreadedClaimStrategy.incrementAndGet(AbstractMultithreadedClaimStrategy.java:49)
at com.lmax.disruptor.Sequencer.next(Sequencer.java:127)
at com.lmax.disruptor.EventPublisher.publishEvent(EventPublisher.java:45)
at com.lmax.disruptor.dsl.Disruptor.publishEvent(Disruptor.java:186)
at org.axonframework.saga.annotation.AsyncAnnotatedSagaManager.handle(AsyncAnnotatedSagaManager.java:166)

As a side experiment I turned off AnnotatedSagaManager synchronization and finally got a deadlock instead of an infinite lock ;)
Handler [SagaX] threw an exception while handling event of type [Y]. Continuing processing with next handler

org.axonframework.common.lock.DeadlockException: An imminent deadlock was detected while attempting to acquire a lock
at org.axonframework.common.lock.IdentifierBasedLock$DisposableLock.checkForDeadlock(IdentifierBasedLock.java:181)
at org.axonframework.common.lock.IdentifierBasedLock$DisposableLock.lock(IdentifierBasedLock.java:164)
at org.axonframework.common.lock.IdentifierBasedLock$DisposableLock.access$100(IdentifierBasedLock.java:138)
at org.axonframework.common.lock.IdentifierBasedLock.obtainLock(IdentifierBasedLock.java:102)
at org.axonframework.saga.AbstractSagaManager.invokeExistingSagas(AbstractSagaManager.java:91)
at org.axonframework.saga.AbstractSagaManager.handle(AbstractSagaManager.java:75)
at org.axonframework.eventhandling.async.EventProcessor.doHandle(EventProcessor.java:239)
at org.axonframework.eventhandling.async.EventProcessor.processNextEntry(EventProcessor.java:204)
at org.axonframework.eventhandling.async.EventProcessor.run(EventProcessor.java:186)
at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2025)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.axonframework.eventhandling.async.EventProcessor.scheduleEvent(EventProcessor.java:99)
at org.axonframework.eventhandling.async.AsynchronousCluster.schedule(AsynchronousCluster.java:151)
at org.axonframework.eventhandling.async.AsynchronousCluster.publish(AsynchronousCluster.java:136)
at org.axonframework.eventhandling.ClusteringEventBus$SimpleEventBusTerminal.publish(ClusteringEventBus.java:129)
at org.axonframework.eventhandling.ClusteringEventBus.publish(ClusteringEventBus.java:91)
at com.example.saga.SagaX.handle(SagaX.java:96)
at sun.reflect.GeneratedMethodAccessor120.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.axonframework.common.annotation.MethodMessageHandler.invoke(MethodMessageHandler.java:75)
at org.axonframework.common.annotation.MessageHandlerInvoker.invokeHandlerMethod(MessageHandlerInvoker.java:66)
at org.axonframework.saga.annotation.SagaEventHandlerInvoker.invokeSagaEventHandlerMethod(SagaEventHandlerInvoker.java:66)
at org.axonframework.saga.annotation.AbstractAnnotatedSaga.doHandle(AbstractAnnotatedSaga.java:84)
at org.axonframework.saga.annotation.AbstractAnnotatedSaga.handle(AbstractAnnotatedSaga.java:79)
at org.axonframework.saga.AbstractSagaManager.loadAndInvoke(AbstractSagaManager.java:171)
at org.axonframework.saga.AbstractSagaManager.invokeExistingSagas(AbstractSagaManager.java:94)
at org.axonframework.saga.AbstractSagaManager.handle(AbstractSagaManager.java:75)
at org.axonframework.eventhandling.async.EventProcessor.doHandle(EventProcessor.java:239)
at org.axonframework.eventhandling.async.EventProcessor.processNextEntry(EventProcessor.java:204)
at org.axonframework.eventhandling.async.EventProcessor.run(EventProcessor.java:186)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

Cheers,
Lukasz

On Tuesday, 15 October 2013 at 10:40:27 UTC+2, Allard Buijze wrote:

One clarification: this deadlock exception was caused by saga synchronization. In the place where it occurred I forgot to set SynchronizeSagaAccess to false.

On Thursday, 17 October 2013 at 12:06:43 UTC+2, Łukasz Dywicki wrote:

Hi Lukasz,

The problem you’re facing is, in my opinion, not so much Axon related, but more related to the use of locks (ok, by Axon) in combination with the “caller runs” policy.
Your sagas seem to send out new events. If the queue is full, they process that event in the calling thread. This means they’re holding a lock while acquiring a new one. This is problematic in any type of system.

Your problem starts with the fact that messages are provided at a speed that the SagaManager cannot cope with. That means the queue fills up. Using “caller runs” only makes the problem worse. It would be better to introduce a “flow control” mode (some MQ systems have that) to prevent problems. While a system is in “flow control”, messages should be written to a separate queue, which could e.g. be persisted to disk. When the queue empties, the queue from disk is slowly fed into the “in-memory queue”. When the disk queue is empty (or fits into the memory queue entirely), flow control status can be removed.

As I said, message brokers typically have these types of mechanisms in place. It might be something for you to look at. Also note that in a thread pool, new threads are only added once the queue is full. That means extra capacity is added at a point where your system is probably so deep in the sh*t already, that the extra threads can’t pull it out anymore.
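
The thread pool behaviour mentioned here can be observed directly with java.util.concurrent.ThreadPoolExecutor: beyond the core size, a new thread is only created when the work queue rejects a task. The sizes below (core 1, maximum 2, queue capacity 2) and the name PoolGrowthDemo are assumptions chosen to keep the sketch small.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; pool and queue sizes are arbitrary.
class PoolGrowthDemo {
    // Returns the pool size observed after each of four submissions.
    static int[] poolSizesAfterEachSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2));
        CountDownLatch release = new CountDownLatch(1);
        // A task that keeps its thread busy until the latch is released.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[4];
        try {
            for (int i = 0; i < 4; i++) {
                // Task 1 starts the core thread, tasks 2 and 3 are queued,
                // task 4 finds the queue full and triggers a second thread.
                pool.execute(blocker);
                sizes[i] = pool.getPoolSize();
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(poolSizesAfterEachSubmit()));
    }
}
```

The second thread appears only with the fourth task, when two tasks are already waiting in the queue.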

Just a thought-dump…
Cheers,

Allard

Allard,
I agree that the amount of data we have is causing some trouble. The caller runs policy allowed us to introduce some kind of throttling for those threads that produce data too fast (i.e. the scheduler) and allowed us to keep receiving messages from other places such as JMS queues. This was a consequence of the fact that we had out-of-memory issues when the scheduler started to bomb Axon with a gazillion events.

The way we can go now is, as you suggested, to change to two or more clusters (in Axon terms) plus a custom cluster selector. One cluster will be for real-time events and a second for scheduler events, which don’t have to be processed right away and may be slightly delayed.

I also did a small experiment with JMS and it ran fine; however, reading events from Cassandra and then pushing them through a broker to the sagas is kind of duplicate work.

Cheers,
Lukasz

On Friday, 18 October 2013 at 12:00:10 UTC+2, Allard Buijze wrote: