How to snapshot if an event-sourced aggregate's load time is already too long (too many events)

Hi folks,

I ran into a funny situation with Axon Framework. I developed an aggregate that frequently receives updates and observed that for some instances the load time (re-hydrating the state from the event stream) was too high.

So I introduced the snapshotter (standard AggregateSnapshotter) and an amount-based trigger definition. Apparently, it was already too late, since I have an aggregate instance with hundreds of large-volume events, and for this particular instance I’m running into LockAcquisitionFailedException.
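For readers following along, a count-based trigger of this kind can be wired roughly as in the sketch below. It assumes Axon Framework 4; the function name and the threshold of 100 are illustrative assumptions and not taken from the setup described here.

```kotlin
import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition
import org.axonframework.eventsourcing.SnapshotTriggerDefinition
import org.axonframework.eventsourcing.Snapshotter

// Hypothetical helper: returns a trigger that asks the snapshotter for a new
// snapshot once an aggregate has handled more than `threshold` events.
// The default of 100 is an assumed value for illustration only.
fun countBasedTrigger(snapshotter: Snapshotter, threshold: Int = 100): SnapshotTriggerDefinition =
    EventCountSnapshotTriggerDefinition(snapshotter, threshold)
```

As far as I understand, such a trigger is only evaluated while an aggregate is being loaded and changed, so it cannot help an instance that never finishes loading.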

I read the thread “Aggregate throwing LockAcquisitionFailedException” and configured a lockFactory with the lockAttemptTimeout set to 120 (instead of 10).
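A lock factory with a longer attempt timeout could be built roughly like this. This is a sketch assuming Axon Framework 4's PessimisticLockFactory builder; the function name is hypothetical, and the timeout is, to my understanding, given in milliseconds per acquisition attempt.

```kotlin
import org.axonframework.common.lock.LockFactory
import org.axonframework.common.lock.PessimisticLockFactory

// Hypothetical helper: builds a pessimistic lock factory whose single lock
// attempt waits longer than usual (120 instead of 10, as mentioned above).
fun patientLockFactory(lockAttemptTimeoutMs: Int = 120): LockFactory =
    PessimisticLockFactory.builder()
        .lockAttemptTimeout(lockAttemptTimeoutMs)
        .build()
```

The resulting factory would then be handed to the aggregate's repository, for example through the lockFactory(...) method of the EventSourcingRepository builder. The total waiting time also depends on the configured number of acquisition attempts.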

The problem is still there, so I could of course push this value to some crazy large number to allow the aggregate to load once and create the snapshot - and then the problem would disappear.

Is there a smarter way of doing this than deploying this patch to production and then changing it back to the usual value? I believe it only concerns 2-3 aggregate instances - all others work as desired, create snapshots and have an adequate load time…

Thanks for advice,

Simon

You should be able to “manually” trigger the creation of snapshots. Simply calling the snapshotter with the identifiers of the aggregates you would like to create a snapshot for should suffice.
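A minimal sketch of such a manual trigger, assuming Axon Framework 4's Snapshotter interface. The helper name and its parameters are hypothetical; only scheduleSnapshot(aggregateType, aggregateIdentifier) is the framework call.

```kotlin
import org.axonframework.eventsourcing.Snapshotter

// Hypothetical helper: asks the snapshotter to create a snapshot for each of
// the given aggregate identifiers, e.g. from an admin endpoint.
fun scheduleSnapshots(snapshotter: Snapshotter, aggregateType: Class<*>, aggregateIds: List<String>) {
    aggregateIds.forEach { id -> snapshotter.scheduleSnapshot(aggregateType, id) }
}
```

Note that the standard snapshotter still has to read the aggregate's events in order to build the snapshot.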

I wonder, though: the LockAcquisitionFailedException is an error that occurs while attempting to acquire a lock. Loading of events will only happen after that. Normally, lock acquisition attempts only fail when another process has claimed the lock and is keeping it for much too long.

Hi Allard,

thank you for the response. You are right, the LockAcquisitionFailedException is actually thrown when the next command is dispatched, because the aggregate is still locked. And it is locked because it has an extreme load time, since it is trying to load thousands of events to rehydrate the state from the event stream.

I followed your approach and created a simple endpoint which schedules the snapshot creation, but I have had no luck yet:

2023-09-07 22:25:27.854  WARN 1 --- [nio-8090-exec-3] o.a.eventsourcing.AbstractSnapshotter    : An attempt to create and store a snapshot resulted in an exception. Exception summary: io.grpc.StatusRuntimeException: UNKNOWN

Is there any other smart solution that lets me influence the number of events this aggregate reads to re-hydrate before creating the snapshot?

From the business requirements I don’t rely on all of them, since the state re-hydrated from earlier events is overwritten by each follow-up event (the state is non-cumulative). To take it even further: it would be OK to create a snapshot at any particular position in the event stream (maybe even using just the initial create event and the most recent event), and this would “rescue” my aggregate. I could even imagine “dropping” the entire stream for it and creating a new one, but as far as I understand this is not possible at all, because once an aggregate id has been used there is no way to reuse it, right?

Any ideas?

Cheers,

Simon

Allard,

just another question - if I’m not using the default AggregateSnapshotter, what do I need to implement in order to produce this small piece of snapshot data? Any examples you could recommend?

I’m thinking in the direction of creating an “empty” snapshot and then updating the aggregate’s state by sending new commands (fully OK from the business domain's point of view), which will modify the state correctly.

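import org.axonframework.eventhandling.DomainEventMessage
import org.axonframework.eventsourcing.AggregateSnapshotter
import org.axonframework.eventsourcing.eventstore.DomainEventStream

// Builds the snapshot from the first event plus the events starting at
// tailEventSequenceNumber, instead of replaying the complete stream.
class SelectiveAggregateSnapshotter(
  builder: AggregateSnapshotter.Builder,
  private val tailEventSequenceNumber: Long?
) : AggregateSnapshotter(builder) {

  override fun createSnapshotterTask(
    aggregateType: Class<*>,
    aggregateIdentifier: String
  ): Runnable {
    return Runnable {
      // This stream is only peeked at to obtain the first event (usually the create event)
      val entireStream: DomainEventStream = this.eventStore.readEvents(aggregateIdentifier)
      val firstEvent = entireStream.peek()
      // Caution: lastSequenceNumber may not be the final sequence number
      // until the stream has been read completely, so the fallback is unreliable
      val eventSubStream: DomainEventStream = this.eventStore.readEvents(
        aggregateIdentifier,
        tailEventSequenceNumber ?: entireStream.lastSequenceNumber
      )
      val eventStream = DomainEventStream.concat(DomainEventStream.of(firstEvent), eventSubStream)
      val snapshotEvent: DomainEventMessage<*>? = createSnapshot(aggregateType, aggregateIdentifier, eventStream)
      // Only store the snapshot if it replaces more than one event
      if (snapshotEvent != null && snapshotEvent.sequenceNumber > firstEvent.sequenceNumber) {
        eventStore.storeSnapshot(snapshotEvent)
      }
    }
  }
}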

Something like this… What do you think?

Cheers,

Simon

Hi Simon,

a snapshot event can be anything you want it to be. For Axon, it’s just another event, with the difference that if you use this event to reconstruct the state of an aggregate, any events in the aggregate’s stream that have a lower (or equal) sequence number can be ignored.

If you’re capable of constructing the aggregate’s state in a different way than reloading all events, then you can do that and call EventStore.storeSnapshot();
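A rough sketch of storing such a hand-built snapshot, assuming Axon Framework 4. The helper names are hypothetical, and so is the choice of the aggregate instance as payload with its simple class name as the type, which is my understanding of what the default AggregateSnapshotter produces. The sequence number is supplied by the caller and should be that of the last event the given state accounts for.

```kotlin
import org.axonframework.eventhandling.DomainEventMessage
import org.axonframework.eventhandling.GenericDomainEventMessage
import org.axonframework.eventsourcing.eventstore.EventStore

// Hypothetical helper: wraps an already reconstructed aggregate state in a
// domain event message that can serve as a snapshot.
fun <T : Any> buildSnapshot(aggregateId: String, sequenceNumber: Long, state: T): DomainEventMessage<T> =
    GenericDomainEventMessage<T>(state.javaClass.simpleName, aggregateId, sequenceNumber, state)

// Hypothetical helper: stores the snapshot, so that later loads can skip all
// events with a sequence number lower than or equal to `sequenceNumber`.
fun <T : Any> storeManualSnapshot(eventStore: EventStore, aggregateId: String, sequenceNumber: Long, state: T) {
    eventStore.storeSnapshot(buildSnapshot(aggregateId, sequenceNumber, state))
}
```

The state object has to be serializable by whatever serializer the event store is configured with.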

But I don’t understand the underlying problem. Reading a lot of events should not result in a gRPC exception. But these issues could be hard to resolve, sometimes. There could be a router or load balancer somewhere dropping long-running connections.

One thing to consider as a workaround is to wrap the DomainEventStream that is used by the snapshotter to create snapshots. You could do that by creating a subclass and calling super.createSnapshot() with a slightly modified version of the DomainEventStream. The thing that you’d have to do differently is, instead of throwing exceptions from the underlying stream, to simply end the stream at that point. That will cause the creation of a snapshot somewhere mid-stream. The next attempt will then get further, and so on. Not ideal, but unless we’re able to diagnose the underlying issue, maybe the fastest way forward.
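The "end the stream instead of throwing" idea can be illustrated with a plain Kotlin iterator. This is only a sketch of the technique with a hypothetical class name; a real wrapper would implement Axon's DomainEventStream, which additionally requires peek() and getLastSequenceNumber(), and would have to report the sequence number of the last event it actually handed out.

```kotlin
// Wraps an iterator and treats any exception from the source as the end of
// the stream. One element is fetched ahead, so failures in either hasNext()
// or next() of the source are caught before an element is promised.
class TruncatingIterator<T : Any>(private val source: Iterator<T>) : Iterator<T> {
    private var buffered: T? = null
    private var ended = false

    override fun hasNext(): Boolean {
        if (buffered != null) return true
        if (ended) return false
        try {
            if (source.hasNext()) buffered = source.next() else ended = true
        } catch (e: Exception) {
            ended = true // swallow the failure and stop here
        }
        return buffered != null
    }

    override fun next(): T {
        if (!hasNext()) throw NoSuchElementException()
        return buffered!!.also { buffered = null }
    }
}

fun main() {
    // A source that fails after three elements, standing in for a dropped connection
    val flaky = sequence {
        yield(1); yield(2); yield(3)
        throw IllegalStateException("connection dropped")
    }.iterator()
    println(TruncatingIterator(flaky).asSequence().toList()) // prints [1, 2, 3]
}
```

Swallowing the exception hides the real failure, so logging it before ending the stream would probably be wise.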

Thank you Allard,

since I had a whole weekend to think about it, I did indeed create a Snapshotter and Prototype factory to which I can pass the aggregateSequenceId of the event to start reading from. The current implementation takes the first event (usually a create) and all events starting from that point.

I was trying to find the value of the last event from the event store, but this didn’t work for me, so I added an additional method to my own snapshotter and can pass the value from outside.

It is all a little weird, but I have now successfully created a snapshot and the problems are gone.

Thank you.

Glad to hear that.
Still wondering what the underlying reason for the failures was, though…