Configuring a Custom Deserializer for EventMessage Deserialization

I’ve configured a KafkaPublisher bean to use a custom KafkaMessageConverter implementation, which uses a custom Avro serializer to (de)serialize events. This has been working without error so far; however, when we try to add an Avro field whose type is “long”, we start seeing XStream conversion errors. It’s as if it can’t convert from bytes to a long. See the stack trace at the bottom of the post. Adding the following Avro field causes the exception:

{
  "name": "seenDate",
  "type": { "type": "long", "logicalType": "timestamp-millis" },
  "default": -1
}
As you can see in the stack trace, LazyDeserializingObject is still configured to use the XStreamSerializer. How can I configure the underlying implementation of EventMessage.getPayload() to use our Avro serializer? We rely on the schema class of the event message payload to determine which topic to send the message to, hence the call to EventMessage.getPayload(). We’re investigating other ways of determining the topic in the meantime, but any advice is welcome.
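
To give an idea of why we call getPayload() at all, here is a simplified sketch of our topic resolution (hypothetical names, not our exact code); the getPayload() call is what triggers the deserialization that fails:

import org.apache.avro.specific.SpecificRecord;
import org.axonframework.eventhandling.EventMessage;

// Simplified sketch with hypothetical names: resolve the Kafka topic from the
// Avro schema of the event payload. Calling getPayload() forces the
// LazyDeserializingObject to deserialize, which is where the XStream error occurs.
public final class SchemaBasedTopicResolver {

    private final String defaultTopic;

    public SchemaBasedTopicResolver(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public String resolveTopic(EventMessage<?> event) {
        Object payload = event.getPayload(); // triggers lazy deserialization
        if (payload instanceof SpecificRecord) {
            // Avro-generated classes expose their schema; use its full name as the topic.
            return ((SpecificRecord) payload).getSchema().getFullName();
        }
        return defaultTopic;
    }
}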


com.thoughtworks.xstream.converters.ConversionException:
---- Debugging information ----
cause-exception     : java.lang.ClassCastException
cause-message       : [B cannot be cast to java.lang.Byte
class               : <omitted>
required-type       : <omitted>
converter-type      : com.thoughtworks.xstream.converters.reflection.ExternalizableConverter
path                : <omitted>
line number         : 1
version             : 1.4.11.1
-------------------------------
            at com.thoughtworks.xstream.core.TreeUnmarshaller.convert(TreeUnmarshaller.java:77)
            at com.thoughtworks.xstream.core.AbstractReferenceUnmarshaller.convert(AbstractReferenceUnmarshaller.java:72)
            at com.thoughtworks.xstream.core.TreeUnmarshaller.convertAnother(TreeUnmarshaller.java:66)
            at com.thoughtworks.xstream.core.TreeUnmarshaller.convertAnother(TreeUnmarshaller.java:50)
            at com.thoughtworks.xstream.core.TreeUnmarshaller.start(TreeUnmarshaller.java:134)
            at com.thoughtworks.xstream.core.AbstractTreeMarshallingStrategy.unmarshal(AbstractTreeMarshallingStrategy.java:32)
            at com.thoughtworks.xstream.XStream.unmarshal(XStream.java:1487)
            at com.thoughtworks.xstream.XStream.unmarshal(XStream.java:1467)
            at com.thoughtworks.xstream.XStream.fromXML(XStream.java:1338)
            at org.axonframework.serialization.xml.XStreamSerializer.doDeserialize(XStreamSerializer.java:123)
            at org.axonframework.serialization.AbstractXStreamSerializer.deserialize(AbstractXStreamSerializer.java:150)

            at org.axonframework.serialization.LazyDeserializingObject.getObject(LazyDeserializingObject.java:102)
            at org.axonframework.serialization.SerializedMessage.getPayload(SerializedMessage.java:77)
            at org.axonframework.messaging.MessageDecorator.getPayload(MessageDecorator.java:56)
            at

Hi @blackcompe,

You can configure your serializers to use any Serializer you want; in your case, configuring it as the event serializer (the axon.serializer.events setting) should do the trick for you. You can find more info here.
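
With Spring Boot, a minimal sketch of that (assuming your AvroSerializer implements Axon's Serializer interface) is to register it as the event serializer bean:

import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SerializerConfiguration {

    // Axon's Spring Boot auto-configuration picks up a Serializer bean
    // qualified as "eventSerializer" and uses it for event payloads, so
    // LazyDeserializingObject no longer falls back to the XStreamSerializer.
    @Bean
    @Qualifier("eventSerializer")
    public Serializer eventSerializer() {
        return new AvroSerializer(); // your custom implementation
    }
}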

But we are very much interested in your implementation of the AvroSerializer. Have you based it on our Serializer interface? If so, it would be a valuable addition to the Framework, and a PR for it would be awesome!
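
For reference, this is roughly the contract such an implementation needs to fulfil (a skeleton only, bodies omitted; the comments indicate what each method is typically responsible for):

import org.axonframework.serialization.Converter;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.Serializer;

public class AvroSerializer implements Serializer {

    @Override
    public <T> SerializedObject<T> serialize(Object object, Class<T> expectedRepresentation) {
        // Encode the Avro record into the requested representation (typically byte[]).
        throw new UnsupportedOperationException("skeleton only");
    }

    @Override
    public <T> boolean canSerializeTo(Class<T> expectedRepresentation) {
        return byte[].class.equals(expectedRepresentation);
    }

    @Override
    public <S, T> T deserialize(SerializedObject<S> serializedObject) {
        // Look up the schema for serializedObject.getType() and decode the bytes.
        throw new UnsupportedOperationException("skeleton only");
    }

    @Override
    public Class classForType(SerializedType type) {
        // Resolve the generated Avro class from the serialized type name.
        throw new UnsupportedOperationException("skeleton only");
    }

    @Override
    public SerializedType typeForClass(Class type) {
        // Typically the Avro schema's full name plus a revision.
        throw new UnsupportedOperationException("skeleton only");
    }

    @Override
    public Converter getConverter() {
        // Converter used to transform between serialized representations.
        throw new UnsupportedOperationException("skeleton only");
    }
}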

Hello,
I’m also interested in this topic.
I worked on two PoCs: one with Axon and one with Spring Boot and Kafka (event driven).
The idea with Kafka is to initially use it as an event bus, sending different types of events (Avro schemas) into a single topic.
For the “projection” (read model) layer, I do a stream transformation from the events topic to a new topic containing the final “state”.
It works very well with Kafka and the Schema Registry, and it takes care of the versioning problem.
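
As an illustration (a simplified sketch with hypothetical event/state types and topic names, not my actual PoC code), the transformation boils down to a Kafka Streams aggregation over the events topic:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

// Simplified sketch with hypothetical types: AccountEvent and AccountState are
// assumed to be Avro-generated classes, with AccountState.apply(AccountEvent)
// folding an event into the current state.
public class ProjectionTopology {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, AccountState> state = builder
                .<String, AccountEvent>stream("account-events")
                .groupByKey()
                .aggregate(
                        AccountState::new,                              // initial (empty) state
                        (key, event, current) -> current.apply(event)); // hypothetical apply()

        // Publish the continuously updated state to the read-model topic.
        state.toStream().to("account-state");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "account-projection");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The value serde (e.g. Confluent's SpecificAvroSerde, pointed at the
        // Schema Registry) would be configured here as well.

        new KafkaStreams(builder.build(), props).start();
    }
}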
I miss the command handling that Axon offers.
What I would like to try next is an Axon integration with Kafka: using Axon for command handling (plus event sourcing and the ability to create projections internal to the application, without Kafka), and Kafka as the event bus with the Avro Schema Registry.

Has anyone already done something similar?

Thank you

From AxonIQ’s standpoint, Kafka is intended as a message broadcast platform.
This makes it perfectly suitable for events, as events should be broadcast to any listener that is interested in them.

Commands however require a different form of routing.
Namely, a command should be targeted towards the exact model (aggregate) which is tasked with the command.
As such, you require a means to know the exact handler of a command to be able to route the message correctly.

This difference in messaging approach is reflected in the Extensions Axon provides.
AMQP and Kafka can be used for events, as they both fit that bill perfectly well.
JGroups and Spring Cloud Discovery are used as a discovery mechanism, with another layer actually sending the message.

To greatly simplify your infrastructure, we have built Axon Server. It supports the specific routing requirements of commands, events, and queries, and doubles as the event store.

Anyhow, that’s the stance we take. I can’t vouch for others who might have tried to force Kafka into being a command router. It is most likely doable, but there are better tools for the job if you ask me.

That’s my two cents; hope it sheds some light on your options.
