The issue I’m facing is that when Kafka produces an unhandled deserialization error, it blocks the event handler’s processing. Below are some of the error logs I encountered:
org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventException: Cannot proceed with fetching ConsumerRecords since we encountered an exception
at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:96)
org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition XXXX at offset 623468. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)
My primary goal is to handle these errors on the consumer side in such a way that it allows the event processor to continue processing other valid events, avoiding a complete block in event handling. I came across an approach described in this article, but it involves using the Kafka API directly and introduces additional dependencies, which I’d like to avoid.
Has anyone encountered a similar situation? Are there Axon-provided solutions or best practices to address this, perhaps by configuring an error handler or bypassing problematic records without resorting to Kafka-specific APIs?
Any insights or suggestions would be greatly appreciated!
Here is the full stack trace, in case it is useful:
org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing VALUE for partition XXX at offset XXX. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.newRecordDeserializationException(CompletedFetch.java:347) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:285) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:169) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:135) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:146) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:699) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:623) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.9.0.jar:na]
at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.9.0.jar:4.9.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: io.cloudevents.rw.CloudEventRWException: Could not parse. Unknown encoding. Invalid content type or spec version
at io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException(CloudEventRWException.java:201) ~[cloudevents-api-2.4.0.jar:na]
at io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage(MessageUtils.java:80) ~[cloudevents-core-2.4.0.jar:na]
at io.cloudevents.kafka.KafkaMessageFactory.createReader(KafkaMessageFactory.java:65) ~[cloudevents-kafka-2.4.0.jar:na]
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:60) ~[cloudevents-kafka-2.4.0.jar:na]
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:34) ~[cloudevents-kafka-2.4.0.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:328) ~[kafka-clients-3.9.0.jar:na]
… 12 common frames omitted
I think it’s best to raise an issue on the extension library with a proposal. Options could include a boolean property that makes the extension ignore these types of errors, a configurable list of errors to ignore, and/or a pluggable exception handler, which in this case could simply be a consumer of Throwable. That last option would be the most flexible, and a Spring Boot property could then, for example, configure it to ignore all errors.
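To make the exception handler idea concrete, here is a minimal, library-free sketch. Everything in it is hypothetical: `TolerantFetcher` and its methods are illustration names, not part of the Axon Kafka extension, and a plain `Function` stands in for the deserialization step. It only shows the shape of the proposal, where a failing record is handed to a `Consumer<Throwable>` and the remaining records are still processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only: this type does not exist in the Axon Kafka extension.
final class TolerantFetcher<R, E> {

    private final Function<R, E> deserializer;      // stand-in for the deserialization step
    private final Consumer<Throwable> errorHandler; // the proposed pluggable handler

    TolerantFetcher(Function<R, E> deserializer, Consumer<Throwable> errorHandler) {
        this.deserializer = deserializer;
        this.errorHandler = errorHandler;
    }

    // Converts each raw record. A failure is passed to the handler and the
    // record is skipped, unless the handler itself rethrows.
    List<E> fetch(List<R> rawRecords) {
        List<E> events = new ArrayList<>();
        for (R raw : rawRecords) {
            try {
                events.add(deserializer.apply(raw));
            } catch (RuntimeException e) {
                errorHandler.accept(e);
            }
        }
        return events;
    }
}
```

With this shape, "ignore all errors" is just a handler that logs and returns, while a stricter handler can rethrow to keep the current blocking behaviour.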
Thanks for the response! I’ll follow your suggestions. In the meantime, I’ve come up with the following workaround:
    @Override
    @Bean("axonKafkaConsumerFactory")
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        Map<String, Object> properties = customKafkaProperties.buildConsumerProperties();
        // Replace the default CloudEvent value deserializer with the tolerant decorator.
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializerDecorator.class);
        return new DefaultConsumerFactory<>(properties);
    }
Normally, the deserializer for CloudEvents is hardcoded, so I’ve overridden it here. Instead of propagating the deserialization exception, I log the error and send a dummy event, which is simply ignored. I’m still testing it, but here’s the deserializer implementation:
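    import java.net.URI;
    import java.nio.ByteBuffer;
    import java.time.OffsetDateTime;
    import java.util.Map;
    import io.cloudevents.CloudEvent;
    import io.cloudevents.core.v1.CloudEventV1;
    import io.cloudevents.kafka.CloudEventDeserializer;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.Deserializer;

    @Slf4j
    public class CloudEventDeserializerDecorator implements Deserializer<CloudEvent> {
        private final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

        // Required by the Deserializer interface; the consumer in the stack trace above
        // goes through the Headers overload below instead.
        @Override
        public CloudEvent deserialize(String topic, byte[] data) {
            return cloudEventDeserializer.deserialize(topic, data);
        }

        @Override
        public CloudEvent deserialize(String topic, Headers headers, ByteBuffer data) {
            try {
                return cloudEventDeserializer.deserialize(topic, headers, data);
            } catch (Exception e) {
                // Log and substitute a dummy event rather than propagating the failure.
                log.error("Skipping invalid CloudEvent for topic {}: {}", topic, e.getMessage());
                return new CloudEventV1(
                        "dummy-id",
                        URI.create("dummy"),
                        "dummy",
                        "application/json",
                        null,
                        "dummy",
                        OffsetDateTime.now(),
                        null,
                        Map.of()
                );
            }
        }
    }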
Having your own deserializer using the decorator pattern is indeed likely the easiest way for now. I think you could even just return null in the decorator.
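As a rough, library-free sketch of the "return null" variant: the class below wraps a delegate and turns any failure into a null result. It is an illustration under assumptions, since a `BiFunction<String, byte[], T>` stands in for the deserializer and `NullOnErrorDeserializer` is a made-up name. Whether the Axon Kafka extension tolerates a null record value is not confirmed in this thread, so that would need testing before relying on it.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in: a (topic, payload) -> value function plays the role of
// a deserializer so the sketch runs without Kafka on the classpath.
final class NullOnErrorDeserializer<T> implements BiFunction<String, byte[], T> {

    private final BiFunction<String, byte[], T> delegate;

    NullOnErrorDeserializer(BiFunction<String, byte[], T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T apply(String topic, byte[] data) {
        try {
            return delegate.apply(topic, data);
        } catch (RuntimeException e) {
            // Report the bad record and return null instead of building a dummy event.
            System.err.println("Skipping invalid record for topic " + topic + ": " + e.getMessage());
            return null;
        }
    }
}
```

Compared with the dummy event, returning null avoids inventing an id, source and type, but it moves the responsibility for skipping the record to whatever consumes the deserialized value.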