Using MongoDB as the event store and RabbitMQ for messages

Hi,
I’m new to Axon and trying to figure out how to use MongoDB as the event store and how to use RabbitMQ. I found different examples on the web and am trying to combine them, but I keep getting the following error:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'axonJsonSerializer' defined in class path resource [com/Ultimatesoftware/IntegrationServices/Commands/config/AxonConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.axonframework.serialization.Serializer]: Factory method 'axonJsonSerializer' threw exception; nested exception is java.lang.NoClassDefFoundError: com/fasterxml/jackson/datatype/jsr310/JSR310Module

Below is the configuration file that I’m using to define the MongoTemplate:


@Configuration
public class AxonConfig {

    @Value("${mongodb.url}")
    private String mongoUrl;

    @Value("${mongodb.port}")
    private int mongoPort;

    @Value("${mongodb.dbname}")
    private String mongoDbName;

    @Value("${mongodb.events.collection.name}")
    private String eventsCollectionName;

    @Value("${mongodb.events.snapshot.collection.name}")
    private String snapshotCollectionName;

    @Bean
    public Serializer axonJsonSerializer() {
        return new JacksonSerializer();
    }

    @Bean
    public EventStorageEngine eventStorageEngine() {
        return new MongoEventStorageEngine(
                axonJsonSerializer(), null, axonMongoTemplate(), new DocumentPerEventStorageStrategy());
    }

    @Bean(name = "axonMongoTemplate")
    public MongoTemplate axonMongoTemplate() {
        MongoTemplate template = new DefaultMongoTemplate(mongoClient(), mongoDbName, eventsCollectionName, snapshotCollectionName);
        return template;
    }

    @Bean
    public MongoClient mongoClient() {
        MongoFactory mongoFactory = new MongoFactory();
        mongoFactory.setMongoAddresses(Arrays.asList(new ServerAddress(mongoUrl)));
        return mongoFactory.createMongo();
    }
}

and here is the code I have in the main application file:

Hi Amr,

Looks like you should add this dependency to your project:

<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

Cheers,

Hello Frans,
Thank you for your help. Adding the dependency got rid of that issue. Now I seem to be having trouble connecting to MongoDB; I’m getting the error below. I have MongoDB running in Docker, and I’m able to connect to it via Robo 3T at 192.168.99.100:32784.

`

com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.connection.SocketStream.open(SocketStream.java:63) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.connection.InternalStreamConnection.open(InternalStreamConnection.java:115) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:113) ~[mongodb-driver-core-3.4.3.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_131]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_131]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_131]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_131]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_131]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_131]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_131]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_131]
at com.mongodb.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:57) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.connection.SocketStream.open(SocketStream.java:58) ~[mongodb-driver-core-3.4.3.jar:na]
… 3 common frames omitted
2017-10-15 05:55:52.585 WARN 12432 --- [ main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'eventStorageEngine': Invocation of init method failed; nested exception is com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=192.168.99.100:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused: connect}}]
2017-10-15 05:55:52.602 INFO 12432 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2017-10-15 05:55:52.622 INFO 12432 --- [ main] utoConfigurationReportLoggingInitializer :

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2017-10-15 05:55:52.628 ERROR 12432 --- [ main] o.s.boot.SpringApplication : Application startup failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'eventStorageEngine': Invocation of init method failed; nested exception is com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=192.168.99.100:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused: connect}}]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:137) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:409) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1620) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.7.RELEASE.jar:1.5.7.RELEASE]
at com.Ultimatesoftware.IntegrationServices.Commands.CommandsApplication.main(CommandsApplication.java:14) [classes/:na]
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=192.168.99.100:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused: connect}}]
at com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:377) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:104) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:415) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.operation.CreateIndexesOperation.execute(CreateIndexesOperation.java:144) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.operation.CreateIndexesOperation.execute(CreateIndexesOperation.java:71) ~[mongodb-driver-core-3.4.3.jar:na]
at com.mongodb.Mongo.execute(Mongo.java:845) ~[mongodb-driver-3.4.3.jar:na]
at com.mongodb.Mongo$2.execute(Mongo.java:828) ~[mongodb-driver-3.4.3.jar:na]
at com.mongodb.MongoCollectionImpl.createIndexes(MongoCollectionImpl.java:491) ~[mongodb-driver-3.4.3.jar:na]
at com.mongodb.MongoCollectionImpl.createIndex(MongoCollectionImpl.java:458) ~[mongodb-driver-3.4.3.jar:na]
at org.axonframework.mongo.eventsourcing.eventstore.AbstractMongoEventStorageStrategy.ensureIndexes(AbstractMongoEventStorageStrategy.java:200) ~[axon-mongo-3.0.6.jar:3.0.6]
at org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine.ensureIndexes(MongoEventStorageEngine.java:123) ~[axon-mongo-3.0.6.jar:3.0.6]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134) ~[spring-beans-4.3.11.RELEASE.jar:4.3.11.RELEASE]
… 18 common frames omitted

`

I managed to fix it. It looks like I needed to pass the port to the mongoClient bean as well; I was passing only the URL, without the port.
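The fix described above comes down to giving the driver both a host and a port. When only a host is supplied, the MongoDB Java driver falls back to its default port, 27017, which is the address that shows up in the stack trace. The following is a minimal, stdlib-only sketch of that fallback; `MongoAddressSketch` and `parse` are hypothetical names used for illustration and are not part of the driver. In the configuration class above, the equivalent change would presumably be `new ServerAddress(mongoUrl, mongoPort)`.

```java
import java.net.InetSocketAddress;

public class MongoAddressSketch {
    // MongoDB's default port, used when no explicit port is given.
    static final int DEFAULT_MONGO_PORT = 27017;

    // Hypothetical helper: accepts "host" or "host:port" and returns an
    // unresolved socket address (no DNS lookup, no connection attempt).
    static InetSocketAddress parse(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon < 0) {
            // No port supplied: fall back to the default port.
            return InetSocketAddress.createUnresolved(hostAndPort, DEFAULT_MONGO_PORT);
        }
        String host = hostAndPort.substring(0, colon);
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        // Host only: ends up on the default port, as in the stack trace.
        InetSocketAddress hostOnly = parse("192.168.99.100");
        System.out.println(hostOnly.getHostString() + ":" + hostOnly.getPort());

        // Host plus the port that Docker actually mapped.
        InetSocketAddress withPort = parse("192.168.99.100:32784");
        System.out.println(withPort.getHostString() + ":" + withPort.getPort());
    }
}
```

Since Docker maps the container's 27017 to a different host port here (32784), the port has to come from configuration rather than from the driver default.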

Hi Amr,

I don’t know a whole lot about Mongo, but having a quick look at it I notice that your stacktrace says your program’s trying to connect to 192.168.99.100:27017 and your email says you tried 192.168.99.100:32784. My first thought would be that there’s some misconfiguration with ports.

Cheers,

Hello Frans,

Thank you for your reply. I managed to fix the issue and now events are persisting in the DB.
I’m facing another issue now, and maybe it’s due to my lack of knowledge of Axon at this moment, but the query module doesn’t seem to be reading any data. The query side is able to get messages from RabbitMQ, but the GET API returns nothing. Below are the files I have.

Thank you so much for your help!!!

applicationfile:


@SpringBootApplication
public class QueriesApplication {

    public static void main(String[] args) {
        SpringApplication.run(QueriesApplication.class, args);
    }

    @Bean
    public SpringAMQPMessageSource IntegrationsQueue(Serializer serializer) {
        return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
            @RabbitListener(queues = "Integration")
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                super.onMessage(message, channel);
            }
        };
    }
}

controller class:

Here is the full code: https://github.com/amrtahlawi/CQRSWithMongoAndRabbitMQ

Hi Amr,

I’ve browsed through the code but honestly can’t see what’s wrong with the read side, it’s a simple get mapped to a findAll - should be fine. I’m sorry that I can’t help you here.

Some other feedback (which might be useful, but won’t fix your problem): You’re setting this up with an event-sourced command side, storing events in MongoDB, transmitting events on RabbitMQ, and then having a query side that listens on RabbitMQ and stores its read model in MongoDB again. That’s totally fine of course. But I noticed there is some stuff in your query side that is only needed at the command side (like a CommandGateway, event sourcing repo etc.). You can (and probably should) remove that from the query side.

Generally speaking, as of today, Axon Framework isn’t involved much with the query side, except for delivering events to it. I don’t expect your current problem is Axon-related.

Cheers,

Frans van Buul

Hello Frans,
I think I found the issue. I had the @ProcessingGroup("Integration") annotation in the wrong spot. It should be added to the handler class and not the REST controller class :)
Now I’m moving on to the next issue, where it cannot deserialize the incoming message:

o.a.a.e.spring.SpringAMQPMessageSource : Unable to deserialize an incoming message. Ignoring it. org.axonframework.serialization.UnknownSerializedTypeException: Could not deserialize a message. The serialized type is unknown: com.Ultimatesoftware.IntegrationServices.Commands.Api.IntegrationCreatedEvent (rev. null)
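For context on the error above: the serialized type in the message is a fully qualified class name, so the consuming application can presumably only deserialize the event if a class with exactly that package and name is on its own classpath. The following is a minimal, stdlib-only sketch of that kind of lookup; `TypeLookupSketch` and `isKnownType` are hypothetical names for illustration, not Axon API.

```java
public class TypeLookupSketch {
    // Hypothetical helper: reports whether a fully qualified class name can be
    // loaded from the current classpath. A serializer needs a similar lookup
    // to succeed before it can rebuild an object from its serialized type name.
    static boolean isKnownType(String fullyQualifiedName) {
        try {
            Class.forName(fullyQualifiedName);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A class that exists on every JVM classpath.
        System.out.println(isKnownType("java.lang.String"));

        // The type name from the error message. It resolves only if the
        // consuming application contains a class with this exact package and name.
        System.out.println(isKnownType(
                "com.Ultimatesoftware.IntegrationServices.Commands.Api.IntegrationCreatedEvent"));
    }
}
```

If that is the cause, one common way around it is to keep the event classes in a package shared by both the command and query applications, so the same name resolves on both sides.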

I fixed more issues and committed the changes. I’m still getting errors when processing the event:

`
2017-10-15 13:34:10.719 ERROR 26924 --- [cTaskExecutor-1] o.a.eventhandling.LoggingErrorHandler : EventListener [IntegrationEntityListener] failed to handle event [047823bb-1e44-4348-a30f-3950d31b5053] (com.Ultimatesoftware.Integrationservices.Api.IntegrationCreatedEvent). Continuing processing with next listener

Caused by: org.xmlpull.v1.XmlPullParserException: only whitespace content allowed before start tag and not { (position: START_DOCUMENT seen {… @1:1)
at org.xmlpull.mxp1.MXParser.parseProlog(MXParser.java:1519) ~[xpp3-1.1.4c.jar:na]
at org.xmlpull.mxp1.MXParser.nextImpl(MXParser.java:1395) ~[xpp3-1.1.4c.jar:na]
at org.xmlpull.mxp1.MXParser.next(MXParser.java:1093) ~[xpp3-1.1.4c.jar:na]
at com.thoughtworks.xstream.io.xml.XppReader.pullNextEvent(XppReader.java:109) ~[xstream-1.4.9.jar:1.4.9]
… 52 common frames omitted
`

I fixed part of the problem: I needed to add an empty constructor to the event listener. Now I’m trying to figure out how to get the URI of the MongoDB instance to pass it to Spring. MongoDB is being served from Docker.

Fixed it too. Now I have a full sample working.
Thanks Frans for your help!

That’s great to hear Amr, well done!