Segment merging failures for Saga Processor

Hello
I was working on a saga event processor that was behind by around 130 million events. Following the Optimizing Event Processor performance blog, the first thing I did was to configure the batch size in the streaming processor configuration. The next thing I did was to split the segments for parallel processing.

I currently have a few segments that are close to the actual maximum global_index in the domain_event_entry table. I was trying to merge the segments that have the same owner. I’ve attached a picture for reference.

I was going through the event processor documentation for the split and merge functionality, and it says the following:

  • Split: for fair balancing, a split is ideally performed on the largest segment
  • Merge: for fair balancing, a merge is ideally performed on the smallest segment

I wanted to understand the definition of "largest" in this context. Is it just the absolute value of the segment column? Or is it how far the index value in the token column lags behind the global_index in the domain_event_entry table?

However, I proceeded to merge the segments for the saga event processor that I was working on.
The logic to merge that I’m currently using is as follows:

    // 1. Look up the event-processor
    optionalProcessor = eventProcessingConfiguration.eventProcessor(name)

    // 2. Return “false” if the processor is unknown
    if optionalProcessor is empty
        return (false)

    // 3. Access the processor and the status of the requested segment
    processor = optionalProcessor.get()
    status    = processor.processingStatus()[segmentId]

    // 4. Abort if this segment is already being merged
    if status.isMerging()
        throw IllegalStateException("Segment already merging")

    // 5. Fetch the segment instance
    segment = status.getSegment()

    // 6. Ensure the parent segment is released if not owned by this processor
    parentId     = segment.mergeableSegmentId()
    parentStatus = processor.processingStatus()[parentId]
    if parentStatus == null
        processor.releaseSegment(parentId)

    // 7. Start the merge and return its future
    return processor.mergeSegment(segmentId)
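
For reference, the "parent" that step 6 looks up can be illustrated with plain bit arithmetic. This is only a minimal sketch, assuming each segment is described by an id plus a bit mask and that `mergeableSegmentId()` flips the highest bit of that mask. `SegmentMath` and its parameters are hypothetical names for illustration, not Axon Framework API.

```java
final class SegmentMath {

    private SegmentMath() {
    }

    /**
     * Id of the segment that the segment (segmentId, mask) would be merged with.
     * Assumption: the two counterparts differ only in the highest bit of the mask.
     */
    static int mergeableSegmentId(int segmentId, int mask) {
        int parentMask = mask >>> 1;        // mask of the segment that results from the merge
        int highestBit = mask ^ parentMask; // the single bit that separates the two halves
        return segmentId ^ highestBit;
    }
}
```

Under that assumption, four equal segments (mask `0b11`) pair up as 0 with 2 and 1 with 3, so merging segment 2 would need the token of segment 0 as well.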
    

With the above merge logic, I keep hitting the following error for all the segments shown in the picture above:

Request processing failed: org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException: Unable to claim token 'xXxXxXSagaProcessor[0]'. It is owned by '1@-64hrr'

When I check the token_entry table, segment 0 is indeed owned by another instance. Step 6 in the logic above appears to be where the error is thrown. Now I’m wondering whether these segments will ever be mergeable, given that there’s no way to force the instance I want to merge on to claim the segment.

I’m not sure how to navigate this situation. Any advice would be helpful.
Thank you.

Hey @kk909, pretty sure I can help you here.

For Axon Framework to be able to merge segments, it simply NEEDS a claim on both segments. In a distributed environment, that means you need to tell all instances running the given Event Processor to release their claims on the segments you aim to merge.

Once that’s done, you can tell one of the Axon Framework instances to merge the segment, which should then succeed.
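
The two steps above can be sketched roughly as follows. This is a minimal, self-contained illustration rather than Axon Framework code: `SegmentOps` is a hypothetical per-instance handle whose `releaseSegment` and `mergeSegment` methods mirror the calls in the question's pseudocode. How the release request actually reaches the other instances (an admin endpoint, a message, and so on) is an assumption left open here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class MergeCoordinator {

    private MergeCoordinator() {
    }

    /** Hypothetical per-instance handle; the method names mirror the question's pseudocode. */
    interface SegmentOps {
        void releaseSegment(int segmentId);

        CompletableFuture<Boolean> mergeSegment(int segmentId);
    }

    /**
     * Asks every other instance to release both segments, then merges on one instance.
     *
     * @param others        handles to all instances except the one performing the merge
     * @param merger        the instance that should claim both segments and merge them
     * @param segmentId     the segment to merge
     * @param counterpartId the segment it is merged with
     */
    static CompletableFuture<Boolean> releaseThenMerge(
            List<SegmentOps> others, SegmentOps merger, int segmentId, int counterpartId) {
        for (SegmentOps other : others) {
            // Step 1: every other instance gives up its claim on both segments.
            other.releaseSegment(segmentId);
            other.releaseSegment(counterpartId);
        }
        // Step 2: a single instance performs the merge.
        return merger.mergeSegment(segmentId);
    }
}
```

In a real deployment a release may not take effect immediately, so the merging instance might need a short wait or a retry before the merge succeeds. That timing is not modelled in this sketch.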

As this is quite a bit of house-keeping, note that AxonIQ Console does all that for you out of the box. Attaching your Axon Framework application there is a breeze, and you will immediately benefit from simplified split/merge/reset operations.

With all that said, I hope this helps you further, @kk909!