I’ve observed some highly inconsistent behavior in an Axon saga and I need advice on how to move forward. To ask my question, I first have to explain some of the domain so that the observations make sense.
We are synchronizing data in a utility service where we are using Axon for consistency with the rest of our microservices.
The main entity is a SyncPoint. It contains the state of a synchronization and is scheduled to periodically poll.
Originally each SyncPoint was independent of the others’ state, and life was good. Later we realized that in some situations we had to combine the state of multiple SyncPoints in order to reconcile events after processing, for reasons we will not discuss here. Given the time constraints and the existing framework, rewriting and rethinking the domain was not really an option. So I extended the existing events with an optional SyncGroupId parameter. If this parameter is set, the SyncPoint is part of a group and should be processed in tandem with the other members; if it is not, processing proceeds as usual.
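In Kotlin terms, the change amounts to adding a nullable property to the existing event classes. This is a minimal sketch, assuming the events are Kotlin data classes; the real event definitions are not shown here, so the exact fields and the `Route`/`routeOf` helper are hypothetical and only illustrate the "grouped vs. standalone" decision.

```kotlin
// Hypothetical shapes: the real event classes are not part of this post.
typealias SyncPointId = String
typealias SyncGroupId = String

// Existing event, extended with an optional group id (null means a standalone SyncPoint).
data class SyncPointTriggered(
    val syncPointId: SyncPointId,
    val syncGroupId: SyncGroupId? = null
)

// Same optional property on the outcome event.
data class SyncProcessed(
    val syncPointId: SyncPointId,
    val syncGroupId: SyncGroupId? = null
)

// Where an event should go: group processing or the original per-SyncPoint flow.
sealed class Route {
    data class Grouped(val groupId: SyncGroupId) : Route()
    object Standalone : Route()
}

// Hypothetical helper: a set syncGroupId means "process in tandem with the group".
fun routeOf(event: SyncPointTriggered): Route =
    event.syncGroupId?.let { Route.Grouped(it) } ?: Route.Standalone
```

Giving the new parameter a default of `null` keeps existing Kotlin call sites compiling unchanged, which is what makes the property optional rather than a breaking change.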
This is what the flow looks like now:
I’ve decided to share the entire Saga below:
```kotlin
// Imports assume Axon 4.x package names; domain types (SyncPoint, SyncEntity, the services,
// the event classes, SyncGroupId/SyncPointId/TopicKey and TOPIC_KEY) are omitted.
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.GenericEventMessage
import org.axonframework.modelling.saga.EndSaga
import org.axonframework.modelling.saga.SagaEventHandler
import org.axonframework.modelling.saga.StartSaga
import org.axonframework.spring.stereotype.Saga
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier

@Saga
@ProcessingGroup("genericprocessor")
class SyncGroupProcessing {
    companion object {
        // Kept out of the saga's fields so the logger is not serialized with the saga state
        private val log = LoggerFactory.getLogger(SyncGroupProcessing::class.java)
    }

    var syncGroupId: SyncGroupId = ""
    var topicKey: TopicKey? = null
    var modelName: String = ""
    var processing: MutableSet<SyncPointId> = mutableSetOf()
    var syncEntitiesInitial: MutableList<SyncEntity> = mutableListOf()
    var syncsProcessed: MutableMap<SyncPointId, SyncProcessed> = mutableMapOf()

    @StartSaga
    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncPointTriggered,
           @Autowired syncPointService: SyncPointService,
           @Autowired syncModelService: SyncModelService,
           @Autowired syncGroupService: SyncGroupService) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }
        try {
            syncGroupId = event.syncGroupId!!
            val syncPoint = syncPointService.findById(event.syncPointId)
            log.info("SAGA: SyncPointTriggered [SG|$syncGroupId] [SP|${event.syncPointId}]")
            if (processing.isEmpty()) {
                val syncGroup = syncGroupService.findById(syncGroupId)
                if (syncGroup == null) {
                    log.debug("Sync Group [$syncGroupId] not found")
                    return
                }
                processing.addAll(syncGroup.syncPoints)
                // Initialize topicKey and modelName from the first SyncPoint triggered in the group
                topicKey = syncPoint.topicKey
                modelName = syncModelService.findById(syncPoint.syncModelId).name
            }
            // Note: this is a hack because syncEntities are not loaded through Hibernate properly.
            // Serialize the whole map (not .values, which becomes a JSON array) so it reads back as a Map.
            val mapper = jacksonObjectMapper()
            val syncEntities = mapper.readValue<Map<String, SyncEntity>>(
                mapper.writeValueAsString(syncPoint.syncEntities))
            // End hack
            syncEntitiesInitial.addAll(syncEntities.values)
        } catch (exception: Exception) {
            // Passing the exception itself logs the full stack trace
            log.error("SAGA: SyncPointTriggered Exception: ${exception.message}", exception)
        }
    }

    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncProcessed,
           @Autowired @Qualifier("eventBus") eventBus: EventBus) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }
        log.info("SAGA: SyncProcessed [SG|$syncGroupId] [SP|${event.syncPointId}]")
        processing.remove(event.syncPointId)
        syncsProcessed[event.syncPointId] = event
        if (processing.isEmpty()) {
            eventBus.publish(GenericEventMessage.asEventMessage<SyncGroupProcessed>(
                SyncGroupProcessed(syncGroupId)))
        }
    }

    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncProcessFailed,
           @Autowired @Qualifier("eventBus") eventBus: EventBus,
           @Autowired syncPointService: SyncPointService) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }
        log.info("SAGA: SyncProcessFailed [SG|$syncGroupId] [SP|${event.syncPointId}]")
        processing.remove(event.syncPointId)
        // Removing this SyncPoint's entities from the initial set at this stage means they take
        // no part in the difference processing and therefore will not be removed.
        syncEntitiesInitial.removeAll(syncPointService.findById(event.syncPointId).syncEntities.values)
        if (processing.isEmpty()) {
            eventBus.publish(GenericEventMessage.asEventMessage<SyncGroupProcessed>(
                SyncGroupProcessed(syncGroupId)))
        }
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "syncGroupId")
    fun on(event: SyncGroupProcessed,
           @Autowired syncPointService: SyncPointService,
           @Autowired @Qualifier("eventBus") eventBus: EventBus) {
        if (event.syncGroupId == null) {
            log.debug("Entered a saga with a null syncGroupId")
            return
        }
        log.info("SAGA: SyncGroupProcessed [SG|$syncGroupId]")
        try {
            val newSyncEntities = syncsProcessed.values.flatMap { it.syncEntities }.associateBy { it.id }
            log.info("SAGA: SyncGroupProcessed [SG|$syncGroupId] | newSyncEntities = ${newSyncEntities.size}")
            val oldSyncEntities = syncEntitiesInitial.associateBy { it.id }
            log.info("SAGA: SyncGroupProcessed [SG|$syncGroupId] | oldSyncEntities = ${oldSyncEntities.size}")
            syncPointService.processDifferences(
                oldSyncEntities = oldSyncEntities,
                newSyncEntities = newSyncEntities,
                topicKey = topicKey,
                syncModelName = modelName,
                syncPointId = "GROUPID:$syncGroupId"
            )
            syncsProcessed.forEach { (syncPointId, processed) ->
                val syncEntitiesMap = processed.syncEntities.associateBy { it.id }
                publishEvent(SyncEnded(syncPointId, processed.responseHash, syncEntitiesMap), topicKey, eventBus)
                log.info("SAGA: [SP: $syncPointId] Sync Ended.")
            }
        } catch (exception: Exception) {
            log.error("SAGA: SyncGroupProcessed Exception: ${exception.message}", exception)
        }
    }

    // Publishes the event, attaching the topic key as metadata when one is known
    private fun <T> publishEvent(event: T, topicKey: String?, eventBus: EventBus) {
        if (topicKey != null) {
            eventBus.publish(GenericEventMessage.asEventMessage<T>(event)
                .withMetaData(mapOf(TOPIC_KEY to topicKey)))
        } else {
            eventBus.publish(GenericEventMessage.asEventMessage<T>(event))
        }
    }
}
```
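Stripped of Axon, the completion rule in the saga reduces to a set of outstanding SyncPoints that shrinks with every `SyncProcessed` or `SyncProcessFailed`; `SyncGroupProcessed` is published only when that set becomes empty. The sketch below models just that bookkeeping in plain Kotlin, assuming string ids; `GroupTracker` is a hypothetical name, not part of the service. It makes one consequence explicit: if a member's outcome event never reaches the saga, the set never empties and the group never completes.

```kotlin
// Framework-free model of SyncGroupProcessing's bookkeeping (hypothetical; not Axon code).
class GroupTracker(members: Collection<String>) {
    // Mirrors `processing`: SyncPoints the saga is still waiting on.
    private val outstanding = members.toMutableSet()

    // Mirrors the keys of `syncsProcessed`: SyncPoints that reported success.
    private val processed = mutableSetOf<String>()

    // Returns true when the saga would publish SyncGroupProcessed after this event.
    fun onProcessed(syncPointId: String): Boolean {
        outstanding.remove(syncPointId)
        processed += syncPointId
        return outstanding.isEmpty()
    }

    // Failures shrink the set too, as the SyncProcessFailed handler does.
    fun onFailed(syncPointId: String): Boolean {
        outstanding.remove(syncPointId)
        return outstanding.isEmpty()
    }

    fun pending(): Set<String> = outstanding.toSet()

    fun succeeded(): Set<String> = processed.toSet()
}
```

For a three-member group where only two outcomes arrive, `pending()` keeps returning the third id and no call ever returns `true`, which is the shape of a group left in SYNCING.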
I know the logic may be a little convoluted, and I don’t expect anyone to follow all of it. I’m mainly sharing the code so you can see the log statements.
I can now finally share the observations and attempt to describe the inconsistent behavior.
The image above shows two sync groups: one that works perfectly and another, 121496, that got stuck in the SYNCING state.
When we break down the SyncPoints within the group that failed, they look like this:
```
// SYNCED SUCCESSFULLY
5554e1ff-d286-414f-a81d-49c8b857b4ac
f9b05b44-f136-422c-8755-295729d28599
b5b22f10-9e48-4b57-930b-408f31bf9c8d
// DID NOT SYNC SUCCESSFULLY
bd6ebe6e-dd6a-4706-ab3c-01e99c3d3612
7a2a6e9e-838a-492f-a444-75022d8ee87e
91a984e8-d18b-4493-bed4-cd9a0d80786e
```
I then went through the logs and mapped out the events, in case this was a timing issue. That does not appear to be the case.
The bd6ebe6e SyncPoint gets stuck, while 5554e1ff does not and goes on to a second sync.
For some unknown reason, the SyncProcessed event for 5554e1ff never reaches the saga’s event handler.
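The log-mapping step can be automated, because every saga handler logs the same `SAGA: <Event> [SG|<group>] [SP|<syncPoint>]` pattern. This is a minimal sketch, assuming one entry per line and that the message text is exactly what the `log.info` calls in the saga produce; `parse`, `timelineBySyncPoint` and `missingOutcomes` are hypothetical helpers. It lists, per SyncPoint, the events the saga actually handled and flags SyncPoints with more triggers than outcomes. Lines without an `[SP|...]` part, such as the `SyncGroupProcessed` ones, are ignored. It only sees events that reached a saga handler, so it cannot tell whether an outcome event was published in the first place.

```kotlin
// Matches the saga's own log format, e.g. "SAGA: SyncProcessed [SG|121496] [SP|5554e1ff-...]".
private val SAGA_LINE = Regex("""SAGA: (\w+) \[SG\|([^\]]*)\] \[SP\|([^\]]*)\]""")

data class SagaLogEntry(val event: String, val groupId: String, val syncPointId: String)

// Keeps only lines that carry both a group id and a SyncPoint id.
fun parse(lines: List<String>): List<SagaLogEntry> =
    lines.mapNotNull { line ->
        SAGA_LINE.find(line)?.destructured?.let { (event, group, sp) ->
            SagaLogEntry(event, group, sp)
        }
    }

// Event names handled by the saga, per SyncPoint, in log order.
fun timelineBySyncPoint(entries: List<SagaLogEntry>, groupId: String): Map<String, List<String>> =
    entries.filter { it.groupId == groupId }
        .groupBy({ it.syncPointId }, { it.event })

// SyncPoints whose triggers outnumber the outcomes the saga received.
fun missingOutcomes(timeline: Map<String, List<String>>): Set<String> =
    timeline.filterValues { events ->
        val triggered = events.count { it == "SyncPointTriggered" }
        val outcomes = events.count { it == "SyncProcessed" || it == "SyncProcessFailed" }
        triggered > outcomes
    }.keys
```

Run over the saga’s log lines for group 121496, a SyncPoint that was triggered twice but whose second outcome never arrived would show up in `missingOutcomes`.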
Thank you for taking the time to read this; I hope I communicated my problem well. Is there salvation, or am I forced to rework the entire system? I’d at least like to understand why it’s not working.