Handling Deserialization Exceptions in Axon Kafka Integration

Thanks for the response! I’ll follow your suggestions. In the meantime, I’ve come up with the following workaround:

  @Override
  @Bean("axonKafkaConsumerFactory")
  public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    Map<String, Object> properties = customKafkaProperties.buildConsumerProperties();
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializerDecorator.class);

    return new DefaultConsumerFactory<>(properties);
  }

Normally, the deserializer for CloudEvents is hardcoded, so I’ve overridden it here through the consumer properties. Instead of propagating the deserialization exception, I log the error and return a dummy event, which is simply ignored. I’m still testing it, but here’s the deserializer implementation:

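import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.kafka.CloudEventDeserializer;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.OffsetDateTime;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

@Slf4j
public class CloudEventDeserializerDecorator implements Deserializer<CloudEvent> {

  private final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

  // Deserializer declares this overload as abstract, so the class does not compile without it.
  @Override
  public CloudEvent deserialize(String topic, byte[] data) {
    return cloudEventDeserializer.deserialize(topic, data);
  }

  @Override
  public CloudEvent deserialize(String topic, Headers headers, ByteBuffer data) {
    try {
      return cloudEventDeserializer.deserialize(topic, headers, data);
    } catch (Exception e) {
      // Passing the exception as the last argument keeps the stack trace in the log.
      log.error("Skipping invalid CloudEvent for topic {}", topic, e);
      // Placeholder event returned in place of the record that failed to deserialize.
      return new CloudEventV1(
          "dummy-id",
          URI.create("dummy"),
          "dummy",
          "application/json",
          null,
          "dummy",
          OffsetDateTime.now(),
          null,
          Map.of()
      );
    }
  }
}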
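
To show the idea in isolation, here is a minimal sketch of the same "catch, log, return a dummy, ignore it later" pattern using only the JDK. It is not Axon or Kafka code: `FallbackDecoderSketch`, `Event`, `strictDecode`, `withFallback` and `decodeAll` are hypothetical names invented for this illustration, and the `type:payload` wire format is an assumption made only to have something that can fail to parse.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FallbackDecoderSketch {

  // Marker type carried by the placeholder event (hypothetical, mirrors the "dummy" type above).
  static final String DUMMY_TYPE = "dummy";

  /** Simplified stand-in for a CloudEvent: just a type and a payload. */
  public static final class Event {
    public final String type;
    public final String payload;

    public Event(String type, String payload) {
      this.type = type;
      this.payload = payload;
    }
  }

  /** Strict decoder: expects "type:payload" and throws on anything else. */
  public static Event strictDecode(byte[] bytes) {
    String text = new String(bytes, StandardCharsets.UTF_8);
    int separator = text.indexOf(':');
    if (separator < 0) {
      throw new IllegalArgumentException("Malformed payload: " + text);
    }
    return new Event(text.substring(0, separator), text.substring(separator + 1));
  }

  /** Decorator: delegates to the strict decoder and returns a dummy event on failure. */
  public static Function<byte[], Event> withFallback(Function<byte[], Event> delegate) {
    return bytes -> {
      try {
        return delegate.apply(bytes);
      } catch (RuntimeException ex) {
        System.err.println("Skipping invalid payload: " + ex.getMessage());
        return new Event(DUMMY_TYPE, "");
      }
    };
  }

  /** Decodes all payloads and drops the dummy events before they reach any handler. */
  public static List<Event> decodeAll(List<byte[]> payloads) {
    Function<byte[], Event> decoder = withFallback(FallbackDecoderSketch::strictDecode);
    return payloads.stream()
        .map(decoder)
        .filter(event -> !DUMMY_TYPE.equals(event.type))
        .collect(Collectors.toList());
  }

  private static byte[] bytes(String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    List<byte[]> payloads = List.of(bytes("order:1"), bytes("garbage"), bytes("order:2"));
    for (Event event : decodeAll(payloads)) {
      System.out.println(event.type + " -> " + event.payload);
    }
  }
}
```

One caveat with this approach: filtering on a marker value means a genuine event that happens to use the same type would be dropped as well, so the marker should be something that cannot occur in real traffic.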