Event Processors Axon 4.5.4

Hello,
I issue an event from my command-service to RabbitMQ with @EventSourcingHandler in my aggregate and the following configuration:

@Configuration
public class AmqpEventPublicationConfiguration {

    @Value("${axon.amqp.exchange:undss.events}")
    String exchangeName;

    @Bean
    public Exchange exchange(){
        return ExchangeBuilder.fanoutExchange(exchangeName).build();
    }

    @Bean
    public Queue queue(){
        return QueueBuilder.durable(exchangeName).build();
    }

    @Bean
    public Binding binding(Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("*").noargs();
    }

    @Autowired
    public void configure(AmqpAdmin amqpAdmin, Exchange exchange, Queue queue, Binding binding){
        amqpAdmin.declareExchange(exchange);
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareBinding(binding);
    }
}

I have a query-service with a listener that listens for messages and associates those messages to a processor with the @ProcessingGroup("amqpEvents") annotation and the following configuration.

@Configuration
public class AmqpConfiguration {

    @Bean
    public SpringAMQPMessageSource complaintEventsMethod(AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {

            @RabbitListener(queues = "${axon.amqp.exchange}")
            @Override
            public void onMessage(Message message, Channel channel) {
                super.onMessage(message, channel);
            }
        };
    }

    // Default all processors to subscribing mode.
    @Autowired
    public void configure(EventProcessingConfigurer config) {
        config.usingSubscribingEventProcessors();
    }

}

— application.yml

axon:
  amqp:
    exchange: undss.events
  eventhandling:
    processors:
      amqpEvents:
        source: complaintEventsMethod
        mode: subscribing

