And subscription queries as well, since they would only ‘see’ the responses from the events that hit the instance they are connected to.
Ah, we hadn’t considered that - thanks for highlighting that important aspect. Then we’ll likely need to rework our current approach a bit.
[…] and a local query bus.
I am currently testing this approach and have run into some unexpected behaviour that I can’t quite explain.
If I do not autowire a QueryBus or QueryGateway bean, I am able to process records and have them forwarded to my event handler (which currently accepts the “wildcard” Message<?> as parameter for the technical proof).
If I do autowire a QueryBus or QueryGateway, however, it seems that a TrackingEventProcessor is set up somewhere that prevents me from consuming any records at all.
The configuration I have looks as follows:
AvroKafkaMessageConverter, supporting only deserialization (we do not need to produce on the consumer side):
```java
public class AvroKafkaMessageConverter implements KafkaMessageConverter<UUID, byte[]> {

    private final KafkaAvroDeserializer deserializer;
    private final String topic;

    public AvroKafkaMessageConverter(String topic, KafkaAvroDeserializer deserializer) {
        this.topic = Objects.requireNonNull(topic);
        this.deserializer = Objects.requireNonNull(deserializer);
    }

    @Override
    public ProducerRecord<UUID, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<UUID, byte[]> consumerRecord) {
        return Optional.of(
                GenericEventMessage.asEventMessage(
                        deserializer.deserialize(topic, consumerRecord.headers(), consumerRecord.value())));
    }
}
```
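One detail I’m unsure about in readKafkaMessage: if the deserializer ever returns null (e.g. for a tombstone record carrying a null value), Optional.of would throw a NullPointerException, whereas Optional.ofNullable would map that to an empty result instead. A plain-Java sketch of the difference, with no Kafka/Axon types and all names hypothetical:

```java
import java.util.Optional;
import java.util.function.Function;

// Sketch: why Optional.ofNullable may be the safer wrapper in readKafkaMessage.
// A null record value propagates to an empty Optional instead of an exception.
public class NullSafeConversion {

    static Optional<String> convert(byte[] value, Function<byte[], String> deserialize) {
        // Optional.of(value) would throw here when value is null
        return Optional.ofNullable(value).map(deserialize);
    }

    public static void main(String[] args) {
        System.out.println(convert("payload".getBytes(), String::new).isPresent()); // true
        System.out.println(convert(null, String::new).isPresent());                 // false
    }
}
```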
SerdeConfig, which exposes the above converter as a bean:
```java
@Configuration
public class SerdeConfig {

    @Bean
    public KafkaMessageConverter<UUID, byte[]> kafkaMessageConverter() { //KafkaProperties axonKafkaProperties) {
        KafkaAvroDeserializer deserializerDelegate = new KafkaAvroDeserializer();
        Map<String, Object> properties = new HashMap<>();
        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
        properties.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, "user:user");
        properties.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);
        deserializerDelegate.configure(properties, false);
        return new AvroKafkaMessageConverter("MY_TOPIC", deserializerDelegate);
    }
}
```
AxonKafkaConfig, which sets up the relevant axon-kafka beans:
```java
@Configuration
public class AxonKafkaConfig {

    @Autowired
    private EventProcessingConfigurer eventProcessingConfigurer;

    private String groupId = "my-api";
    private String processorName = "myProcessor";

    @Bean
    public KafkaAvroDeserializer kafkaAvroDeserializer() {
        return new KafkaAvroDeserializer();
    }

    @Bean
    public ConsumerFactory<UUID, byte[]> consumerFactory(KafkaProperties consumerConfiguration) {
        return new DefaultConsumerFactory<>(consumerConfiguration.buildConsumerProperties());
    }

    @Bean
    public Fetcher<?, ?, ?> fetcher() {
        return AsyncFetcher.builder()
                           .build();
    }

    @Bean
    public SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource(
            ConsumerFactory<UUID, byte[]> consumerFactory,
            Fetcher<UUID, byte[], EventMessage<?>> fetcher,
            KafkaMessageConverter<UUID, byte[]> messageConverter,
            Configurer configurer
    ) {
        SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource =
                SubscribableKafkaMessageSource.<UUID, byte[]>builder()
                                              .topics(List.of("MY_TOPIC"))
                                              .groupId(groupId)                 // Hard requirement
                                              .consumerFactory(consumerFactory) // Hard requirement
                                              .fetcher(fetcher)                 // Hard requirement
                                              .messageConverter(messageConverter)
                                              // .serializer() TODO xstream no security context exception
                                              .build();
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        // Registering the source is required to tie into the Configurer's lifecycle,
        // so that the source is started at the right stage
        kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }
}
```
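To make sure I understand the lifecycle comment in that bean method correctly: as far as I can tell, nothing is started at bean-creation time; the configurer collects the registered sources and starts them all in its own start phase. A plain-Java sketch of that idea, with no Axon types and all names hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the lifecycle idea behind KafkaMessageSourceConfigurer:
// sources are registered up front and only started when the start phase runs.
public class LifecycleSketch {

    interface Source { void start(); }

    static class MiniConfigurer {
        private final List<Source> sources = new ArrayList<>();

        // Analogous to configureSubscribableSource(...): registration only
        void register(Source source) { sources.add(source); }

        // Analogous to the framework's start phase: sources start here, later
        void startAll() { sources.forEach(Source::start); }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniConfigurer configurer = new MiniConfigurer();
        configurer.register(() -> log.add("kafka source started"));
        // Nothing has started yet; only the explicit start phase triggers it:
        configurer.startAll();
        System.out.println(log); // prints "[kafka source started]"
    }
}
```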
AxonEventProcessorConfig, which registers the subscribing event processor:
```java
@Configuration
public class AxonEventProcessorConfig {

    private String groupId = "my-api";
    private String processorName = "myProcessor";

    @Autowired
    EventProcessingConfigurer eventProcessingConfigurer;

    @Autowired
    SubscribableKafkaMessageSource<UUID, byte[]> subscribableKafkaMessageSource;

    @PostConstruct
    public void postConstruct() {
        // eventProcessingConfigurer.usingSubscribingEventProcessors();
        eventProcessingConfigurer.assignProcessingGroup(groupId, processorName);
        eventProcessingConfigurer.registerSubscribingEventProcessor(
                processorName,
                configuration -> subscribableKafkaMessageSource
        );
    }
}
```
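As a possible alternative (assuming Axon's standard Spring Boot processor properties apply to this setup), the processor mode and source could presumably also be declared in application.properties rather than programmatically; a sketch only, with the source bean name taken from the configuration above:

```properties
# Sketch, assuming the standard axon.eventhandling.processors.* properties
# apply: force the processor into subscribing mode and point it at the
# subscribable Kafka source bean instead of registering it in @PostConstruct.
axon.eventhandling.processors.myProcessor.mode=subscribing
axon.eventhandling.processors.myProcessor.source=subscribableKafkaMessageSource
```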
And an event handler as follows:
```java
@Slf4j
@Component
@ProcessingGroup("myProcessor")
public class Handler {

    // @Autowired
    // private QueryGateway queryGateway;
    // @Autowired
    // private QueryBus queryBus;
    // @Autowired
    // private QueryUpdateEmitter emitter;

    @EventHandler
    public void on(Message<?> eventMessage) {
        log.info("Got event: {}", eventMessage);
    }
}
```
If I run this as is, my events are correctly consumed.
However, if I do autowire the query components, I get the following log output:
```
2023-12-12T14:40:43.698+01:00  INFO 792710 --- [           main] d.v.c.a.my.MyApplication                 : Started MyApplication in 5.967 seconds (process running for 6.179)
2023-12-12T14:40:44.066+01:00  INFO 792710 --- [pleProcessor]-0] o.a.e.TrackingEventProcessor             : Worker assigned to segment Segment[0/0] for processing
2023-12-12T14:40:44.079+01:00  INFO 792710 --- [pleProcessor]-0] o.a.e.TrackingEventProcessor             : Using current Thread for last segment worker: TrackingSegmentWorker{processor=SampleProcessor, segment=Segment[0/0]}
2023-12-12T14:40:44.085+01:00  INFO 792710 --- [pleProcessor]-0] o.a.e.TrackingEventProcessor             : Fetched token: null for segment: Segment[0/0]
```
Most notably, it doesn’t seem like a Kafka consumer is started at all. Additionally, since I don’t have a TokenStore, I’m rather surprised that this did not result in an exception. Regardless, nothing further seems to happen.
If I uncomment the eventProcessingConfigurer.usingSubscribingEventProcessors(); call in AxonEventProcessorConfig#postConstruct, I don’t get the above TrackingEventProcessor logs, but still nothing seems to happen; the Kafka consumer is not created in this case either.
It seems as though autowiring any of the query components triggers an initialization phase/handler that sets up additional configuration, which conflicts with the axon-kafka extension to some degree.
Do you have any idea why this behaviour occurs? If it helps, I can also provide startup logs. I’ll be going over them now to try and find the discrepancy…