The SAGA Story Part 2: Axon 4.2. Kafka Broker. Postgres EventStore. InMemorySaga. Inconsistent behavior

I’ve observed highly inconsistent behavior and I need advice on how to move forward. In order to ask my question I have to explain some of the domain so that I can share the observations in a meaningful way.

We are synchronizing data in a utility service where we are using Axon for consistency with the rest of our microservices.
The main entity is a SyncPoint. It contains the state of a synchronization and is scheduled to periodically poll.

Originally each SyncPoint was independent of the others' state, and life was good. Later we realized that in some situations we had to combine the state of multiple SyncPoints in order to reconcile events during post-processing, which we will not discuss here. Due to time constraints and the existing framework, rewriting and rethinking the domain was not really an option. So I took the existing events and extended them with an optional SyncGroupId parameter. If this parameter is set, the SyncPoint is part of a group and should be processed in tandem with the rest of the group. If it is not set, processing proceeds as usual.
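A minimal sketch of the "optional parameter" approach, assuming the events are Kotlin data classes with String identifiers. The field names follow the Saga code in this post; the types and the `isGrouped` and `route` helpers are hypothetical. Giving `syncGroupId` a `null` default keeps existing producers compiling while grouped SyncPoints opt in.

```kotlin
// Hypothetical shape of the event after adding the optional group id.
// Field names follow the Saga code; the String types are assumptions.
data class SyncPointTriggered(
    val syncPointId: String,
    // null means "not part of a group": the SyncPoint is processed on its own
    val syncGroupId: String? = null
)

// Hypothetical helper: only grouped SyncPoints are processed in tandem.
fun isGrouped(event: SyncPointTriggered): Boolean = event.syncGroupId != null

// Hypothetical routing decision mirroring the description above.
fun route(event: SyncPointTriggered): String =
    when (val groupId = event.syncGroupId) {
        null -> "process ${event.syncPointId} independently"
        else -> "process ${event.syncPointId} as part of group $groupId"
    }
```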

This is what the flow looks like now:

[Image: current flow (Screen Shot 2019-10-24 at 7.50.50 PM)]

I’ve decided to share the entire Saga below:

```kotlin
@Saga
@ProcessingGroup("genericprocessor")
class SyncGroupProcessing {

    private val log = LoggerFactory.getLogger(this.javaClass)

    var syncGroupId: SyncGroupId = ""
    var topicKey: TopicKey? = null
    var modelName: String = ""

    var processing: MutableSet<SyncPointId> = mutableSetOf()
    var syncEntitiesInitial: MutableList<SyncEntity> = mutableListOf()
    var syncsProcessed: MutableMap<SyncPointId, SyncProcessed> = mutableMapOf()

    @StartSaga
    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncPointTriggered,
           @Autowired syncPointService: SyncPointService,
           @Autowired syncModelService: SyncModelService,
           @Autowired syncGroupService: SyncGroupService) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }

        try {
            syncGroupId = event.syncGroupId!!

            val syncPoint = syncPointService.findById(event.syncPointId)
            log.info("SAGA: SyncPointTriggered [SG|$syncGroupId] [SP|${event.syncPointId}]")
            if (processing.isEmpty()) {
                val syncGroup = syncGroupService.findById(syncGroupId)
                if (syncGroup == null) {
                    log.debug("Sync Group [$syncGroupId] not found")
                    return
                }

                processing.addAll(syncGroup.syncPoints)

                // Initialize Topic Key and ModelName based on the first SyncPoint in the group
                topicKey = syncPoint.topicKey
                modelName = syncModelService.findById(syncPoint.syncModelId).name
            }

            // Hack: syncEntities are not loaded through Hibernate properly, so round-trip them through JSON.
            // Serialize the map itself: serializing `.values` yields a JSON array, which cannot be read back as a Map.
            val mapper = jacksonObjectMapper()
            val syncEntities = mapper.readValue<Map<String, SyncEntity>>(mapper.writeValueAsString(syncPoint.syncEntities))
            // End hack

            syncEntitiesInitial.addAll(syncEntities.values)
        } catch (exception: Exception) {
            // Pass the exception itself so SLF4J logs the full stack trace
            log.error("SAGA: SyncPointTriggered Exception: ${exception.message}", exception)
        }
    }

    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncProcessed,
           @Autowired @Qualifier("eventBus") eventBus: EventBus) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }

        log.info("SAGA: SyncProcessed [SG|$syncGroupId] [SP|${event.syncPointId}]")
        processing.remove(event.syncPointId)
        syncsProcessed[event.syncPointId] = event

        if (processing.isEmpty()) {
            eventBus.publish(GenericEventMessage.asEventMessage<SyncGroupProcessed>(
                    SyncGroupProcessed(syncGroupId)))
        }
    }

    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncProcessFailed,
           @Autowired @Qualifier("eventBus") eventBus: EventBus,
           @Autowired syncPointService: SyncPointService) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }

        log.info("SAGA: SyncProcessFailed [SG|$syncGroupId] [SP|${event.syncPointId}]")

        processing.remove(event.syncPointId)

        // By removing this SyncPoint's previously added entities from the initial set at this stage,
        // they will not be part of the difference processing and therefore will not be removed.
        syncEntitiesInitial.removeAll(syncPointService.findById(event.syncPointId).syncEntities.values)

        if (processing.isEmpty()) {
            eventBus.publish(GenericEventMessage.asEventMessage<SyncGroupProcessed>(
                    SyncGroupProcessed(syncGroupId)))
        }
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncGroupProcessed,
           @Autowired syncPointService: SyncPointService,
           @Autowired @Qualifier("eventBus") eventBus: EventBus) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }

        log.info("SAGA: SyncGroupProcessed [SG|$syncGroupId]")
        try {
            val newSyncEntities = syncsProcessed.values.flatMap { it.syncEntities }.associateBy { it.id }
            log.info("SAGA: SyncGroupProcessed [SG|$syncGroupId] | newSyncEntities = ${newSyncEntities.size}")
            val oldSyncEntities = syncEntitiesInitial.associateBy { it.id }
            log.info("SAGA: SyncGroupProcessed [SG|$syncGroupId] | oldSyncEntities = ${oldSyncEntities.size}")

            syncPointService.processDifferences(
                    oldSyncEntities = oldSyncEntities,
                    newSyncEntities = newSyncEntities,
                    topicKey = topicKey,
                    syncModelName = modelName,
                    syncPointId = "GROUPID:$syncGroupId"
            )

            syncsProcessed.forEach { (syncPointId, processed) ->
                val syncEntitiesMap = processed.syncEntities.associateBy { it.id }
                publishEvent(SyncEnded(syncPointId, processed.responseHash, syncEntitiesMap), topicKey, eventBus)
                log.info("SAGA: [SP: $syncPointId] Sync Ended.")
            }
        } catch (exception: Exception) {
            log.error("SAGA: SyncGroupProcessed Exception: ${exception.message}", exception)
        }
    }

    private fun <T> publishEvent(event: T, topicKey: String?, eventBus: EventBus) {
        if (topicKey != null) {
            eventBus.publish(GenericEventMessage.asEventMessage<T>(event)
                    .withMetaData(mapOf(TOPIC_KEY to topicKey)))
        } else {
            eventBus.publish(GenericEventMessage.asEventMessage<T>(event))
        }
    }
}
```

I know the logic may be a little convoluted, and I am not asking anyone to untangle it. The main reason for sharing the code above is to show the log statements.

I can now finally share the observations and attempt to describe the inconsistent behavior.

[Image: status of two sync groups (Screen Shot 2019-10-24 at 7.33.10 PM)]

The image above shows two sync groups: one that is working perfectly, and another, 121496, that got stuck in the SYNCING state.
Broken down by SyncPoint, the group that failed looks like this:

Synced successfully:

- 5554e1ff-d286-414f-a81d-49c8b857b4ac
- f9b05b44-f136-422c-8755-295729d28599
- b5b22f10-9e48-4b57-930b-408f31bf9c8d

Did not sync successfully:

- bd6ebe6e-dd6a-4706-ab3c-01e99c3d3612
- 7a2a6e9e-838a-492f-a444-75022d8ee87e
- 91a984e8-d18b-4493-bed4-cd9a0d80786e

I then dissected the log and mapped out the events to check whether this was a timing issue. That does not appear to be the case.

[Image: events mapped out from the log (Screen Shot 2019-10-24 at 8.06.24 PM)]

The bd6ebe6e SyncPoint gets stuck, while 5554e1ff does not and goes on to a second sync.

For some unknown reason, the SyncProcessed event for 5554e1ff never reaches the Saga event handler.

Thank you for taking the time to read this; I hope I communicated my problem well. Is there salvation, or am I forced to rework the entire system? I'd like to at least understand why it's not working.

One question you may have: did the SyncProcessed event really have syncGroupId set? It did; here is the message:

```
axon-metadata-traceId=d415a4cc-e26d-403f-befb-9db4983a2609,axon-metadata-correlationId=acee1a93-8060-4f28-9b89-21fb800fa6f4,axon-message-id=5b9f26bc-4f20-4d9b-8928-2c9e41187307,axon-message-revision=2.0,axon-message-timestamp=,axon-message-type=com.aramark.common.sps.SyncProcessed {"sync_point_id":"bd6ebe6e-dd6a-4706-ab3c-01e99c3d3612","sync_entities":[],"response_hash":"31471aeaedc625d9ed7786886809e2cbe614719ff75fc55eca185473c6b2eee3","sync_group_id":"121496"}
```

Hi Michael,

You mentioned you use in-memory Sagas. Is that the InMemorySagaStore, or are you using the CachingSagaStore? We recently found an issue in the caching saga store where some updates weren't properly atomic.
Which message source do you use for your Sagas? Do you read them straight from the event store, or from the KafkaMessageSource?

Cheers,


I see. We use the InMemorySagaStore because of the nullable association value; I asked about that in another thread. If this is the root cause, that is highly informative and I can begin to think about how to work around it. Would you suggest this is the best path forward, and if so, do you have any advice beyond what Steven has already shared for dealing with a nullable association property?

> Which message source do you use for your Sagas? Do you read them straight from the event store, or from the KafkaMessageSource?

I often stumble over these seemingly straightforward questions. The Saga is annotated with a @ProcessingGroup that points to a tracking processor reading from Kafka. There is no other configuration, so it uses whatever the default starter configuration is. It may also be relevant that the event store in this service is in memory as well, since we do not want to build a table of events in this utility service.

I've rewritten the logic so there are no more nullable association properties, and I've removed the InMemorySagaStore, but we are still losing events: roughly 50% of them are not associated with the Saga.

Hi Michael,

From your Saga code, I’d try to adjust your associations a little further.

I would not use the syncGroupId in the SyncPointTriggered StartSaga handler as the association property right away.

Instead, I'd use what I guess is the syncId (the syncPointId in your events).
From there on, you can check whether you have a syncGroupId.

If not, I would immediately end the lifecycle of the Saga, as at this point you can be fairly sure the SyncPoint acts on its own and the "tandem synchronization" is not needed.

You can conditionally end the lifecycle of a saga with the SagaLifecycle.end() method.

If there is a syncGroupId, that’s when I’d purposely add an association for this field.
You can add this association with the SagaLifecycle#associateWith(String, String) method, which I’d use as follows:

SagaLifecycle.associateWith("syncGroupId", event.syncGroupId)
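To make this suggestion concrete without depending on Axon, here is a small in-memory model of association-based routing. `Association`, `SagaInstance`, `SagaRouter`, and their methods are hypothetical stand-ins, not Axon's API; in a real Saga the calls would be `SagaLifecycle.associateWith` and `SagaLifecycle.end()`. The model starts on the SyncPoint id, ends at once when there is no group, and adds the `syncGroupId` association only when one exists.

```kotlin
// Hypothetical stand-in for an association value: a key/value pair.
data class Association(val key: String, val value: String)

// Hypothetical saga instance holding its associations and an "ended" flag.
class SagaInstance(val id: String) {
    val associations = mutableSetOf<Association>()
    var ended = false
        private set

    fun associateWith(key: String, value: String) { associations += Association(key, value) }
    fun end() { ended = true }
}

// Hypothetical router: an active saga receives an event only if it holds the event's association.
class SagaRouter {
    private val sagas = mutableListOf<SagaInstance>()

    // Start handler following the advice above.
    fun onSyncPointTriggered(syncPointId: String, syncGroupId: String?): SagaInstance {
        val saga = SagaInstance("saga-$syncPointId")
        saga.associateWith("syncPointId", syncPointId)
        if (syncGroupId == null) {
            saga.end()                                   // standalone SyncPoint: no tandem processing
        } else {
            saga.associateWith("syncGroupId", syncGroupId) // explicit association, never null
        }
        sagas += saga
        return saga
    }

    // Active sagas that would receive an event carrying this association value.
    fun route(key: String, value: String): List<SagaInstance> =
        sagas.filter { !it.ended && Association(key, value) in it.associations }
}
```

Because the group association is added only when a value exists, the model never stores a null association value.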

Taking the above route seems more fitting to me.
I do not think this explains the unassociated events you are describing, though.
I have not encountered that before when using the Event Store as the message source, nor have I heard of others having this issue when using Kafka as the message source for Sagas.

Regardless, I hope this helps somewhat to set your Saga in stone.

Cheers,
Steven

Thank you for the response.
I'm continuing to research the problem. This is my understanding so far:

1. TrackingEventProcessor::processBatch() invokes the next method.
2. TrackingEventProcessor::processUnitOfWork() invokes the eventHandlerInvoker.
3. From here the event may arrive in multiple places, but the focus is on MultiEventHandlerInvoker::handle().
4. AbstractSagaManager::doInvokeSaga() (I have not yet made a clean connection from the invoker to AbstractSagaManager and AnnotatedSaga).
5. AnnotatedSaga::handle() and AnnotatedSaga::canHandle(), which is where things usually fail.
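The chain above can be summarized in a simplified model that does not depend on Axon. Every name here (`Event`, `AnnotatedSagaModel`, `deliver`) is a hypothetical stand-in that only mirrors the listed steps, not Axon's actual classes: the processor iterates a batch, a per-segment filter decides whether the event is handled at all, and each saga's `canHandle` compares the event's association property with its association values.

```kotlin
// Hypothetical, simplified event: the association property is looked up by name.
data class Event(val type: String, val properties: Map<String, String?>)

// Hypothetical saga: it handles an event only if the event's value for the
// association property is one of the saga's association values.
class AnnotatedSagaModel(
    private val associationProperty: String,
    private val associationValues: Set<String>
) {
    val handled = mutableListOf<Event>()

    fun canHandle(event: Event): Boolean {
        val value = event.properties[associationProperty] ?: return false // null never matches
        return value in associationValues
    }

    fun handle(event: Event) { if (canHandle(event)) handled += event }
}

// Stand-in for processBatch -> processUnitOfWork -> handler invoker -> saga manager.
fun deliver(batch: List<Event>, segmentAccepts: (Event) -> Boolean, sagas: List<AnnotatedSagaModel>) {
    for (event in batch) {                   // processBatch: iterate the batch
        if (!segmentAccepts(event)) continue // segment filter: this segment skips the event
        sagas.forEach { it.handle(event) }   // saga manager: offer the event to each saga
    }
}
```

In this model an event can be dropped at two points: the segment filter, or a failed `canHandle` check.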

I have two questions. First, can you help fill in the gaps in how an event travels from a stream (in our case Kafka) to the Saga?
Second, are events within a Saga processed concurrently, or is there some form of lock on the Saga to ensure that only one event updates its state at a time?

I’ve tracked down some aspects of the issue we’re having:

```java
private boolean sequencingPolicyMatchesSegment(EventMessage<?> message, Segment segment) {
    return segment.matches(Objects.hashCode(getOrDefault(
            sequencingPolicy.getSequenceIdentifierFor(message),
            message::getIdentifier)
    ));
}
```
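The method above sends each event to exactly one segment: it hashes the sequence identifier (falling back to the message identifier when the policy returns null) and asks the segment whether it matches. The sketch below models that check in plain Kotlin, assuming a mask-based scheme in which a segment matches a hash when `hash and mask == segmentId`; `Segment`, `segments`, and this `sequencingPolicyMatchesSegment` are hypothetical stand-ins, not Axon's classes.

```kotlin
import java.util.Objects

// Hypothetical segment model: a segment owns every hash whose masked bits equal its id.
data class Segment(val segmentId: Int, val mask: Int) {
    fun matches(value: Any?): Boolean = (mask and Objects.hashCode(value)) == segmentId
}

// Split processing into 2^bits segments, e.g. bits = 3 gives 8 segments with mask 0b111.
fun segments(bits: Int): List<Segment> {
    val mask = (1 shl bits) - 1
    return (0..mask).map { Segment(it, mask) }
}

// Mirrors the method above: use the sequence id, or the message id when it is null.
fun sequencingPolicyMatchesSegment(sequenceId: Any?, messageId: String, segment: Segment): Boolean =
    segment.matches(sequenceId ?: messageId)
```

With a power-of-two split, every identifier is accepted by exactly one segment, and a single segment (mask 0) accepts everything.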

We are currently on 4.2. I'm going to attempt to downgrade to an earlier version. Suggestions are welcome.

I did not downgrade. We are still running 4.2, but I resolved the problem above by using a custom SequencingPolicy whose getSequenceIdentifierFor() returns an auto-incrementing static value.
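A sequencing policy decides which events must be handled in order: events that share a sequence identifier go to the same segment and are processed one after another. The sketch below compares a policy keyed on `syncGroupId` with an auto-incrementing counter like the one described above, using a hypothetical mask-based `Segment` model (an assumption, not Axon's code). `SyncEvent`, `groupPolicy`, `counterPolicy`, and `segmentsHit` are illustrative names.

```kotlin
import java.util.Objects
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical segment model: the masked hash must equal the segment id.
data class Segment(val segmentId: Int, val mask: Int) {
    fun matches(value: Any?): Boolean = (mask and Objects.hashCode(value)) == segmentId
}

// Hypothetical event carrying only what the policies need.
data class SyncEvent(val syncPointId: String, val syncGroupId: String)

// Policy keyed on the group: every event of one group shares a sequence id.
fun groupPolicy(event: SyncEvent): Any = event.syncGroupId

// Policy like the workaround: a shared, ever-increasing counter (ignores the event).
val counter = AtomicInteger()
fun counterPolicy(event: SyncEvent): Any = counter.getAndIncrement()

// Which segments (out of 2^bits) receive the given events under a policy?
fun segmentsHit(events: List<SyncEvent>, bits: Int, policy: (SyncEvent) -> Any): Set<Int> {
    val mask = (1 shl bits) - 1
    val segments = (0..mask).map { Segment(it, mask) }
    return events.map { event ->
        val sequenceId = policy(event) // evaluate once per event
        segments.single { it.matches(sequenceId) }.segmentId
    }.toSet()
}
```

With the group policy all events of a group land on one segment; with the counter they are spread round-robin, so their relative order across segments is no longer guaranteed.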

SAGAs are still broken due to the following line of code:

```
AbstractSagaManager::shouldCreateSaga() Line[104] creationPolicy[IF_NONE_FOUND] Saga Invoked [false] InitializationPolicy:Key[syncGroupId] InitializationPolicy:Value[121496] Segment:id[6] Segment:matches[false]
```

```java
private boolean shouldCreateSaga(Segment segment, boolean sagaInvoked,
                                 SagaInitializationPolicy initializationPolicy) {
    return ((initializationPolicy.getCreationPolicy() == SagaCreationPolicy.ALWAYS
            || (!sagaInvoked && initializationPolicy.getCreationPolicy() == SagaCreationPolicy.IF_NONE_FOUND)))
            && segment.matches(initializationPolicy.getInitialAssociationValue());
}
```
segment.matches(…) returns false, so the Saga is not created in this segment.
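Putting the two checks from this thread side by side suggests why the match can fail. In the simplified model below (hypothetical names, mask-based segment assumption, not Axon's code), an event is only delivered to the segment that matches its sequence identifier, while a Saga may only be created in the segment that matches its initial association value. When those two values hash to different segments, no segment both receives the event and is allowed to create the Saga.

```kotlin
import java.util.Objects

// Hypothetical mask-based segment model mirroring the snippets above.
data class Segment(val segmentId: Int, val mask: Int) {
    fun matches(value: Any?): Boolean = (mask and Objects.hashCode(value)) == segmentId
}

// Mirrors sequencingPolicyMatchesSegment: does this segment see the event at all?
fun delivered(segment: Segment, sequenceId: Any?, messageId: String): Boolean =
    segment.matches(sequenceId ?: messageId)

// Mirrors shouldCreateSaga for IF_NONE_FOUND when no saga was invoked yet.
fun mayCreateSaga(segment: Segment, initialAssociationValue: String): Boolean =
    segment.matches(initialAssociationValue)

// True if at least one segment both receives the event and may create the Saga.
fun sagaCreatedSomewhere(bits: Int, sequenceId: Any?, messageId: String, associationValue: String): Boolean {
    val mask = (1 shl bits) - 1
    return (0..mask).map { Segment(it, mask) }
        .any { delivered(it, sequenceId, messageId) && mayCreateSaga(it, associationValue) }
}
```

In this model the two checks always agree when the sequencing policy returns the association value (here the `syncGroupId`) or when the processor runs a single segment.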