Axon AMQP MessageBroker and MongoDB as eventstore

Hello I try to configure my MongoDB Axon setup with RabbitMQ / later then with Azure Service Bus… I was trying based on the sample application ‘amqp-axon-example’ GitHub - AxonFramework/extension-amqp: Axon Framework extension for AMQP integration to publish and handle Event messages.

but every time when I configure my MongoDB with the following configuration I do not see any events in RabbitMQ…


@Component
class AxonMongoDBConfiguration {

    /**
     * The SpringAMQPMessageSource allows event processors to read messages from a queue instead of the event store or
     * event bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.
     *
     * @param messageConverter Converter to/from AMQP Messages to/from Axon Messages.
     */
    @Bean
    fun amqpMessageSource(messageConverter: AMQPMessageConverter): SpringAMQPMessageSource {
        return object : SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = [AMQPAxonExampleApplication.QUEUE_NAME])
            override fun onMessage(message: Message?, channel: Channel?) {
                println("amqp event $message received")
                super.onMessage(message, channel)
            }
        }
    }

  
    companion object {
        const val DATABASE_NAME = "eventstore"
    }

    /**
     *
     * Create a Mongo based Event Storage Engine.
     */
   @Bean
    fun storageEngine(client: MongoClient?): EventStorageEngine? {
        return MongoEventStorageEngine.builder()
            .eventSerializer(
                XStreamSerializer.builder()
                    .xStream(SecureXStreamSerializer.xStream())
                    .build()

            )
            .snapshotSerializer(
                XStreamSerializer.builder()
                    .xStream(SecureXStreamSerializer.xStream())
                    .build()
            )
            .mongoTemplate(
                DefaultMongoTemplate
                    .builder()
                    .mongoDatabase(client, DATABASE_NAME)
                    .build()
            ).build()
    }


    /**
     * Uses the Configurer to wire everything together including Mongo as the Event and Token Store.
     */
    @Autowired
    fun configuration(configurer: Configurer, client: MongoClient) {
        configurer
            .configureEmbeddedEventStore {
                storageEngine(client)
            }
            .eventProcessing { conf -> conf.registerTokenStore { tokenStore(client, it.serializer()) } }
    }


    /**
     * Create a Mongo based
     * Token Store.
     */
    fun tokenStore(client: MongoClient, serializer: Serializer): TokenStore = MongoTokenStore.builder()
        .mongoTemplate(
            DefaultMongoTemplate.builder()
                .mongoDatabase(client, DATABASE_NAME)
                .build()
        )
        .serializer(serializer)
        .build()

}

when I replace it with the following from the sample app… I see the events…

what I am missing here ?

/**
     * Creates an InMemoryEventStorageEngine.
     * NOT PRODUCTION READY
     */
    @Bean
    fun storageEngine(): EventStorageEngine = InMemoryEventStorageEngine()

    /**
     * Creates an InMemoryTokenStore.
     * NOT PRODUCTION READY
     */
    @Bean
    fun tokenStore(): TokenStore = InMemoryTokenStore()

Sample to reproduce is here:

maybe somebody can help me :frowning:

I think I found a solution… when I configure the tokenStore like follow it look like it works…


    @Bean
    fun tokenStore(client: MongoClient, serializer: Serializer): TokenStore {
        return MongoTokenStore.builder()
            .mongoTemplate(
                DefaultMongoTemplate.builder()
                    .mongoDatabase(client, DATABASE_NAME)
                    .build())
            .serializer(serializer)
            .build()
    }

It depends a lot on what you want to do. Do you want to use MongoDB for ‘internal’ events, and use RabbitMQ for the ‘integration/external’ events? Do you want to put all events from MongoDB into a queue, or do you want to read from the queue and store them in Mongo?

I plan … to use different microservices. Imp. with Axon. eg. Command Side and a Query side for the moment. Then I was thinking I have to communicate between the services with a MessageBroker. We do not have Axon Server. Our setup is AzureServiceBus. So I was planing to make here a PoC setup with RabbitMQ.

Or what are the best practice ?

should be there ONE eventstore for all ? is this then external events ?

If you ask me, the best practice would be to use Axon Server. It makes it much easier since the distributed event bus and the event store are the same thing.

Just for splitting the command and query side, you don’t need a distributed event bus. The command side can just add the event to the event store. (I hope you don’t use Mongo DB for this.) The query side can just read from the event store. That does mean they both need to access the same database.

If that is a concern, you do indeed need a distributed event bus. On the command side, you could read from the event store and publish the events to a queue. You do lose the ability to do replays and/or read all the events from the beginning on the query side. So I’m not sure this is worth the trouble.

Hello yes true I believe the AxonServer will be the best. Is in company environment not the always a easy way:)

We use MongoDB for EventStore and also for the Projection…

so it is not a anti-pattern when x-services. will connect to the same EventStore directly…?

To use MessageBroker is then, that I get it right for external Services like managet from other teams ?

Thanks for the Link I have to read it…

It is mostly an anti-pattern to use the same database with multiple applications when just doing microservices. As the schema of the data will change and it might be unclear what the responsibilities of one app to another are.

By having the event store abstraction, most of those problems are not there. Especially not as long as one is used for the commands and another for the queries.

There are still some things which should be handled with care. For example when introducing a new event. Ideally all servers already have the class for the new event, so they can all serialize it properly, before adding the event to the store. Typically a seperate api repository is used for these, so multiple projects can easily share. The same applies to upcasters, once they are needed.

1 Like