So I’m in the process of creating an Event Storage Engine and I have it working for normal event storage and token tracking.
But for snapshots I’m running into a weird situation.
The snap shot is being saved correctly:
PayloadType: c.w.i.p.s.d.r.RecordAggregate
Payload:
{
"aggregateId": "a9f47892-ddf2-3ee0-89c2-ea84a0ce976e",
"uniqueId": {
"uniqueId": "blah1",
"applicationId": "app-id-b",
"contentType": "type-of-data"
},
"data": [
{
"context": {
"locale": "en-US"
},
"fields": [
{
"key": "key6",
"value": "testing",
"source": "OWNER",
"createDate": 1716896789.311281000,
"updateDate": 1716964097.555495000
},
{
"key": "key5",
"value": 1002,
"source": "OWNER",
"createDate": 1716896789.311296000,
"updateDate": 1716964097.555499000
}
],
"fieldKeys": [
"key6",
"key5"
]
}
]
}
I tried using the Payload to manually deserialise back to a RecordAggregate and it works perfectly, but when the framework load the aggregate I’m getting an error:
org.axonframework.serialization.CannotConvertBetweenTypesException: Cannot build a converter to convert from org.axonframework.serialization.SimpleSerializedObject to [B
at org.axonframework.serialization.ChainedConverter.calculateChain(ChainedConverter.java:59)
at org.axonframework.serialization.ChainingConverter.convert(ChainingConverter.java:95)
at org.axonframework.serialization.Converter.convert(Converter.java:71)
at org.axonframework.serialization.json.JacksonSerializer.deserialize(JacksonSerializer.java:204)
at org.axonframework.serialization.LazyDeserializingObject.getObject(LazyDeserializingObject.java:102)
at org.axonframework.serialization.SerializedMessage.getPayload(SerializedMessage.java:79)
at org.axonframework.messaging.MessageDecorator.getPayload(MessageDecorator.java:56)
at org.axonframework.eventsourcing.AbstractAggregateFactory.fromSnapshot(AbstractAggregateFactory.java:97)
at org.axonframework.eventsourcing.AbstractAggregateFactory.createAggregateRoot(AbstractAggregateFactory.java:91)
at org.axonframework.spring.eventsourcing.SpringPrototypeAggregateFactory.createAggregateRoot(SpringPrototypeAggregateFactory.java:148)
at org.axonframework.eventsourcing.EventSourcingRepository.doLoadAggregate(EventSourcingRepository.java:151)
at org.axonframework.eventsourcing.EventSourcingRepository.lambda$doLoadWithLock$0(EventSourcingRepository.java:138)
at org.axonframework.tracing.Span.runSupplier(Span.java:163)
at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock(EventSourcingRepository.java:138)
at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock(EventSourcingRepository.java:56)
at org.axonframework.modelling.command.LockingRepository.doLoadOrCreate(LockingRepository.java:153)
at org.axonframework.modelling.command.LockingRepository.doLoadOrCreate(LockingRepository.java:60)
at org.axonframework.modelling.command.AbstractRepository.lambda$loadOrCreate$8(AbstractRepository.java:176)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
at org.axonframework.modelling.command.AbstractRepository.loadOrCreate(AbstractRepository.java:172)
at org.axonframework.modelling.command.AggregateAnnotationCommandHandler$AggregateCreateOrUpdateCommandHandler.handle(AggregateAnnotationCommandHandler.java:486)
at org.axonframework.modelling.command.AggregateAnnotationCommandHandler$AggregateCreateOrUpdateCommandHandler.handle(AggregateAnnotationCommandHandler.java:469)
at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57)
at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67)
at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55)
at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.executeWithResult(DefaultUnitOfWork.java:77)
at org.axonframework.commandhandling.SimpleCommandBus.lambda$handle$2(SimpleCommandBus.java:200)
at org.axonframework.tracing.Span.runSupplier(Span.java:163)
at org.axonframework.commandhandling.SimpleCommandBus.handle(SimpleCommandBus.java:191)
at org.axonframework.commandhandling.SimpleCommandBus.doDispatch(SimpleCommandBus.java:165)
at org.axonframework.commandhandling.SimpleCommandBus.lambda$dispatch$1(SimpleCommandBus.java:131)
at org.axonframework.tracing.Span.run(Span.java:101)
at org.axonframework.commandhandling.SimpleCommandBus.dispatch(SimpleCommandBus.java:125)
at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76)
at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83)
at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:138)
at c.w.i.p.s.application.service.StorageService.store(StorageService.java:17)
at c.w.i.p.s.adapter.in.web.StorageController.storeRecord(StorageController.java:45)
at c.w.i.p.s.adapter.in.web.StorageController$$FastClassBySpringCGLIB$$b7b0a186.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:123)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at c.w.i.p.s.adapter.in.web.StorageController$$EnhancerBySpringCGLIB$$2ff9d184.storeRecord(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:145)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:258)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:347)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:424)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:663)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:274)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
During debugging I can see that the SimpleSerializedObject is type correctly to RecordAggregate and has the correct payload in it.
Not sure what I’ve messed up here.
This is my Axon configuration:
@Configuration
public class AxonConfig {
// The EmbeddedEventStore delegates actual storage and retrieval of events to an EventStorageEngine.
@Bean
public EventStore eventStore(EventStorageEngine storageEngine) {
return EmbeddedEventStore
.builder()
.storageEngine(storageEngine)
.build();
}
// The SpannerEventStorageEngine stores each event as a row in a Spanner table.
@Bean
public EventStorageEngine storageEngine(
JacksonSerializer serializer,
DomainEventEntryRepository domainEventEntryRepository,
SnapshotEventEntryRepository snapshotEventEntryRepository,
TransactionManager transactionManager,
SpannerTemplate spannerTemplate
) {
return SpannerEventStorageEngine
.builder()
.eventSerializer(serializer)
.snapshotSerializer(serializer)
.spannerTemplate(spannerTemplate)
.domainEventEntryRepository(domainEventEntryRepository)
.snapshotEventEntryRepository(snapshotEventEntryRepository)
.transactionManager(transactionManager)
.build();
}
@Bean
public TokenStore tokenStore(
JacksonSerializer serializer,
TokenEntryRepository repository,
SpannerTemplate spannerTemplate
) {
return SpannerTokenStore
.builder()
.serializer(serializer)
.spannerTemplate(spannerTemplate)
.tokenEntryRepository(repository)
.build();
}
@Bean
public SnapshotTriggerDefinition recordSnapshotTrigger(Snapshotter snapshotter) {
return new EventCountSnapshotTriggerDefinition(snapshotter, 5);
}
@Bean
@Primary
public Serializer serializer(ObjectMapper objectMapper) {
return JacksonSerializer.builder().objectMapper(objectMapper).build();
}
@Bean
public SpannerEntityProcessor spannerConverter(SpannerMappingContext mappingContext) {
return new ConverterAwareMappingSpannerEntityProcessor(
mappingContext,
List.of(
new OffsetDateTimeToTimestampConverter(),
new SimpleSerializedObjectToByteArrayConverter()
),
List.of(
new TimestampToOffsetDateTimeConverter(),
new SimpleSerializedObjectToByteArrayConverter()
)
);
}
}