Intercept events before they are sent to rabbitMQ

I’m also using AMQP in a POC, and Axon sends a message to rabbitMQ that contains all the data in the aggregate, which is not what we want. I want the message to contain only the guid generated for the entity that is persisted to a SQL database (not the aggregate guid). Rabbit will then push the message to another service, which will use the query API to fetch the entire entity. We don’t want all the entity data in the rabbitMQ message because this queue will be public.

I was wondering if I could intercept the message before it is sent to the queue and change what is being sent, but without affecting what is being sent to the EventStore. Not sure if MessageDispatchInterceptor is the best candidate for this and if there is any example out there on how to use it…
Also, not sure if there is some sort of annotation that can be added to the aggregate fields so they don’t get included in the message sent to rabbitMQ…

Regards,
Bruno

Hi Bruno,

You say “Axon sends a message […] that contains all the data in the aggregate”. Actually, Axon just sends the Event. If the entire aggregate is sent, that probably means your aggregate is part of the event itself, which would be a design issue that needs some attention.

What you’re looking for is not an interceptor, but the AMQPMessageConverter. It is used by the SpringAMQPPublisher and SpringAMQPMessageSource when sending and receiving messages to and from AMQP, respectively. Basically, it defines how you want messages on AMQP to be structured.
In your case, you could create a small message with only identifiers.

Hope this helps.
Cheers,

Allard

Thanks for the reply Allard,

I guess I didn’t express myself correctly. The message contains the event data, like you said, which I also copy into the aggregate:

@EventSourcingHandler
public void on(MyEntityCreatedEvent event) {
    this.aggregateId = event.getMyEntityGuid();
    this.someInt = event.getMyEntity().getSomeInt();
    this.customData = event.getMyEntity().getCustomData();
}

I will play around with the AMQPMessageConverter and will use a basic POJO class that serializes to { "event": "MyEntityCreatedEvent", "guid": "some-guid" }...
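For reference, a minimal sketch of what such a POJO could look like. The class name `AmqpEntity` matches the one used later in this thread, but the field names and the hand-rolled `toJson()` method are assumptions for illustration only; in a real application a JSON serializer would produce the output from the getters instead.

```java
import java.util.Objects;

/**
 * Hypothetical minimal payload for the public queue:
 * only the event name and the entity GUID, no entity data.
 */
final class AmqpEntity {
    private final String event;
    private final String guid;

    AmqpEntity(String event, String guid) {
        this.event = Objects.requireNonNull(event, "event");
        this.guid = Objects.requireNonNull(guid, "guid");
    }

    public String getEvent() {
        return event;
    }

    public String getGuid() {
        return guid;
    }

    /**
     * Illustration only: builds the JSON by hand.
     * Escapes backslashes and double quotes, nothing else.
     */
    String toJson() {
        return "{\"event\":\"" + escape(event) + "\",\"guid\":\"" + escape(guid) + "\"}";
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Keeping the class immutable and limited to two fields makes it hard to leak entity data onto the public queue by accident.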

Thanks!

Hi Allard,

I have been using Axon for about 2 weeks now, so AMQPMessageConverter is new territory for me. I have figured out that I need to create a SpringAMQPPublisher bean and pass it the message converter bean:

publisher.setMessageConverter(amqpMessageConverter);

What I am not sure about is where the actual conversion code is written… I see that for my AMQPMessageConverter bean I can return:

return new DefaultAMQPMessageConverter(serializer, routingKeyResolver, amqpProperties.isDurableMessages());

Would I have to implement a serializer that would do the actual conversion of the message into my POJO?

Thanks,
Bruno

I think I found out what I need to do:

    @Bean
    public AMQPMessageConverter amqpMessageConverter(Serializer serializer, RoutingKeyResolver routingKeyResolver) {
        return new DefaultAMQPMessageConverter(serializer, routingKeyResolver, amqpProperties.isDurableMessages()){
            @Override
            public AMQPMessage createAMQPMessage(EventMessage<?> eventMessage) {
                AmqpEntity amqpEntity = new AmqpEntity(eventMessage.getPayloadType().getSimpleName(), eventMessage.getIdentifier());

                DomainEventMessage message = new GenericDomainEventMessage<AmqpEntity>(
                        eventMessage.getPayloadType().toString(),
                        eventMessage.getIdentifier(),
                        0,                                // NOT SURE THAT THE SEQUENCE NUMBER MATTERS
                        amqpEntity);

                SerializedObject<byte[]> serializedObject = serializePayload(message, serializer, byte[].class);
                String routingKey = routingKeyResolver.resolveRoutingKey(message);
                AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                Map<String, Object> headers = new HashMap<>();
                headers.put("some header", "some data");
                properties.headers(headers);
                if (amqpProperties.isDurableMessages()) {
                    properties.deliveryMode(2);
                }
                return new AMQPMessage(serializedObject.getData(), routingKey, properties.build(), false, false);
            }
        };
    }

Does this look ok?


Thanks,

Bruno

Hi Bruno,

you don’t need to wrap your AmqpEntity in a DomainEventMessage. You could pass that class (in serialized form) directly as the contents of the message instead.
That would make the code quite a bit simpler.

Cheers,

Allard

Thanks Allard,

I had thought of that but got sidetracked with JPA and Hibernate on the DB persistence side of things, since I’m also new to that.

But I quickly changed my converter:

AmqpEntity amqpEntity = new AmqpEntity(eventMessage.getPayloadType().getSimpleName(), eventMessage.getIdentifier());

SerializedObject<byte[]> serializedObject = serializer.serialize(amqpEntity, byte[].class);

I also added a JacksonSerializer bean since I need the messages to be stored as json and it’s working like a charm.

I’m really impressed and enjoying the capabilities of Axon. Wish Hibernate was as easy to get into (from someone used to Entity Framework…).

Thanks for steering me in the right direction.
Bruno

Hello Bruno,

I have a use case where I want to customize the routing key. Were you able to customize the routing key in your application? Also, if you have a git repository for your source code, could you forward it to me?

Hi Ajinkya,

When I apply the events inside my aggregates, I apply them with metadata. One of the metadata keys is the routing key.

Then, inside createAMQPMessage (you can find it a few messages above in this thread), I do the following:


byte[] amqpBody = buildAMQPMessageIWantToSend();


String routingKey = eventMessage.getMetaData().get("routingKey").toString().toLowerCase();

return new AMQPMessage(amqpBody, routingKey, properties.build(), false, false);
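One thing to watch in the lookup above: if an event is applied without a "routingKey" metadata entry, `get(...)` returns null and `toString()` throws a NullPointerException. Below is a minimal sketch of a null-safe variant in plain Java. The `RoutingKeys` helper and its fallback value are hypothetical names chosen for illustration. It takes any `Map`, on the assumption that the metadata object is passed in as one, and it lower-cases with `Locale.ROOT` rather than the default locale so the result does not depend on the machine's settings.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: resolves a routing key from event metadata. */
final class RoutingKeys {
    /** Assumed fallback used when no routing key was attached to the event. */
    static final String DEFAULT_ROUTING_KEY = "unrouted";

    private RoutingKeys() {
    }

    /**
     * Returns the lower-cased "routingKey" entry,
     * or DEFAULT_ROUTING_KEY when the entry is missing.
     */
    static String fromMetaData(Map<String, ?> metaData) {
        Object value = metaData.get("routingKey");
        if (value == null) {
            return DEFAULT_ROUTING_KEY;
        }
        return value.toString().toLowerCase(Locale.ROOT);
    }
}
```

Inside createAMQPMessage this would be called as `RoutingKeys.fromMetaData(eventMessage.getMetaData())`. Whether a fallback key or a hard failure is the better choice depends on how the exchange bindings are set up.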