I can see that it goes into this eventhandler

    @EventHandler
    public void on(GenericDomainEventMessage genericDomainEventMessage ) {

and not this one

    @EventHandler
    public void on(CreatedGroupEvent createdGroupEvent) {

What this means is that I am sending aggregate type (GenericDomainEventMessage) events and not the type I would like (CreatedGroupEvent).

How can I configure this to receive events of type CreatedGroupEvent ?

Thank you very much

Firstly, welcome to the forum @acauchy!

Now to the problem at hand.
What strikes me first is the following:

I issue an event from my command-service to RabbitMQ with @EventSourcingHandler in my aggregate and the following configuration.

Not seeing this exact implementation makes this comment a little tricky, but the statement seems flawed to me. An @EventSourcingHandler annotated should only update state from the aggregate. Right now, it looks like your event sourcing handler dispatches events over RabbitMQ.

That shouldn’t be its job, honestly.
Instead, you would use Axon’s AMQP Extension as is described in the Reference Guide here. So in short, you’d configure the SpringAMQPPublisher to publish events from your application onto AMQP.

The fact you’re receiving a GenericDomainEventMessage, so Axon’s wrapper around events originating from an Aggregate is likely because your event publication approach isn’t using the SpringAMQPPublisher.

So, give that publisher a try. It should make sure you’d receive your events (so the CreatedGroupEvent) by default.

Although I’m unsure what you’ve replied to, do let us know whether the above helps!
If it doesn’t, I, or somebody else on the forum, will help you further.

Hello @Steven_van_Beelen
Thank you very much for the help and welcome :slight_smile:
I’m trying to learn a bit more before I start talking too much nonsense.
From what I understood the commandhandler is in charge of persisting through the repository and producing the events. Once that event is fired to the event-bus it can be retrieved through the eventhandler.
Right now in my query-service instead of retrieving a concrete event (CreateEventGroup) what I retrieve is a GenericEventMessage or something like that. I would like to retrieve a concrete event but I can’t do it.

service → command → command-bus → commandhandler → repository → load → event sourcing handler → commandhandler→ repository execute → commandhandler → event-bus → eventhandler ?

I have the following configuration code for amqp in my command service.

@Configuration
public class AmqpEventPublicationConfiguration {

    @Autowired
    private AMQPProperties amqpProperties;


    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Bean(initMethod = "start", destroyMethod = "shutDown")
    public SpringAMQPPublisher amqpBridge(
            EventBus eventBus,
            ConnectionFactory connectionFactory,
            AMQPMessageConverter amqpMessageConverter) {

        SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);



        // The rest is from axon-spring-autoconfigure...

        publisher.setExchangeName(amqpProperties.getExchange());
        publisher.setConnectionFactory(connectionFactory);
        publisher.setMessageConverter(amqpMessageConverter);
        switch (amqpProperties.getTransactionMode()) {

            case TRANSACTIONAL:
                publisher.setTransactional(true);
                break;
            case PUBLISHER_ACK:
                publisher.setWaitForPublisherAck(true);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("....");
        }

        return publisher;

    }

}
public class SelectiveAmqpPublisher extends SpringAMQPPublisher {


    static boolean shouldSend (Class<?> pt) {
        return CreatedGroupEvent.class.isAssignableFrom(pt);
    }


    public SelectiveAmqpPublisher (
            SubscribableMessageSource<EventMessage<?>> messageSource) {

        super(messageSource);

    }


    @Override
    protected void send (List<? extends EventMessage<?>> events) {

        super.send(events.stream()
                .filter(e -> shouldSend(e.getPayloadType()))
                .collect(Collectors.toList()));

    }

}

And in my query-service I get a GenericEventMessage instead of a CreateGroupEvent

@Configuration
public class AmqpConfiguration {

    @Bean
    public SpringAMQPMessageSource myMessageSource(Serializer serializer) {
        return new SpringAMQPMessageSource(serializer) {
            @RabbitListener(queues = "${axon.amqp.exchange}")
            @Override
            public void onMessage(Message message, Channel channel) {
                super.onMessage(message, channel);
            }
        };
    }
}

What am I doing wrong :frowning:

Thank you very much

If I use the next Aggregate I recive in my query-service an GenericDomainEventMessage

@Aggregate
public class GroupAggregate {

    @AggregateIdentifier
    private String groupId;

    private String collectorType;

    private String name;

    private String description;

    public GroupAggregate() {}

    @CommandHandler
    public GroupAggregate(CreateGroupCommand createGroupCommand) {
        CreatedGroupEvent createdGroupEvent = new CreatedGroupEvent(createGroupCommand.getGroupId(), createGroupCommand.getName(), createGroupCommand.getDescription());
        apply(createdGroupEvent);

    }

    @EventSourcingHandler
    public void on(CreatedGroupEvent createdGroupEvent) {
        this.groupId = createdGroupEvent.getGroupId();
        this.name = createdGroupEvent.getName();
        this.description = createdGroupEvent.getDescription();
    }

    @CommandHandler
    public void handle(UpdateGroupCommand updateGroupCommand) {
        apply(new UpdatedGroupEvent(updateGroupCommand.getGroupId(), updateGroupCommand.getName(), updateGroupCommand.getDescription()));
    }

    @EventSourcingHandler
    public void on(UpdatedGroupEvent updatedGroupEvent) {
        this.name = updatedGroupEvent.getName();
        this.description = updatedGroupEvent.getDescription();
    }

    @CommandHandler
    public void handle(DeleteGroupCommand deleteGroupCommand) {
        apply(new DeletedGroupEvent(deleteGroupCommand.getGroupId()));
    }

    @EventSourcingHandler
    public void on(DeletedGroupEvent deletedGroupEvent) {
        markDeleted();
    }

}

Let me give some additional background to explain where I’m coming from with my answers.

Axon Framework will always wrap any message your application publishes in an Axon message.
Commands are thus wrapped in a CommandMessage, queries in a QueryMessage, query response in a QueryResponseMessage, and events in EventMessage instances.

We can further separate event messages in events published directly on the EventBus, EventMessage implementations, and typically the GenericEventMessage implementation. If the event originates from an Aggregate, you’re dealing with a DomainEventMessage, typically the GenericDomainEventMessage implementation. Axon differentiates between events originating from an Aggregate, as aggregate events require the aggregate identifier, aggregate sequence number, and aggregate type to construct Aggregate specific Event streams correctly.

Now, the framework thus wraps all messages you publish in a Message implementation. Nonetheless, you don’t (and typically shouldn’t) have to deal with the wrapper class at all. Instead, the framework injects your message into the methods. It does so with so-called “Parameter Resolvers.” These parameter resolvers resolve parameters for the message handling methods in your application.

These resolvers give you the flexibility to push any number of parameters in your handling functions. The reference guide has a specific page about the parameters that can be resolved for the message types. You can find that page [here](Supported Parameters for Annotated Handlers - Axon Reference Guide.

As you can see, the @EventHandler annotated methods will allow you to resolve the EventMessage as you’ve been doing. They should just as well resolve the event payload, however. In your sample, the event payload is the CreatedGroupEvent.

With all this behind us, I’d like to ask you to try a different approach to handling events. I want to rule out the AMQP Extension is the culprit here. So, if you can add an Event Handling Component that directly connects to the EventStore, can you share with us if the same problem persists? To do this, you only have to construct a class with a single @EventHandler without any additional configuration.

Hello,
Thank you very much for the information.
I think the problem is that we have not understood each other well.
What I want is to have an independent service for the commands ( group-cmd for example ) and another completely different one for the queries ( group-query for example ). The communication between these 2 services is done through rabbitMq.
group-cmd → rabbitMq → group-query
What I wanted concretely was that after the event has been saved in the event-storage, it would go to rabbitMq and then I would like to read it in an eventHandler on the side of the other service ( I think I have an object decoding problem too ).
In the end I think that for my solution I will not use framework because I am complicating things too much ( maybe I am the one who complicates them but in any case I have not done what I want for the moment ).