Hi everyone.
Currently I can consume messages from RabbitMQ and dispatch them to event handlers using SpringAMQPMessageSource with this configuration:
```java
// `log` and `properties` are assumed to be fields of the enclosing configuration class (not shown).
@Bean
SpringAMQPMessageSource springAMQPMessageSource(AMQPMessageConverter amqpMessageConverter) {
    return new SpringAMQPMessageSource(amqpMessageConverter) {
        @Override
        @RabbitListener(queues = "${external.amqp.queue}")
        public void onMessage(Message message, Channel channel) {
            log.info("Received message: {}", message);
            log.info("Channel {}", channel);
            super.onMessage(message, channel);
        }
    };
}

@Bean
@Scope(value = "prototype")
public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {
    return new DefaultAMQPMessageConverter(serializer);
}

@Autowired
public void configure(EventProcessingConfiguration configuration, SpringAMQPMessageSource springAMQPMessageSource) {
    // Register every configured processor as a subscribing processor fed by the AMQP source.
    properties.getProcessors().forEach(p -> configuration.registerSubscribingEventProcessor(p, c -> springAMQPMessageSource));
}
```
I would like to use a StreamableMessageSource instead to dispatch messages to the event handlers, something like this:
```java
@Bean
StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource(AMQPMessageConverter amqpMessageConverter) {
    // Wanted: a source fed by @RabbitListener(queues = "${external.amqp.queue}")
    // Implementation
}

@Bean
@Scope(value = "prototype")
public AMQPMessageConverter amqpMessageConverter(Serializer serializer) {
    return new DefaultAMQPMessageConverter(serializer);
}

@Autowired
public void configure(EventProcessingConfiguration configuration, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
    // Register every configured processor as a tracking processor reading from the streamable source.
    properties.getProcessors().forEach(p -> configuration.registerTrackingEventProcessor(p, c -> streamableMessageSource));
}
```
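As a rough illustration of the idea, the core difficulty is turning a push-style listener callback into something a processor can pull from. Below is a minimal, self-contained sketch of that bridge using a bounded queue. `BufferedMessageBridge` and its methods are hypothetical names made up for this example and are not part of Axon or Spring AMQP; it only shows the push-to-pull hand-off, not a real StreamableMessageSource.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical bridge (not an Axon or Spring class): a listener pushes
 * messages in, and a consumer pulls them out at its own pace.
 */
class BufferedMessageBridge<M> {

    private final BlockingQueue<M> buffer;

    BufferedMessageBridge(int capacity) {
        // Bounded, so a slow consumer applies back-pressure to the listener.
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    /** Push side: what a listener callback would call. Blocks while the buffer is full. */
    void onMessage(M message) throws InterruptedException {
        buffer.put(message);
    }

    /** Pull side: waits up to the timeout and returns null if nothing arrived. */
    M nextAvailable(long timeout, TimeUnit unit) throws InterruptedException {
        return buffer.poll(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        BufferedMessageBridge<String> bridge = new BufferedMessageBridge<>(16);
        bridge.onMessage("order-created");
        bridge.onMessage("order-shipped");
        System.out.println(bridge.nextAvailable(100, TimeUnit.MILLISECONDS)); // order-created
        System.out.println(bridge.nextAvailable(100, TimeUnit.MILLISECONDS)); // order-shipped
        System.out.println(bridge.nextAvailable(100, TimeUnit.MILLISECONDS)); // null
    }
}
```

Note that a plain in-memory buffer like this cannot resume from or replay to a given position, whereas StreamableMessageSource.openStream takes a TrackingToken for exactly that purpose, so this is probably the part that needs the most thought.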
Cheers.