Disruptor command bus with multiple aggregates and event sourcing repositories

Hi,

I am trying to implement the disruptor command bus. I have two event sourcing repositories for two aggregates.
I am using the following configuration:

<axon:disruptor-command-bus command-target-resolver="annotationCommandTargetResolver" "commandBus" transaction-manager="springTxMgr"
        event-store="eventStore" event-bus="eventBus">
    <axon:repositories>
        <axon:repository id="abc" aggregate-type="xyz"/>
        <axon:repository id="def" aggregate-type="pqr"/>
    </axon:repositories>
    <axon:publisher-interceptors>

    </axon:publisher-interceptors>
</axon:disruptor-command-bus>

As described in the reference documentation, the AnnotationCommandTargetResolver determines which aggregate is the target of a specific command, using annotations to describe the target.
When I run the application, the command bus is not being called. Have I missed something here?

Any pointers are welcome.

Cheers
Prashant

Hi Prashant,

There is a small issue in the DisruptorCommandBus and its dispatch(Command) method: if an error occurs, it is silently discarded (and not logged anywhere). I suggest trying the dispatch(command, new LoggingCallback(command)) method instead. That should provide some logging and give an indication of what’s wrong.
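
To illustrate why the callback variant helps, here is a minimal sketch in plain Java. TinyBus, ResultCallback and RecordingCallback are hypothetical stand-ins written for this example, not Axon classes; the sketch only assumes that a fire-and-forget dispatch hands the failure to a callback that ignores it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in types for illustration only; this is NOT the Axon API.
interface ResultCallback<R> {
    void onSuccess(R result);
    void onFailure(Throwable cause);
}

final class TinyBus {
    private final Function<String, String> handler;

    TinyBus(Function<String, String> handler) {
        this.handler = handler;
    }

    // Fire-and-forget: the failure reaches a callback that does nothing with it.
    void dispatch(String command) {
        dispatch(command, new ResultCallback<String>() {
            @Override public void onSuccess(String result) { }
            @Override public void onFailure(Throwable cause) { }
        });
    }

    // Callback variant: the caller decides what happens with the outcome.
    void dispatch(String command, ResultCallback<String> callback) {
        String result;
        try {
            result = handler.apply(command);
        } catch (RuntimeException e) {
            callback.onFailure(e);
            return;
        }
        callback.onSuccess(result);
    }
}

// Records outcomes so they can be inspected or written to a log.
final class RecordingCallback implements ResultCallback<String> {
    final List<String> log = new ArrayList<>();

    @Override public void onSuccess(String result) {
        log.add("OK: " + result);
    }

    @Override public void onFailure(Throwable cause) {
        log.add("FAILED: " + cause.getMessage());
    }
}
```

With the one-argument dispatch a failing handler leaves no trace; with a RecordingCallback the failure ends up in its log. On the real bus, LoggingCallback plays that role.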

Small side-note: if you use the AnnotationCommandTargetResolver, you don’t have to mention it in your config. It’s the default one.

Cheers,

Allard

PS. There is a “commandbus” floating in your XML declaration. I assume that’s a copy-paste error and not actually in your config?

Hi Allard,

Thanks for the reply. I used the LoggingCallback as you suggested, but it did not show any error.
When I debugged the code, I could see that the aggregate identifier was null in the disruptor command bus.
I have two aggregates with the same aggregateIdentifier. Could this be an issue?

Cheers,
Prashant

It is possible to have two aggregates with the same identifier, as long as they are of a different type.
Where do you see that the aggregate identifier is null? In the repository code?

Do you see the result of your command being logged by the LoggingCallback?

Hi Allard,

I am seeing the aggregate identifier as null in the DisruptorCommandBus.java file.
I am not seeing the result of my command being logged by the LoggingCallback.

Do you mean I can have only a single aggregate of type String, int, long, Object?
I currently have the same aggregate type for two aggregates.

Cheers,
Prashant

Hi Prashant,

I really need more info to be able to find out what’s happening. Can you tell me which field/variable is null? Did you put a breakpoint? Could you send a stack trace of the execution point at your breakpoint?

Cheers,

Allard

Hi Allard,

I was able to resolve the issue by rearranging the order of the beans.
I had to put the event bus bean after the disruptor command bus, as mentioned in one of the threads (Sample DisruptorCommandBus Configuraton) of the Axon Google group, and provide an executor (thread pool) for the disruptor command bus.

Now the disruptor command bus works fine when I provide a single aggregate reference in axon:repository.

I am getting the following error (file attached with stack trace)

org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'abcRepository': Requested bean is currently in creation: Is there an unresolvable circular reference?

when I try with two aggregates, as shown below:

<axon:disruptor-command-bus id="commandBus" event-store="eventStore" event-bus="eventBus" executor="disruptorThreadPool">
    <axon:repositories>
        <axon:repository id="abcRepository" aggregate-type="com.ABC"/>
        <axon:repository id="defRepository" aggregate-type="com.XUZ"/>
    </axon:repositories>
</axon:disruptor-command-bus>

I also tried different aggregate identifiers for each repository, but I still get the error in the attached file.
I am using an asynchronous saga for event handling, which works fine with the disruptor command bus and a single aggregate.
My Spring version is 3.2.0 and my Axon version is 2.0.7. I also tried Spring 3.2.3 but got the same error.

Could you please shed some light on this error and on ways to resolve it?

Cheers,
Prashant

BeanInCreationError.txt (33.1 KB)

Hi Prashant,

This is an annoying one to solve. For some reason, something changed in Spring 3.2 in the way beans are resolved. It tends to break on “circular dependencies” much earlier than before, even in some cases where there is no real circular dependency. That’s why the actual order of beans seems to matter.

I know what to do to get around the issue, as long as I know which beans are part of the “cycle”. Your stack trace can help with that, but since I haven’t managed to reproduce it yet, it will be hard to find out whether the problem is solved.

For background information: how did you configure your event bus? Do you use (event listener) clusters?

Cheers,

Allard

Hi Prashant,

I noticed the following statement in your stack trace:
Error creating bean with name 'defRepository': Cannot resolve reference to bean 'commandBus' while setting constructor argument

This is a weird one, because the repository bean is not created using a constructor argument anymore. This changed between 2.0.5 and 2.0.6 to resolve this issue of cyclic dependencies (see AXON-186). Are you sure your classpath doesn’t accidentally contain a 2.0.5 version of Axon?
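
One quick way to check for a stray jar is to ask the class loader for every location that provides a given class file. The sketch below uses only the JDK; ClasspathCheck is a name made up for this example, and CommandBus is picked merely as a representative Axon class. Run it with the same classpath as your application.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Axon or Spring.
final class ClasspathCheck {

    // Returns every classpath location that provides the given resource.
    static List<URL> locationsOf(String resourcePath) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourcePath));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A representative Axon class; any class shipped in the core jar would do.
        List<URL> hits = locationsOf("org/axonframework/commandhandling/CommandBus.class");
        for (URL url : hits) {
            System.out.println(url);
        }
        if (hits.size() > 1) {
            System.out.println("More than one copy found; check for mixed Axon versions.");
        }
    }
}
```

Each printed URL contains the path of the jar it came from, so a leftover 2.0.5 jar would show up next to the 2.0.7 one.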

Cheers,

Allard

Hi Allard,

The issue is resolved now; I had Axon 2.0.5 on my classpath. Thanks for pointing that out.

Now I am facing another issue:

The saga event handlers are not being called when I fire a number of requests one after another (60 requests) through the disruptor command bus, with both async and non-async sagas.

I could see the event handler being called for only 8-10 of the 60 requests in both cases.

a) Async saga with a pool size of 10 and a processor count of 10.

I am getting the following error:

Name: myThreadPool-8
State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@30178442
Total blocked: 3 Total waited: 576,356

Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
org.axonframework.saga.annotation.AsyncSagaCreationElector.waitForSagaCreationVote(AsyncSagaCreationElector.java:60)
org.axonframework.saga.annotation.AsyncSagaProcessingEvent.waitForSagaCreationVote(AsyncSagaProcessingEvent.java:89)
org.axonframework.saga.annotation.AsyncSagaEventProcessor.onEvent(AsyncSagaEventProcessor.java:103)
org.axonframework.saga.annotation.AsyncSagaEventProcessor.onEvent(AsyncSagaEventProcessor.java:43)
com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
org.axonframework.saga.annotation.AsyncAnnotatedSagaManager$StartDetectingRunnable.run(AsyncAnnotatedSagaManager.java:409)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:724)

b) Non-async saga:
The requests reach the command handler but not the saga event handlers. Only a few saga event handlers are called, and in some of those the scheduled events are not triggered. I could not see any Quartz-related errors or deadlocks on my console.

  1. What would be the best approach to handle bulk requests (JMS queues) while also maintaining the saga conversation?
    Our requirement is to process each request (from a JMS queue) as it arrives: store the aggregate in the event store and start the saga, then close the saga when the second request arrives. The requests arrive within milliseconds of each other (at about 2k TPS), and all of them must be processed. We also need to schedule timeouts while waiting for the dependent request.

Could you please provide your comments on the above points?

Cheers,

Prashant

Hi Prashant,

Are you sure your association values are correct? Sagas are only invoked if the AssociationValue matches a value of the Saga. If you use @StartSaga, an association value is automatically created, based on the properties of the @SagaEventHandler annotation.
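
As a rough illustration of that matching rule, here is a plain-Java sketch. AssociationIndex is a hypothetical stand-in written for this example, not Axon code; it assumes an association is simply a key/value pair of strings and that a saga is found only when both parts match exactly.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a saga lookup by association; NOT the Axon implementation.
final class AssociationIndex {
    private final Map<String, Set<String>> sagasByAssociation = new HashMap<>();

    // Combines key and value into one lookup entry (assumes keys contain no '=').
    private static String entry(String key, String value) {
        return key + "=" + value;
    }

    // Registers a saga under the given key/value pair.
    void associate(String sagaId, String key, String value) {
        sagasByAssociation
                .computeIfAbsent(entry(key, value), k -> new HashSet<>())
                .add(sagaId);
    }

    // Returns the sagas whose association matches both key and value exactly.
    Set<String> find(String key, String value) {
        return sagasByAssociation.getOrDefault(entry(key, value), Collections.emptySet());
    }
}
```

In this sketch, a saga associated under ("orderId", "42") is found for an event carrying orderId 42, but not for orderId 43 and not for an event that exposes the same value under a different property name. A mismatch of either kind would look like "the handler is never called".
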
Furthermore, a queue size of 10 is extremely small. Generally, I choose a value around 512 (should always be a power of 2, because of how the Disruptor works)…
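
The power-of-2 requirement comes from how a ring buffer can map an ever-increasing sequence number to a slot with a bit mask instead of a modulo. A small plain-Java sketch follows; RingSize and its methods are names invented for this example and are not part of the Disruptor or Axon.

```java
// Hypothetical helper for illustration; not part of the Disruptor or Axon.
final class RingSize {

    // True when n is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Smallest power of two that is greater than or equal to n.
    static int nextPowerOfTwo(int n) {
        if (n <= 1) {
            return 1;
        }
        if (n > (1 << 30)) {
            throw new IllegalArgumentException("too large for an int power of two: " + n);
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    // Slot for a sequence number; equals sequence % size only when size is a power of two.
    static int slot(long sequence, int size) {
        return (int) (sequence & (size - 1));
    }
}
```

For example, a requested size of 10 would round up to 16, while 512 is already a power of two and stays as it is.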

If you have these kinds of problems, it generally helps to share some of the code. It gives more insight into what you’re trying to do.

Cheers,

Allard