Handling Deserialization Exceptions in Axon Kafka Integration

Hello everyone,

The issue I’m facing is that an unhandled deserialization error on the Kafka consumer blocks the event processor’s handling entirely. Below are some of the error logs I encountered:

org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventException: Cannot proceed with fetching ConsumerRecords since we encountered an exception 
    at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:96)
org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition XXXX at offset 623468. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)

My primary goal is to handle these errors on the consumer side in such a way that it allows the event processor to continue processing other valid events, avoiding a complete block in event handling. I came across an approach described in this article, but it involves using the Kafka API directly and introduces additional dependencies, which I’d like to avoid.
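For reference, the pattern from that article boils down to polling with a raw KafkaConsumer and seeking past the poison record. This is my rough sketch of it, not the article’s exact code, and handleRecord is just a placeholder:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

import io.cloudevents.CloudEvent;

public class SkippingPoller {

  // Sketch only: poll in a loop and, when a record cannot be deserialized,
  // seek one offset past it so consumption can continue.
  public void pollSkippingBadRecords(KafkaConsumer<String, CloudEvent> consumer) {
    while (true) {
      try {
        ConsumerRecords<String, CloudEvent> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, CloudEvent> record : records) {
          handleRecord(record); // placeholder for the actual event handling
        }
      } catch (RecordDeserializationException e) {
        // The exception tells us exactly which record is unreadable.
        consumer.seek(e.topicPartition(), e.offset() + 1);
      }
    }
  }

  private void handleRecord(ConsumerRecord<String, CloudEvent> record) {
    // placeholder
  }
}

This bypasses Axon’s fetcher entirely, which is exactly the extra plumbing I’d like to avoid.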

Has anyone encountered a similar situation? Are there Axon-provided solutions or best practices to address this, perhaps by configuring an error handler or bypassing problematic records without resorting to Kafka-specific APIs?

Any insights or suggestions would be greatly appreciated!

Here is the whole stack trace, in case it’s useful:

org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing VALUE for partition XXX at offset XXX. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.newRecordDeserializationException(CompletedFetch.java:347) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:285) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:169) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:135) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:146) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:699) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:623) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.9.0.jar:na]
at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.9.0.jar:4.9.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: io.cloudevents.rw.CloudEventRWException: Could not parse. Unknown encoding. Invalid content type or spec version
at io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException(CloudEventRWException.java:201) ~[cloudevents-api-2.4.0.jar:na]
at io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage(MessageUtils.java:80) ~[cloudevents-core-2.4.0.jar:na]
at io.cloudevents.kafka.KafkaMessageFactory.createReader(KafkaMessageFactory.java:65) ~[cloudevents-kafka-2.4.0.jar:na]
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:60) ~[cloudevents-kafka-2.4.0.jar:na]
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:34) ~[cloudevents-kafka-2.4.0.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.9.0.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:328) ~[kafka-clients-3.9.0.jar:na]
… 12 common frames omitted

I think it’s best to raise an issue on the extension library with a proposal. It could be a boolean property that makes these types of errors ignored, and/or a list of error types to ignore, and/or a configurable exception handler; in that case the exception handler could just be a consumer of Throwable, I think. That last option would be the most flexible, and could be backed by a Spring Boot property to simply ignore all errors, for example.
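To make that concrete, the three options could look something like the sketch below. None of these names exist in the Kafka extension today; they are purely hypothetical:

import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the proposed configuration; none of these
// names exist in the Kafka extension today.
public class FetcherErrorProposal {

  // Option 1: a boolean to ignore deserialization errors altogether.
  boolean ignoreDeserializationErrors = true;

  // Option 2: an explicit list of exception types to ignore.
  List<Class<? extends Throwable>> ignoredErrors =
      List.of(org.apache.kafka.common.errors.RecordDeserializationException.class);

  // Option 3 (most flexible): a handler that is just a consumer of Throwable.
  Consumer<Throwable> fetchErrorHandler =
      throwable -> System.err.println("Ignoring fetch error: " + throwable);
}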


Thanks for the response! I’ll follow your suggestions. In the meantime, I’ve come up with the following workaround:

  @Override
  @Bean("axonKafkaConsumerFactory")
  public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    Map<String, Object> properties = customKafkaProperties.buildConsumerProperties();
    // Replace the value deserializer that the extension configures by default
    // with the decorating deserializer shown below.
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializerDecorator.class);

    return new DefaultConsumerFactory<>(properties);
  }

Normally, the deserializer for CloudEvents is hardcoded, so I’ve overridden it here. Instead of propagating the deserialization exception, the decorator logs the error and returns a dummy event, which is simply ignored downstream. I’m still testing it, but here’s the deserializer implementation:

import java.net.URI;
import java.nio.ByteBuffer;
import java.time.OffsetDateTime;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.kafka.CloudEventDeserializer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CloudEventDeserializerDecorator implements Deserializer<CloudEvent> {

  private final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

  @Override
  public CloudEvent deserialize(String topic, byte[] data) {
    // Required by the Deserializer interface; the fetch path calls the
    // headers-aware overload below, so this simply delegates to it.
    return deserialize(topic, null, data == null ? null : ByteBuffer.wrap(data));
  }

  @Override
  public CloudEvent deserialize(String topic, Headers headers, ByteBuffer data) {
    try {
      return cloudEventDeserializer.deserialize(topic, headers, data);
    } catch (Exception e) {
      log.error("Skipping invalid CloudEvent for topic {}: {}", topic, e.getMessage());
      // Instead of rethrowing (which would stop the fetcher), return a
      // recognizable dummy event that is filtered out later.
      return new CloudEventV1(
          "dummy-id",
          URI.create("dummy"),
          "dummy",
          "application/json",
          null,
          "dummy",
          OffsetDateTime.now(),
          null,
          Map.of()
      );
    }
  }
}

Having your own deserializer that uses the decorator pattern is indeed likely the easiest way for now. I think you could even return just null from the decorator.
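For completeness, that null-returning variant would only change the catch block of the decorator above; untested, and whatever reads the records downstream has to tolerate null values (Kafka itself allows them, e.g. for tombstones):

  @Override
  public CloudEvent deserialize(String topic, Headers headers, ByteBuffer data) {
    try {
      return cloudEventDeserializer.deserialize(topic, headers, data);
    } catch (Exception e) {
      log.error("Skipping invalid CloudEvent for topic {}: {}", topic, e.getMessage());
      // Return null instead of a dummy event; downstream handling must cope with it.
      return null;
    }
  }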
