Thanks for the response! I’ll follow your suggestions. In the meantime, I’ve come up with the following workaround:
@Override
@Bean("axonKafkaConsumerFactory")
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    Map<String, Object> properties = customKafkaProperties.buildConsumerProperties();
    // Swap the hard-coded CloudEvent value deserializer for the tolerant decorator below.
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializerDecorator.class);
    return new DefaultConsumerFactory<>(properties);
}
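If anyone wants to try the same thing, the bean can live in an ordinary configuration class. The sketch below is only an illustration: I'm assuming Spring Boot's KafkaProperties as the source of the consumer properties and the Axon Kafka extension's ConsumerFactory/DefaultConsumerFactory, and the class name is made up.

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Illustrative configuration class; only the bean definition itself matches my workaround.
@Configuration
public class TolerantCloudEventKafkaConfig {

    private final KafkaProperties customKafkaProperties;

    public TolerantCloudEventKafkaConfig(KafkaProperties customKafkaProperties) {
        this.customKafkaProperties = customKafkaProperties;
    }

    // Registers the decorator as the value deserializer for the consumer factory bean.
    @Bean("axonKafkaConsumerFactory")
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        Map<String, Object> properties = customKafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializerDecorator.class);
        return new DefaultConsumerFactory<>(properties);
    }
}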
The deserializer for CloudEvents is normally hard-coded, so the kafkaConsumerFactory() bean above swaps in my own. Instead of propagating the deserialization exception, I log the error and return a dummy event, which is then simply ignored downstream (see the sketch after the implementation). I'm still testing it, but here's the deserializer implementation:
import java.net.URI;
import java.time.OffsetDateTime;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.kafka.CloudEventDeserializer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CloudEventDeserializerDecorator implements Deserializer<CloudEvent> {

    private final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

    @Override
    public CloudEvent deserialize(String topic, byte[] data) {
        // Kafka calls the Headers variant below; the interface still requires this method.
        return deserialize(topic, new RecordHeaders(), data);
    }

    @Override
    public CloudEvent deserialize(String topic, Headers headers, byte[] data) {
        try {
            return cloudEventDeserializer.deserialize(topic, headers, data);
        } catch (Exception e) {
            log.error("Skipping invalid CloudEvent for topic {}: {}", topic, e.getMessage());
            // Placeholder event; downstream handling recognizes it by its type and drops it.
            return CloudEventBuilder.v1()
                    .withId("dummy-id")
                    .withSource(URI.create("dummy"))
                    .withType("dummy")
                    .withDataContentType("application/json")
                    .withSubject("dummy")
                    .withTime(OffsetDateTime.now())
                    .build();
        }
    }
}
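On the consuming side, the placeholder can then be recognized and dropped before any real processing happens. A minimal sketch of what I have in mind (the helper below is illustrative, not part of the workaround itself):

import io.cloudevents.CloudEvent;

// Illustrative helper: detects the placeholder event produced by the decorator on a deserialization failure.
public final class DummyCloudEventFilter {

    private static final String DUMMY_TYPE = "dummy";

    private DummyCloudEventFilter() {
    }

    public static boolean isPlaceholder(CloudEvent event) {
        return event == null || DUMMY_TYPE.equals(event.getType());
    }
}

Wherever the CloudEvents end up being handled, anything for which isPlaceholder(...) returns true is skipped.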