Is it possible to consume message from Rabbit using a StreamableMessageSource implementation? (Axon 3.4.2)

Hi everyone.

Actually I can consume message from rabbit and dispatch to eventHandlers using SpringAMQMessageSource using this configuration.

@Bean

`
SpringAMQPMessageSource springAMQPMessageSource(AMQPMessageConverter amqpMessageConverter) {

return new SpringAMQPMessageSource(amqpMessageConverter) {

@Override

@RabbitListener(queues = “${external.amqp.queue}”)

public void onMessage(Message message, Channel channel) {

log.info(“Received message: {}”, message);

log.info(“Channel {}”, channel);

super.onMessage(message, channel);

}

};

}

@Bean

@Scope(value = “prototype”)

public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {

return new DefaultAMQPMessageConverter(serializer);

}

@Autowired

public void configure(EventProcessingConfiguration configuration, SpringAMQPMessageSource springAMQPMessageSource) {

properties.getProcessors().forEach(p -> configuration.registerSubscribingEventProcessor(p, c -> springAMQPMessageSource));

`

I would like to use StreamableMessageSource in order to dispatch messages to eventHandlers. something like this

@Bean

`
StreamableMessageSource streamableMessageSource(AMQPMessageConverter amqpMessageConverter) {

@RabbitListener(queues = “${external.amqp.queue}”)

//Implementation

}

@Bean

@Scope(value = “prototype”)

public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {

return new DefaultAMQPMessageConverter(serializer);

}

@Autowired

public void configure(EventProcessingConfiguration configuration, SpringAMQPMessageSource springAMQPMessageSource) {

properties.getProcessors().forEach(p -> configuration.
registerTrackingProcessor(p, c -> springAMQPMessageSource));
}

`

Cheers.

}

Hi Martin,

The StreamableMessageSource requires the retrieval of a stream of events at a certain point in time, where the point in time is specified by the TrackingToken.
As far as my AMQP knowledge goes, it is not a persistent message queue.
Hence, you could never ask the AMQP solution for a stream of events from the beginning of time (the beginning of your specific application of course).
If you want to use a message broker as your StreamableMessageSource, you could have a look at the axon-kafka module.

Other than that, Axon Server is pretty good at taking all this configuration you’d be required to write out of your hands.
Thus, I’d suggest to take a look at that as well.

Cheers,
Steven