Snapshotting and revision numbers

So I’m not entirely sure this is a feature request, bug or just a question. Let’s see!

We’re making use of Axon Framework and Axon Server 4.5.1. We’ve enabled aggregate snapshotting every 1000 events, and our snapshots have a revision number.

The other day I made a mistake: I changed the package of our aggregate (keeping its class name the same) without changing the aggregate revision number. This resulted in a whole bunch of problems. Without any clear exceptions, the aggregate seemed to be rebuilt from the still-available snapshots, apparently swallowing likely serialization issues, and it reported that the aggregate identifier was empty. Sending several commands in a short while also caused Axon Server to run out of memory.

I get that I made a mistake, but it seems to me the framework should handle this situation better, and/or provide more tools to prevent this kind of situation from occurring at all. I was thinking of several things.

  1. I should easily be able to test the (de)serialisation of the aggregate, in order to catch a forgotten aggregate revision update before it is deployed. I have now added a unit test that uses ReflectionUtils to get and set values of the aggregate, which I test against a hard-coded XML, but using ReflectionUtils does feel a bit clunky.
  2. It seems to me that if the aggregate deserialisation gives any trouble, the framework should give a clear warning or exception message to point this out.
  3. I think that if the snapshot cannot be used for building up the aggregate, it should be disregarded with a warning and the aggregate should be rebuilt from scratch automatically.
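Regarding point 1: a complementary check that needs no ReflectionUtils is to pin the aggregate’s fully qualified class name together with its revision, and fail the build when the name changes but the revision does not. Below is a minimal, stdlib-only Java sketch. The Revision annotation is a local stand-in for Axon’s @Revision, and OrderAggregate, isSafeToDeploy and the pinned values are hypothetical. It assumes that stored snapshots record the aggregate’s class name and revision, which is what this thread suggests.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

public class SnapshotRevisionGuard {

    // Local stand-in for Axon's @Revision annotation, so this sketch compiles without Axon.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface Revision {
        String value();
    }

    // Hypothetical aggregate class, for illustration only.
    @Revision("1")
    public static class OrderAggregate {
    }

    /** Returns the declared revision, or null when the class has no @Revision. */
    public static String revisionOf(Class<?> type) {
        Revision revision = type.getAnnotation(Revision.class);
        return revision == null ? null : revision.value();
    }

    /**
     * Compares the aggregate class against the type name and revision pinned when the
     * stored snapshots were written. The unsafe case: the type name changed (for example
     * a package move) while the revision stayed the same, so a revision-only check would
     * still accept the old snapshots.
     */
    public static boolean isSafeToDeploy(Class<?> aggregateType,
                                         String pinnedTypeName,
                                         String pinnedRevision) {
        boolean sameTypeName = aggregateType.getName().equals(pinnedTypeName);
        boolean sameRevision = Objects.equals(revisionOf(aggregateType), pinnedRevision);
        return sameTypeName || !sameRevision;
    }

    public static void main(String[] args) {
        // In a real test, both pinned values would be hard-coded and committed with it.
        boolean safe = isSafeToDeploy(OrderAggregate.class,
                                      "com.example.legacy.OrderAggregate", "1");
        System.out.println(safe ? "safe" : "type moved without a revision bump");
    }
}
```

This only catches the rename/revision mismatch. It says nothing about whether the fields of an unchanged class still deserialize, which is what a comparison against a hard-coded XML covers.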

So am I missing something? What do you guys think?

So, I’m not entirely sure this is a feature request, bug, or just a question. Let’s see!

Honestly, it feels like a mix to me too!
Let’s try to come to a satisfactory conclusion for your arguments.
Pretty sure we can manage that. So, let’s go at it.

First off, bummer that your Aggregate’s package changed between application runs. In what kind of environment did this happen, actually? I’d hope just development or test, as that would make this issue somewhat less urgent from your end, of course.

Now, let me give a short step-by-step explanation of what happens in the framework when an Aggregate is event sourced. That will give us common ground for discussing the predicament further:

  1. A Command comes in through the CommandBus, targeted towards an aggregate.
  2. The Command Handler for said command can only be invoked with a live version of the Aggregate, so a load operation is performed through the EventSourcingRepository.
  3. The EventSourcingRepository delegates the load request to EventStore#readEvents(String aggregateIdentifier). In all scenarios, we end up in the AbstractEventStore class.
  4. The AbstractEventStore#readEvents(String aggregateIdentifier) first checks for snapshots. It does so based on the aggregateIdentifier only.
  5. If any are present, the last one is taken. Either way, an Optional is used to move on to the next step. If retrieving the snapshot fails, the AbstractEventStore#handleSnapshotReadingError method is invoked. By default, the exception is logged as a warning and swallowed, and an empty Optional is returned. For the AxonServerEventStore (thus your scenario), an EventStoreException is thrown if the gRPC connection failed for an unknown reason.
  6. The position of the snapshot is used to deduce how many events should still be read from the event store.
  7. The outcome of the snapshot query and event query are combined into a single event stream. Note that the snapshot is just a specific type of event in this case. On top of that, no deserialization has taken place yet at this stage. This combined stream is returned to the EventSourcingRepository.
  8. The EventSourcingRepository will move through the stream event by event. It is at this point that deserialization occurs, as only now is the event really needed. If any upcasters are registered, they’ll be invoked as part of this process. This is the actual sourcing part, where all handlers are invoked to construct the current aggregate state.
  9. With the current aggregate state in hand, the command handler is invoked.
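To make steps 4 to 8 tangible, here is a heavily simplified, stdlib-only Java model of the flow. It is not Axon code: Stored, SnapshotReader, readEvents and source are hypothetical names, the “aggregate state” is just an int, and Integer::parseInt plays the role of the Serializer. The point it illustrates is where the fallback of step 5 sits relative to the deserialization of step 8.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EventSourcedLoadSketch {

    /** A stored message whose payload is still in serialized form. */
    public static final class Stored {
        public final long sequence;
        public final String serializedPayload;
        public final boolean snapshot;

        public Stored(long sequence, String serializedPayload, boolean snapshot) {
            this.sequence = sequence;
            this.serializedPayload = serializedPayload;
            this.snapshot = snapshot;
        }
    }

    /** Hypothetical snapshot lookup; it may fail, e.g. on a broken connection. */
    @FunctionalInterface
    public interface SnapshotReader {
        Optional<Stored> lastSnapshot(String aggregateId) throws Exception;
    }

    /** Steps 4-7: take the latest snapshot (if any), then only the events after it. */
    public static Stream<Stored> readEvents(SnapshotReader reader, List<Stored> eventLog,
                                            String aggregateId) {
        Optional<Stored> snapshot;
        try {
            snapshot = reader.lastSnapshot(aggregateId);
        } catch (Exception e) {
            // Step 5: the failure is logged and treated as "no snapshot".
            System.err.println("Error reading snapshot for aggregate [" + aggregateId
                    + "]. Reconstructing from entire event stream.");
            snapshot = Optional.empty();
        }
        // Step 6: the snapshot position decides which events still need to be read.
        long firstSequence = snapshot.map(s -> s.sequence + 1).orElse(0L);
        List<Stored> remaining = eventLog.stream()
                .filter(e -> e.sequence >= firstSequence)
                .collect(Collectors.toList());
        // Step 7: one combined stream; nothing has been deserialized yet.
        return Stream.concat(snapshot.stream(), remaining.stream());
    }

    /** Step 8: payloads are deserialized only now, while the stream is consumed. */
    public static int source(Stream<Stored> stream, Function<String, Integer> deserializer) {
        int state = 0;
        for (Stored stored : (Iterable<Stored>) stream::iterator) {
            int value = deserializer.apply(stored.serializedPayload);
            // A snapshot carries the whole state; a regular event carries a delta.
            state = stored.snapshot ? value : state + value;
        }
        return state;
    }
}
```

In this model, a snapshot whose payload cannot be parsed still passes through readEvents without complaint; the failure only surfaces inside source, past the point where the step 5 fallback could help.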

I hope the above gives a little insight into the process.
From here, I think it is time to move through your points:

  1. Deserialization Testing Support - Interesting thought, which could branch out in many directions if you ask me. Would you be able to give a more concrete sample of what this would look like? Any thoughts on a generic solution that works with any Serializer the framework provides would be a plus. My gut tells me something like this is a stretch, but I hope we can find something that better supports all our users.
  2. Clearer Warnings - I completely agree on this front and honestly expected it to do so. Would you be able to share what exceptions and warning logs you did get? Maybe from that, I can deduce where the framework swallowed the issue.
  3. Snapshot Disregarding - This is actually what steps 4 and 5 of my description already do. They do not, however, take the deserialization process into account, mainly because the package of an Aggregate is rarely changed. Nonetheless, if there’s anything to be done here from within the framework, that would be great, of course.
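For point 3, one hypothetical variant is to deserialize the snapshot inside the guarded section, so that an unreadable snapshot is discarded just like one that could not be fetched. The stdlib-only Java sketch below illustrates that idea; it does not describe current framework behaviour. StoredSnapshot and load are illustrative names, the state is again just an int, and events are assumed to be stored in sequence order starting at 0.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class EagerSnapshotCheckSketch {

    /** Hypothetical stored snapshot: last sequence it covers plus serialized state. */
    public static final class StoredSnapshot {
        public final int sequence;
        public final String serializedState;

        public StoredSnapshot(int sequence, String serializedState) {
            this.sequence = sequence;
            this.serializedState = serializedState;
        }
    }

    /**
     * Rebuilds the state, deserializing the snapshot inside the try block so that an
     * unreadable snapshot is treated exactly like a missing one.
     */
    public static int load(Optional<StoredSnapshot> snapshot,
                           List<String> serializedEvents,
                           Function<String, Integer> deserializer) {
        int state = 0;
        int firstEvent = 0;
        if (snapshot.isPresent()) {
            try {
                state = deserializer.apply(snapshot.get().serializedState);
                firstEvent = snapshot.get().sequence + 1;
            } catch (RuntimeException e) {
                System.err.println("Snapshot unusable (" + e.getMessage()
                        + "); rebuilding from the entire event stream.");
                state = 0;
                firstEvent = 0;
            }
        }
        // Events are stored in sequence order, so the list index equals the sequence number.
        for (int i = firstEvent; i < serializedEvents.size(); i++) {
            state += deserializer.apply(serializedEvents.get(i));
        }
        return state;
    }
}
```

The trade-off is that the snapshot is deserialized eagerly, and a failure costs a full replay, which can be expensive for aggregates with long event streams.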

Concluding, I’d love to test out this issue locally, actually. That would allow me to deduce far better what we can do or what you might have missed. So if there is any chance that you can construct a small sample project focused on the problem, that would be amazing @maartenjanvangool. That would greatly speed up our investigation process. If you’re unable to, I’d also like to hear that, of course. That would mean we’d have to spend some additional cycles on this.

Thanks for your detailed answer. This actually happened in production. The solution was quite simple though, I had to update the snapshot revision number and it was all fixed, since it would rebuild the complete aggregate.

I did some more digging and I think I found the warning you (and I) would expect. I did not notice the first one last week (there were A LOT of warnings and exceptions).

2021-07-27 09:28:28.042  WARN 1 --- [mandProcessor-9] o.a.e.eventstore.AbstractEventStore      : Error reading snapshot for aggregate [xxxx]. Reconstructing from entire event stream.

io.axoniq.axonserver.connector.impl.StreamClosedException: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdown invoked
	at io.axoniq.axonserver.connector.event.impl.BufferedAggregateEventStream.hasNext( ~[axonserver-connector-java-4.5.jar!/:4.5]
	at io.axoniq.axonserver.connector.event.AggregateEventStream$1.tryAdvance( ~[axonserver-connector-java-4.5.jar!/:4.5]
	at java.base/java.util.Spliterator.forEachRemaining(Unknown Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/$ForEachOp.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at org.axonframework.axonserver.connector.event.axon.AxonServerEventStore$AxonIQEventStorageEngine$1.tryAdvance( ~[axon-server-connector-4.5.1.jar!/:4.5.1]
	at java.base/ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/$FindOp.evaluateSequential(Unknown Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.readSnapshot( ~[axon-eventsourcing-4.5.1.jar!/:4.5.1]
	at org.axonframework.eventsourcing.eventstore.AbstractEventStore.readEvents( ~[axon-eventsourcing-4.5.1.jar!/:4.5.1]
	at org.axonframework.eventsourcing.EventSourcingRepository.readEvents( ~[axon-eventsourcing-4.5.1.jar!/:4.5.1]
	at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock( ~[axon-eventsourcing-4.5.1.jar!/:4.5.1]
	at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock( ~[axon-eventsourcing-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.LockingRepository.doLoad( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.LockingRepository.doLoad( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.AbstractRepository.lambda$load$4( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at java.base/java.util.HashMap.computeIfAbsent(Unknown Source) ~[na:na]
	at org.axonframework.modelling.command.AbstractRepository.load( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.AggregateAnnotationCommandHandler$AggregateCommandHandler.handle( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.AggregateAnnotationCommandHandler$AggregateCommandHandler.handle( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.AggregateAnnotationCommandHandler.handle( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.modelling.command.AggregateAnnotationCommandHandler.handle( ~[axon-modelling-4.5.1.jar!/:4.5.1]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.messaging.unitofwork.DefaultUnitOfWork.executeWithResult( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.commandhandling.SimpleCommandBus.handle( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.commandhandling.SimpleCommandBus.doDispatch( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.commandhandling.SimpleCommandBus.dispatch( ~[axon-messaging-4.5.1.jar!/:4.5.1]
	at org.axonframework.axonserver.connector.command.AxonServerCommandBus$ ~[axon-server-connector-4.5.1.jar!/:4.5.1]
	at java.base/java.util.concurrent.Executors$ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$ Source) ~[na:na]
	at java.base/ Source) ~[na:na]
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdown invoked
	at io.grpc.Status.asRuntimeException( ~[grpc-api-1.36.1.jar!/:1.36.1]
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose( ~[grpc-stub-1.36.1.jar!/:1.36.1]
	at io.grpc.internal.ClientCallImpl.closeObserver( ~[grpc-core-1.36.1.jar!/:1.36.1]
	at io.grpc.internal.ClientCallImpl.access$300( ~[grpc-core-1.36.1.jar!/:1.36.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal( ~[grpc-core-1.36.1.jar!/:1.36.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext( ~[grpc-core-1.36.1.jar!/:1.36.1]
	at ~[grpc-core-1.36.1.jar!/:1.36.1]
	at ~[grpc-core-1.36.1.jar!/:1.36.1]
	... 3 common frames omitted

On the client side, the connection with the server was lost, while Axon Server threw exceptions like:

2021-07-27 09:28:09.713 WARN 1 --- [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler : Stream Error

io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
	at io.netty.handler.codec.http2.Http2Exception.streamError( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode( ~[netty-codec-http2-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection( ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode( ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead( ~[netty-codec-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at$HeadContext.channelRead( ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-transport-4.1.52.Final.jar:4.1.52.Final]
	at$EpollStreamUnsafe.epollInReady( ~[netty-transport-native-epoll-4.1.52.Final-linux-x86_64.jar:4.1.52.Final]
	at ~[netty-transport-native-epoll-4.1.52.Final-linux-x86_64.jar:4.1.52.Final]
	at ~[netty-transport-native-epoll-4.1.52.Final-linux-x86_64.jar:4.1.52.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$ ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
	at io.netty.util.internal.ThreadExecutorMap$ ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
	at ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
	at java.base/ ~[na:na]

And then

Exception in thread "data-fetcher-14" java.lang.OutOfMemoryError: Java heap space

And after a while

2021-07-27 09:47:31.692  WARN 1 --- [st-dispatcher-3] i.a.a.g.GrpcCommandDispatcherListener    : Could not send command to 1@xxxxxxxxxxxxx-587f47c698-k782h.28085dfd-a3e3-4c4b-8a61-434075590ed8.default

io.axoniq.axonserver.exception.MessagingPlatformException: [AXONIQ-0001] call is closed
	at io.axoniq.axonserver.grpc.SendingStreamObserver.onNext( ~[classes/:na]
	at io.axoniq.axonserver.grpc.GrpcCommandDispatcherListener.send( ~[classes/:na]
	at io.axoniq.axonserver.grpc.GrpcCommandDispatcherListener.send( ~[classes/:na]
	at io.axoniq.axonserver.grpc.GrpcFlowControlledDispatcherListener.process( ~[classes/:na]
	at java.base/java.util.concurrent.Executors$ ~[na:na]
	at java.base/ ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$ ~[na:na]
	at java.base/ ~[na:na]
Caused by: java.lang.IllegalStateException: call is closed
	at ~[guava-30.0-android.jar:na]
	at io.grpc.internal.ServerCallImpl.sendMessageInternal( ~[grpc-core-1.36.0.jar:1.36.0]
	at io.grpc.internal.ServerCallImpl.sendMessage( ~[grpc-core-1.36.0.jar:1.36.0]
	at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext( ~[grpc-stub-1.36.0.jar:1.36.0]
	at io.axoniq.axonserver.grpc.SendingStreamObserver.onNext( ~[classes/:na]
	... 8 common frames omitted

I am guessing that rebuilding the aggregate while processing a lot of commands was too much of a burden? Although that does not really explain why updating the snapshot revision number fixed the entire problem, since the aggregate had to be rebuilt then as well.

Can you make something of these exceptions? I can try to reproduce the issue in a demo project, but that seems like quite some work without any guarantees on actually reproducing the issue. So let me know.

And with regard to testing (de)serialization, I guess I should create a separate topic on that subject.

If you’d like, you can construct an issue for it in the issue tracker. I thought about this a little, and I think we can introduce some helpful solutions on the subject.

Yes, I can, although I think it has to do with machine resources (massive hunch). It’s not overly clear why the connection between your Axon Framework application and the Axon Server instance is closed. More specifically, why it became UNAVAILABLE.
The only guess I’d dare to make is the memory issue you’ve also shared.

Anyhow, I do not see how these traces lead to the deserialization problem we started with. It might be a stretch, but could this scenario somehow relate to this issue you’ve opened? Honestly, snapshots of that size are not typical by my standards.

Although it happened in production, I am glad you took this route. Changing the revision number on the aggregate triggered Axon’s RevisionSnapshotFilter to disregard snapshots that didn’t match the current revision. As such, you effectively filtered out the “undeserializable” snapshots.
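The effect described above can be modelled in a few lines. In this stdlib-only Java sketch, SnapshotFilter, SnapshotMetadata, revisionFilter and typeFilter are local stand-ins, not Axon’s API. They only illustrate why a revision-only check lets a snapshot from a moved class through, and why bumping the revision filters it out. Whether Axon’s RevisionSnapshotFilter in 4.5.1 also compares the type name is not established in this thread.

```java
import java.util.Objects;

public class SnapshotFilterSketch {

    /** Type name and revision stored alongside a snapshot (hypothetical shape). */
    public static final class SnapshotMetadata {
        public final String typeName;
        public final String revision;

        public SnapshotMetadata(String typeName, String revision) {
            this.typeName = typeName;
            this.revision = revision;
        }
    }

    /** Decides whether a stored snapshot may be used to initialize the aggregate. */
    @FunctionalInterface
    public interface SnapshotFilter {
        boolean allow(SnapshotMetadata snapshot);

        /** Combines two filters; both must allow the snapshot. */
        default SnapshotFilter and(SnapshotFilter other) {
            return snapshot -> allow(snapshot) && other.allow(snapshot);
        }
    }

    /** Only snapshots written with the aggregate's current revision pass. */
    public static SnapshotFilter revisionFilter(String currentRevision) {
        return snapshot -> Objects.equals(currentRevision, snapshot.revision);
    }

    /** Only snapshots whose type name matches the aggregate's current class name pass. */
    public static SnapshotFilter typeFilter(String currentTypeName) {
        return snapshot -> currentTypeName.equals(snapshot.typeName);
    }

    public static void main(String[] args) {
        // Snapshot written before the package move, with revision "1".
        SnapshotMetadata old = new SnapshotMetadata("com.example.old.OrderAggregate", "1");
        System.out.println(revisionFilter("1").allow(old)); // true: the trap
        System.out.println(revisionFilter("2").allow(old)); // false: revision bumped
        System.out.println(revisionFilter("1")
                .and(typeFilter("com.example.orders.OrderAggregate")).allow(old)); // false
    }
}
```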

Concluding (for now), my sample project request was for the serialization issue,
not for the connection issue you might’ve experienced because of machine resources.
So, whether you’d still want to traverse that route is up to you. From my end, I’ll ask one of the Axon Server engineers if this situation rings a bell.

I will. We are already doing some (de)serialization tests with events and upcasters, and I’m happy to share our approach, and to tell you what I think is missing for doing the same with aggregates.

I do not think so. We figured the size of our aggregates was too large, and before the changes causing this issue, we changed the aggregate in order to make it (way) smaller. That time, I did not forget to update the revision number :-).

I’ll see what I can do. I do think that adding some more test capabilities is the better approach in any case. Preventing this issue is better than solving it afterwards. So let’s first focus on testing!

On a side note: I’m currently in the middle of your course on event sourcing with Axon, after doing the first one from Allard Buijze. They’re good! And even though I was already familiar with most of the concepts and implementations, I still learned something. I’m pushing my team members to do these courses as well. So thanks!


I’ve created an enhancement request here: “Testing command, event and aggregate (de)serialization” (Issue #1904 in AxonFramework/AxonFramework on GitHub).

Thanks! I just spotted the issue you’ve constructed.
I also added a piece of feedback there to share the prioritization of the idea.

Agreed, preventing is better. I find it hard to deduce whether the issue would be solved with additional de-/serialization tests, as the shared stack traces seem to point to something else.

Thanks for the compliments here @maartenjanvangool! A lot of effort has gone into them, and there are still more to come. We do think it is a very efficient way to share the intent of building applications with DDD, CQRS, and Event Sourcing using Axon. I hope you can get your colleagues to try it out as well!