AxonAMQP: How to route events to specific queues.

Hi all,
I have a command handler that sends events over RabbitMQ:

@CommandHandler
public Profession(AddProfessionCommand command) {
    JSONObject jsonEvent = new JSONObject();
    jsonEvent.put("event", "ProfessionAddedEvent");
    jsonEvent.put("professionId", command.getProfessionId().toString());
    jsonEvent.put("profession", command.getProfession());
    jsonEvent.put("userId", command.getUserId().toString());
    apply(new ProfessionAddedEvent(command.getProfessionId(), command.getProfession(), command.getUserId()))
            .andThenApply(() -> jsonEvent);
}

which I handle in two different services like this:

@EventHandler
public void on(JSONObject event) {
    System.out.println(event);
    switch (event.getString("event")){
        case "ProfessionAddedEvent":
            FutureCallback<AddProfessionCommand, String> callback = new FutureCallback<>();
            commandGateway.send(new AddProfessionCommand(new ProfessionId(event.getString("professionId")),
                    event.getString("profession"), new UserId(event.getString("userId"))));

The problem is that in one of the services, I have multiple queues listening to events of type org.json.JSONObject.

@Bean
public SpringAMQPMessageSource cmsEvents(Serializer serializer)
{
    return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)){

        @Transactional
        @RabbitListener(queues = {"CityQueue", "ProfessionQueue", "AddressQueue", "IncomeQueue"})
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            super.onMessage(message, channel);
        }
    };
}

So when the event arrives, it is handled once for one of the queues, and then again for every other queue that received a JSONObject event. For some reason, the command above (AddProfessionCommand) then produces an event with exactly the same ID, which generates the error below, and this repeats indefinitely:

Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry ‘33e7d070-e433-47f9-97c1-f986ea075f78-0’ for key 'UK8s1f994p4la2ipb13me2xqm

My question is: is there a way to send an event to a specific queue over RabbitMQ, or to make a handler handle events only from a specific queue? Thanks

Hi Harvey,

The first thing I wonder is why you apply two events, one as JSON and one as a plain Java object. They seem to represent exactly the same “event”. Consider using a serializer to customize how your events are published to the AMQP queue instead. Note that you can configure different serializers in different locations.

In AMQP, you don’t publish to queues, but to exchanges. Based on the routing key of a message (Axon defaults that to the package name of the event), messages are then delivered to the queues that have been bound to that exchange. Check the documentation of your AMQP message broker to see which types of exchanges it supports and how to bind queues to them.
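
To make the exchange/binding idea concrete, here is a minimal in-memory sketch of how an exchange with exact-match routing keys hands messages to bound queues. It is a toy model only, not a broker client: the class ToyDirectExchange, the package names used as routing keys and the message strings are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of an exchange with exact-match routing keys (illustrative only). */
final class ToyDirectExchange {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue to this exchange under the given routing key. */
    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Delivers the message to every queue bound with exactly this routing key. */
    void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, Collections.<String>emptyList())) {
            queues.get(queue).add(message);
        }
    }

    /** Returns the messages delivered to the given queue so far. */
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        // Hypothetical package names standing in for Axon's default routing keys.
        exchange.bind("ProfessionQueue", "com.example.profession");
        exchange.bind("CityQueue", "com.example.city");

        exchange.publish("com.example.profession", "ProfessionAddedEvent");

        System.out.println(exchange.messagesIn("ProfessionQueue")); // [ProfessionAddedEvent]
        System.out.println(exchange.messagesIn("CityQueue"));       // []
    }
}
```

If every queue were bound with the same key instead, each of them would receive a copy of the message, which may be what is happening when one event shows up on several queues.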

Regarding the duplicate key exception, check which table is causing this. If it is the Event Store table, you’re most likely attempting to publish the same event more than once.

Cheers,

Allard

Hi Harvey,

One other thing I noticed is that in the event handler, you dispatch a command to create the same aggregate again, which to me sounds like a loop and would explain the symptoms you describe: the duplicate ID, and the fact that it keeps running infinitely.

If the event handler is called, it will dispatch the command, which will raise another JSON event, which will be handled by the event handler, etc., etc., all for the same aggregate ID.

Why do you dispatch that command in the event handler?

Cheers, Oscar

Hi Oscar,
That event handler is found in a different service. It reads the JSON object from Rabbit and updates its own model. The command dispatched here doesn’t raise another JSON event.

Cheers.

> Consider using a serializer to customize how your events are published to the AMQP queue instead. Note that you can configure different serializers in different locations.

Sorry but how do I configure a serializer?

Another question.

@Bean
public Binding testBinding() {
    return BindingBuilder.bind(testQueue()).to(exchange()).with("*.TestEvent").noargs();
}

As I understand it, the above is supposed to bind the queue to routing keys that end with TestEvent. But each time an event is sent, the handler in the other service still handles it. I read the RabbitMQ tutorials and changed the * to #, and I made sure the queues are bound with different routing keys, but it still does the same.

Also, a queue bound to a topic exchange without a specific routing key, such as the one below, now receives no events at all:

@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange()).with("#").noargs();
}

Am I missing something here?

Hi Harvey,

you can configure a serializer by just adding one to the Spring Context:

@Bean
public Serializer serializer() {
    return new … choose…
}

However, this will change the serializer used by all components. It’s likely that you have events stored in your event store already (if you’re in production). Changing the serializer will make these unreadable.

What you can do instead is define a SpringAMQPPublisher bean in your context. That is the type of bean created automatically when you specify axon.amqp.exchange, so when you define this bean yourself, you should remove that property. The SpringAMQPPublisher uses an AMQPMessageConverter to convert Axon messages to AMQP messages (they have their own structure). The default implementation uses a serializer to write the payload of the events as a byte[].

You can choose to fully customize how events are sent via AMQP, or use the default AMQPMessageConverter with a different serializer. Using the JacksonSerializer here would give you the JSON payload.
Be careful, though, to create this serializer inside the @Bean definition of the SpringAMQPPublisher. If you define it as a separate bean, Axon will pick it up as the serializer for everything.
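
A rough sketch of such a bean definition could look like the following. It assumes the Axon 3.x AMQP module and Spring AMQP on the classpath; the setter names are written from memory of that version and should be checked against the release you use. The class name AmqpPublisherConfig and the exchange name "events.exchange" are placeholders.

```java
import org.axonframework.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.amqp.eventhandling.spring.SpringAMQPPublisher;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.serialization.json.JacksonSerializer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpPublisherConfig { // hypothetical class name

    // Assumption: with this bean defined, the axon.amqp.exchange property is removed.
    @Bean(initMethod = "start", destroyMethod = "shutDown")
    public SpringAMQPPublisher amqpPublisher(EventBus eventBus, ConnectionFactory connectionFactory) {
        SpringAMQPPublisher publisher = new SpringAMQPPublisher(eventBus);
        publisher.setConnectionFactory(connectionFactory);
        publisher.setExchangeName("events.exchange"); // placeholder exchange name

        // The JacksonSerializer is created here, not exposed as its own @Bean,
        // so it only affects AMQP publishing and not the event store.
        publisher.setMessageConverter(new DefaultAMQPMessageConverter(new JacksonSerializer()));
        return publisher;
    }
}
```

Keeping the serializer local to this method is the point of the design: the event store continues to use whatever serializer it already had.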

Regarding your questions about routing: I am not too familiar with the routing of messages to queues, but I believe the “.” is considered a separator, with * matching exactly one part of the routing key and # matching zero or more parts. The RabbitMQ documentation has good explanations of how to route messages.
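
To show what those wildcard rules mean in practice, here is a small stand-alone matcher. It is only an illustration of the rules as described in the RabbitMQ documentation (words separated by ".", * for exactly one word, # for zero or more words), not the broker's actual implementation; the class name TopicPattern and the sample routing keys are made up.

```java
/** Illustrative matcher for topic-exchange binding patterns (not RabbitMQ's own code). */
final class TopicPattern {

    private TopicPattern() {
    }

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int pi, String[] key, int ki) {
        if (pi == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return ki == key.length;
        }
        String word = pattern[pi];
        if (word.equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = ki; next <= key.length; next++) {
                if (matches(pattern, pi + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches any single word; anything else must match literally.
        return (word.equals("*") || word.equals(key[ki]))
                && matches(pattern, pi + 1, key, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("*.TestEvent", "orders.TestEvent"));             // true
        System.out.println(matches("*.TestEvent", "com.example.orders.TestEvent")); // false
        System.out.println(matches("#.TestEvent", "com.example.orders.TestEvent")); // true
        // Hypothetical package-name routing key, as Axon would use by default:
        System.out.println(matches("*.TestEvent", "com.example.events"));           // false
        System.out.println(matches("#", "com.example.events"));                     // true
    }
}
```

Note that if the routing key is the package name of the event, as mentioned earlier in the thread, a pattern ending in the event class name would not be expected to match it.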

Hope this helps.
Cheers,

Allard