RabbitMQ Listener starts before the eventProcessors have subscribed

Hi everyone

I use Axon with Spring AMQP to write the events to a RabbitMQ and read from it.
I have the following configuration:

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }


    @Bean
    Queue queue() {
        return new Queue("myQueue", true);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("#");
    }


    @Bean
    public SpringAMQPMessageSource profileServiceSource(AMQPMessageConverter messageConverter) {

        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "myQueue")
            @Override
            public void onMessage(Message message, Channel channel) {
                super.onMessage(message, channel);
            }
        };
    }

The problem is, at application startup, the RabbitListener runs before the eventProcessors have subscribed to the SpringAMQPMessageSource. As a result, the first events after startup are simply ignored.

I suspect this is a bug in my configuration.
Does anyone have any ideas what this could be??

Hi @Tino,

I don’t think you’re facing a bug necessarily, but more so a configuration predicament.
To be frank with you, I don’t use RabbitMQ terrible often.
However, I am pretty certain there’s a means to have a RabbitListener annotated method wait before it is able to receive messages from the configured queue(s).

A quick google let me to find the autoStart property you can define on the annotation.
Perhaps setting that to false, and enabling it once the Axon Framework configuration is done works?

By the way, if you want to tap into the point when AF config is done, you can listen to the AxonStartedEvent Spring Event. To listen to Spring events, you need to add an @EventListener annotated method (instead of Axon Framework’s @EventHandler annotated method.

I hope the above helps you further, Tino!
If not, be sure to reach out.

Thank you for your answer, Steven.
That was now also my solution approach. I was just unsure if this is the intended way or if I am missing something.

Thanks for coming back to us here, Tino!
Seeing both your question and your approach will be valuable for future readers of this thread.

Same issue occurs when using Spring Cloud Stream bindings. Solution for this is disable auto startup and manually start bindings on event.

application.yaml

spring:
  cloud:
    stream:
      default:
        consumer:
          autoStartup: false

Axon event listener:

@Component
class AxonReadyListener(private val bindingsEndpoint: BindingsEndpoint) {
  private val log = KotlinLogging.logger {}

  @EventListener(AxonStartedEvent::class)
  fun ready() {
    log.info { "Axon started. Starting bindings..." }
    bindingsEndpoint.queryStates().forEach {
      val bindings = it.filterKeys { k -> k === "bindingName" }
      bindings.forEach { b ->
        bindingsEndpoint.changeState(b.value as String, BindingsLifecycleController.State.STARTED)
      }
    }
  }
}