Snapshot failing to load

So I’m in the process of creating an Event Storage Engine and I have it working for normal event storage and token tracking.
But for snapshots I’m running into a weird situation.
The snapshot is being saved correctly:
PayloadType: c.w.i.p.s.d.r.RecordAggregate
Payload:

{
  "aggregateId": "a9f47892-ddf2-3ee0-89c2-ea84a0ce976e",
  "uniqueId": {
    "uniqueId": "blah1",
    "applicationId": "app-id-b",
    "contentType": "type-of-data"
  },
  "data": [
    {
      "context": {
        "locale": "en-US"
      },
      "fields": [
        {
          "key": "key6",
          "value": "testing",
          "source": "OWNER",
          "createDate": 1716896789.311281000,
          "updateDate": 1716964097.555495000
        },
        {
          "key": "key5",
          "value": 1002,
          "source": "OWNER",
          "createDate": 1716896789.311296000,
          "updateDate": 1716964097.555499000
        }
      ],
      "fieldKeys": [
        "key6",
        "key5"
      ]
    }
  ]
}

I tried using the Payload to manually deserialise back to a RecordAggregate and it works perfectly, but when the framework loads the aggregate I’m getting an error:

org.axonframework.serialization.CannotConvertBetweenTypesException: Cannot build a converter to convert from org.axonframework.serialization.SimpleSerializedObject to [B
	at org.axonframework.serialization.ChainedConverter.calculateChain(ChainedConverter.java:59)
	at org.axonframework.serialization.ChainingConverter.convert(ChainingConverter.java:95)
	at org.axonframework.serialization.Converter.convert(Converter.java:71)
	at org.axonframework.serialization.json.JacksonSerializer.deserialize(JacksonSerializer.java:204)
	at org.axonframework.serialization.LazyDeserializingObject.getObject(LazyDeserializingObject.java:102)
	at org.axonframework.serialization.SerializedMessage.getPayload(SerializedMessage.java:79)
	at org.axonframework.messaging.MessageDecorator.getPayload(MessageDecorator.java:56)
	at org.axonframework.eventsourcing.AbstractAggregateFactory.fromSnapshot(AbstractAggregateFactory.java:97)
	at org.axonframework.eventsourcing.AbstractAggregateFactory.createAggregateRoot(AbstractAggregateFactory.java:91)
	at org.axonframework.spring.eventsourcing.SpringPrototypeAggregateFactory.createAggregateRoot(SpringPrototypeAggregateFactory.java:148)
	at org.axonframework.eventsourcing.EventSourcingRepository.doLoadAggregate(EventSourcingRepository.java:151)
	at org.axonframework.eventsourcing.EventSourcingRepository.lambda$doLoadWithLock$0(EventSourcingRepository.java:138)
	at org.axonframework.tracing.Span.runSupplier(Span.java:163)
	at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock(EventSourcingRepository.java:138)
	at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock(EventSourcingRepository.java:56)
	at org.axonframework.modelling.command.LockingRepository.doLoadOrCreate(LockingRepository.java:153)
	at org.axonframework.modelling.command.LockingRepository.doLoadOrCreate(LockingRepository.java:60)
	at org.axonframework.modelling.command.AbstractRepository.lambda$loadOrCreate$8(AbstractRepository.java:176)
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
	at org.axonframework.modelling.command.AbstractRepository.loadOrCreate(AbstractRepository.java:172)
	at org.axonframework.modelling.command.AggregateAnnotationCommandHandler$AggregateCreateOrUpdateCommandHandler.handle(AggregateAnnotationCommandHandler.java:486)
	at org.axonframework.modelling.command.AggregateAnnotationCommandHandler$AggregateCreateOrUpdateCommandHandler.handle(AggregateAnnotationCommandHandler.java:469)
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57)
	at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67)
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55)
	at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.executeWithResult(DefaultUnitOfWork.java:77)
	at org.axonframework.commandhandling.SimpleCommandBus.lambda$handle$2(SimpleCommandBus.java:200)
	at org.axonframework.tracing.Span.runSupplier(Span.java:163)
	at org.axonframework.commandhandling.SimpleCommandBus.handle(SimpleCommandBus.java:191)
	at org.axonframework.commandhandling.SimpleCommandBus.doDispatch(SimpleCommandBus.java:165)
	at org.axonframework.commandhandling.SimpleCommandBus.lambda$dispatch$1(SimpleCommandBus.java:131)
	at org.axonframework.tracing.Span.run(Span.java:101)
	at org.axonframework.commandhandling.SimpleCommandBus.dispatch(SimpleCommandBus.java:125)
	at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76)
	at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83)
	at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:138)
	at c.w.i.p.s.application.service.StorageService.store(StorageService.java:17)
	at c.w.i.p.s.adapter.in.web.StorageController.storeRecord(StorageController.java:45)
	at c.w.i.p.s.adapter.in.web.StorageController$$FastClassBySpringCGLIB$$b7b0a186.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:123)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
	at c.w.i.p.s.adapter.in.web.StorageController$$EnhancerBySpringCGLIB$$2ff9d184.storeRecord(<generated>)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:145)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:258)
	at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:347)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:424)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:663)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:274)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)

During debugging I can see that the SimpleSerializedObject is typed correctly as RecordAggregate and has the correct payload in it.

Not sure what I’ve messed up here.
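
One possible reading of the exception, offered as a guess rather than a confirmed diagnosis: the message names SimpleSerializedObject as the source type of the conversion, which suggests the converter is looked up by the content type that the stored entry declares, not by the payload type RecordAggregate. If the snapshot entry hands back a payload whose data is itself a SimpleSerializedObject instead of a byte[], no converter chain to byte[] would exist. The toy model below uses only the JDK; Wrapped, TO_BYTES, and toBytes are hypothetical stand-ins and not Axon classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ContentTypeLookupSketch {

  // Hypothetical stand-in for a serialized payload: the data plus its declared content type.
  static final class Wrapped {
    final Object data;
    final Class<?> contentType;

    Wrapped(Object data, Class<?> contentType) {
      this.data = data;
      this.contentType = contentType;
    }
  }

  // Hypothetical converter registry keyed by "source->target" class names.
  static final Map<String, Function<Object, byte[]>> TO_BYTES = new HashMap<>();

  static {
    // The only registered conversion in this toy model: String to byte[].
    TO_BYTES.put(
        key(String.class, byte[].class),
        value -> ((String) value).getBytes(StandardCharsets.UTF_8));
  }

  static String key(Class<?> source, Class<?> target) {
    return source.getName() + "->" + target.getName();
  }

  // Picks a converter from the declared content type, never from the logical payload type.
  static byte[] toBytes(Wrapped wrapped) {
    if (wrapped.contentType == byte[].class) {
      return (byte[]) wrapped.data;
    }
    Function<Object, byte[]> converter =
        TO_BYTES.get(key(wrapped.contentType, byte[].class));
    if (converter == null) {
      throw new IllegalStateException(
          "Cannot convert from " + wrapped.contentType.getName() + " to [B");
    }
    return converter.apply(wrapped.data);
  }

  public static void main(String[] args) {
    // A payload whose declared content type has a registered converter works.
    Wrapped inner = new Wrapped("{\"key\":\"value\"}", String.class);
    System.out.println(new String(toBytes(inner), StandardCharsets.UTF_8));

    // Wrapping the wrapper makes the declared content type the wrapper class itself,
    // for which no converter to byte[] is registered.
    Wrapped nested = new Wrapped(inner, Wrapped.class);
    try {
      toBytes(nested);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

If this is what is happening, the place to look would be the snapshot entry's payload accessor in the custom storage engine, checking that the content type it reports is byte[] rather than SimpleSerializedObject.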

This is my Axon configuration:

@Configuration
public class AxonConfig {
  // The EmbeddedEventStore delegates actual storage and retrieval of events to an EventStorageEngine.
  @Bean
  public EventStore eventStore(EventStorageEngine storageEngine) {
    return EmbeddedEventStore
        .builder()
        .storageEngine(storageEngine)
        .build();
  }

  // The SpannerEventStorageEngine stores each event as a row in a Spanner table.
  @Bean
  public EventStorageEngine storageEngine(
      JacksonSerializer serializer,
      DomainEventEntryRepository domainEventEntryRepository,
      SnapshotEventEntryRepository snapshotEventEntryRepository,
      TransactionManager transactionManager,
      SpannerTemplate spannerTemplate
  ) {
    return SpannerEventStorageEngine
        .builder()
        .eventSerializer(serializer)
        .snapshotSerializer(serializer)
        .spannerTemplate(spannerTemplate)
        .domainEventEntryRepository(domainEventEntryRepository)
        .snapshotEventEntryRepository(snapshotEventEntryRepository)
        .transactionManager(transactionManager)
        .build();
  }

  @Bean
  public TokenStore tokenStore(
      JacksonSerializer serializer,
      TokenEntryRepository repository,
      SpannerTemplate spannerTemplate
  ) {
    return SpannerTokenStore
        .builder()
        .serializer(serializer)
        .spannerTemplate(spannerTemplate)
        .tokenEntryRepository(repository)
        .build();
  }

  @Bean
  public SnapshotTriggerDefinition recordSnapshotTrigger(Snapshotter snapshotter) {
    return new EventCountSnapshotTriggerDefinition(snapshotter, 5);
  }

  @Bean
  @Primary
  public Serializer serializer(ObjectMapper objectMapper) {
    return JacksonSerializer.builder().objectMapper(objectMapper).build();
  }

  @Bean
  public SpannerEntityProcessor spannerConverter(SpannerMappingContext mappingContext) {
    return new ConverterAwareMappingSpannerEntityProcessor(
        mappingContext,
        List.of(
            new OffsetDateTimeToTimestampConverter(),
            new SimpleSerializedObjectToByteArrayConverter()
        ),
        List.of(
            new TimestampToOffsetDateTimeConverter(),
            new SimpleSerializedObjectToByteArrayConverter()
        )
    );
  }
}

Can you also share the aggregate? I think it might have to do with Jackson serialization. Most likely the content of the payload can’t be set on the aggregate. There are multiple ways to make that work with Jackson.

This is my aggregate.

package com.wayfair.i18n.platform.mediator.domain.record;

import static org.axonframework.modelling.command.AggregateLifecycle.apply;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.wayfair.i18n.platform.mediator.common.serializer.RecordAggregateSerializer;
import com.wayfair.i18n.platform.mediator.domain.record.record.RecordData;
import com.wayfair.i18n.platform.mediator.domain.record.record.RecordField;
import com.wayfair.i18n.platform.mediator.domain.record.record.RecordLocalizationContext;
import com.wayfair.i18n.platform.mediator.domain.record.record.RecordUniqueId;
import com.wayfair.i18n.platform.mediator.domain.record.command.StoreRecordCommand;
import com.wayfair.i18n.platform.mediator.domain.record.event.RecordCreatedEvent;
import com.wayfair.i18n.platform.mediator.domain.record.event.RecordDataCreatedEvent;
import com.wayfair.i18n.platform.mediator.domain.record.event.RecordDataUpdatedEvent;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateCreationPolicy;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.CreationPolicy;
import org.axonframework.spring.stereotype.Aggregate;

@Aggregate(snapshotTriggerDefinition = "recordSnapshotTrigger")
@Getter
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
@JsonSerialize(using = RecordAggregateSerializer.class)
public class RecordAggregate implements Serializable {
  @AggregateIdentifier
  @EqualsAndHashCode.Include
  private String aggregateId;
  private RecordUniqueId uniqueId;
  private Map<RecordLocalizationContext, RecordData> data;

  @JsonCreator
  public RecordAggregate(
      @JsonProperty("aggregateId") String aggregateId,
      @JsonProperty("uniqueId") RecordUniqueId uniqueId,
      @JsonProperty("data") List<RecordData> data
  ) {
    this.aggregateId = aggregateId;
    this.uniqueId = uniqueId;

    this.data = new HashMap<>();
    data.forEach(recordData -> this.data.put(recordData.getContext(), recordData));
  }

  public static String generateIdentifier(RecordUniqueId uniqueId) {
    return UUID
        .nameUUIDFromBytes(
            "%s|%s|%s"
                .formatted(
                    uniqueId.getApplicationId().getId(),
                    uniqueId.getContentType().getValue(),
                    uniqueId.getUniqueId()
                )
                .getBytes()
        )
        .toString();
  }

  @CommandHandler
  @CreationPolicy(AggregateCreationPolicy.CREATE_IF_MISSING)
  public void on(StoreRecordCommand command) {
    if (this.uniqueId == null) {
      apply(new RecordCreatedEvent(command.getUniqueId()));
      apply(new RecordDataCreatedEvent(command.getUniqueId(), command.getData()));

      return;
    }

    calculateCreatesAndUpdates(command);
  }

  private void calculateCreatesAndUpdates(StoreRecordCommand command) {
    List<RecordData> createdData = new ArrayList<>();
    List<RecordData> updatedData = new ArrayList<>();

    command.getData().forEach(
        recordData -> {
          var context = recordData.getContext();
          var contextData = data.get(context);

          List<RecordField> created = new ArrayList<>();
          List<RecordField> updated = new ArrayList<>();
          recordData.getFields().forEach(
              field -> {
                if (contextData != null && contextData.containsKey(field.getKey())) {
                  updated.add(field);

                  return;
                }

                created.add(field);
              }
          );

          if (!created.isEmpty()) {
            createdData.add(new RecordData(context, created));
          }

          if (!updated.isEmpty()) {
            updatedData.add(new RecordData(context, updated));
          }
        }
    );

    if (!createdData.isEmpty()) {
      apply(new RecordDataCreatedEvent(command.getUniqueId(), createdData));
    }

    if (!updatedData.isEmpty()) {
      apply(new RecordDataUpdatedEvent(command.getUniqueId(), updatedData));
    }
  }

  @EventSourcingHandler
  private void on(RecordCreatedEvent event) {
    this.uniqueId = event.uniqueId();
    this.aggregateId = RecordAggregate.generateIdentifier(this.uniqueId);
    this.data = new HashMap<>();
  }

  @EventSourcingHandler
  private void on(RecordDataCreatedEvent event) {
    event.data().forEach(recordData -> this.data.put(
            recordData.getContext(),
            // Rebuild to generate field keys
            new RecordData(recordData.getContext(), recordData.getFields())
        )
    );
  }

  @EventSourcingHandler
  private void on(RecordDataUpdatedEvent event) {
    event.data().forEach(recordData -> {
          var context = recordData.getContext();

          data.put(
              context,
              data.get(context).updateFields(recordData.getFields())
          );
        }
    );
  }
}

I’ve tested copying the JSON from the snapshot store and deserialising it using Jackson.
It worked well.

It might be because there is no type information in the payload. It might work if you enable default typing. I’m not sure though.

I’ve tested deserialising the snapshot payload using the same ObjectMapper that is being used by the JacksonSerializer.

Wouldn’t that mean that the framework, using Jackson, should also be able to do it?

Not necessarily; sometimes it needs the type information, like in lists returned from a query. I’m not sure this is another such case, as it already has the type. It can’t hurt to call .defaultTyping() on the JacksonSerializer builder and see if that solves the problem.
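
For reference, a minimal sketch of what that change could look like in the configuration posted above. It assumes Axon 4's JacksonSerializer.Builder#defaultTyping() and reuses the bean signature from the thread; the class name DefaultTypingSerializerConfig is made up for illustration. The mapper is copied first on the assumption that default typing gets switched on for the ObjectMapper handed to the builder, which would otherwise also affect other JSON handled by the shared Spring mapper.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.json.JacksonSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

// Hypothetical configuration class; only the serializer bean from the thread is shown.
@Configuration
public class DefaultTypingSerializerConfig {

  @Bean
  @Primary
  public Serializer serializer(ObjectMapper objectMapper) {
    return JacksonSerializer.builder()
        // Copy so that type information is only added to Axon's own mapper (assumption, see above).
        .objectMapper(objectMapper.copy())
        // Asks the serializer to include type information in the JSON it writes.
        .defaultTyping()
        .build();
  }
}
```

Snapshots written before the change carry no type information and may not deserialise with default typing enabled, so it is probably easiest to try this against a freshly created snapshot.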

I will give it a try. It might take a while, but I will let you know if it worked.
