Change AggregateIdentifier using Event Transformation?

Hi.

I would like the change the aggregate identifier on one of my aggregates. (from uuid to username)

With

as my source of truth, I came up with this code:

fun transformAnvandareAggregateIdentifier(firstToken: Long = 0, lastToken: Long = -1L) {

        val context: MutableMap<String, AnvandareId> = mutableMapOf()

        val anvandareSkapadEvent = "...some-package..AnvandareSkapadEvent"
        val markeradSomFavoritEvent = "...some-package..MarkeradSomFavoritEvent"
        val avmarkeradSomFavoritEvent = "...some-package..AvmarkeradSomFavoritEvent"

        val oldRevision = "1.0"
        val newRevision = "2.0"

        AutoClosableAxonServerConnection
            .connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context).use { connection ->

                // define the range of events
                val first = firstToken

                val last = if (lastToken == -1L) {
                    connection.eventChannel().lastToken.get()
                } else {
                    lastToken
                }
                try {
                    logger.info { "Start transforming" }
                    EventSources
                        .range({ connection.eventChannel() }, firstToken, last)
                        .filter { eventWithToken ->
                            logger.debug { "Event: ${eventWithToken.event.payload.type}" }
                            (eventWithToken.event.payload.type == anvandareSkapadEvent && eventWithToken.event.payload.revision == oldRevision)
                                    || (eventWithToken.event.payload.type == markeradSomFavoritEvent && eventWithToken.event.payload.revision == oldRevision)
                                    || (eventWithToken.event.payload.type == avmarkeradSomFavoritEvent && eventWithToken.event.payload.revision == oldRevision)
                        }
                        .transform("Transform") { eventWithToken, appender ->
                            val intermediateRepresentation = InitialEventRepresentation(
                                GrpcBackedDomainEventData(eventWithToken.event),
                                eventSerializer
                            )

                            val event = intermediateRepresentation.getData(JsonNode::class.java).data

                            val upcastedEventWithToken = when (eventWithToken.event.payload.type) {
                                anvandareSkapadEvent -> {
                                    logger.debug { "Store upcast context ${event.get("id")} -> ${event.get("anvandarnamn")}" }
                                    context[event.get("id").asText()] = AnvandareId(event.get("anvandarnamn").asText())

                                    val upcastedEvent = intermediateRepresentation.upcastPayload(
                                        SimpleSerializedType(
                                            anvandareSkapadEvent,
                                            newRevision
                                        ),
                                        JsonNode::class.java
                                    ) { node ->
                                        (node as ObjectNode).put("id", context[event.get("id").asText()].toString())
                                        node.remove("anvandarnamn")
                                        node
                                    }

                                    Event.newBuilder()
                                        .mergeFrom(eventWithToken.event)
                                        .setAggregateIdentifier(context[event.get("id").asText()].toString())
                                        .setPayload(
                                            SerializedObject.newBuilder()
                                                .mergeFrom(eventWithToken.event.payload)
                                                .setData(
                                                    ByteString.copyFrom(
                                                        eventSerializer.serialize(
                                                            upcastedEvent.data,
                                                            ByteArray::class.java
                                                        ).data
                                                    )
                                                )
                                                .build()
                                        )
                                        .build();
                                }

                                markeradSomFavoritEvent -> {
                                    val upcastedEvent = intermediateRepresentation.upcastPayload(
                                        SimpleSerializedType(
                                            markeradSomFavoritEvent,
                                            newRevision
                                        ),
                                        JsonNode::class.java
                                    ) { node ->
                                        (node as ObjectNode).put("id", context[event.get("id").asText()].toString())
                                        node
                                    }

                                    Event.newBuilder()
                                        .mergeFrom(eventWithToken.event)
                                        .setAggregateIdentifier(context[event.get("id").asText()].toString())
                                        .setPayload(
                                            SerializedObject.newBuilder()
                                                .mergeFrom(eventWithToken.event.payload)
                                                .setData(
                                                    ByteString.copyFrom(
                                                        eventSerializer.serialize(
                                                            upcastedEvent.data,
                                                            ByteArray::class.java
                                                        ).data
                                                    )
                                                )
                                                .build()
                                        )
                                        .build();
                                }

                                avmarkeradSomFavoritEvent -> {
                                    val upcastedEvent = intermediateRepresentation.upcastPayload(
                                        SimpleSerializedType(
                                            avmarkeradSomFavoritEvent,
                                            newRevision
                                        ),
                                        JsonNode::class.java
                                    ) { node ->
                                        (node as ObjectNode).put("id", context[event.get("id").asText()].toString())
                                        node
                                    }

                                    Event.newBuilder()
                                        .mergeFrom(eventWithToken.event)
                                        .setAggregateIdentifier(context[event.get("id").asText()].toString())
                                        .setPayload(
                                            SerializedObject.newBuilder()
                                                .mergeFrom(eventWithToken.event.payload)
                                                .setData(
                                                    ByteString.copyFrom(
                                                        eventSerializer.serialize(
                                                            upcastedEvent.data,
                                                            ByteArray::class.java
                                                        ).data
                                                    )
                                                )
                                                .build()
                                        )
                                        .build();
                                }

                                else -> {
                                    throw UnsupportedOperationException()
                                }
                            }

                            appender.replaceEvent(eventWithToken.token, upcastedEventWithToken)

                        }
                        .execute { connection.eventTransformationChannel() }
                        .get()
                } catch (e: Exception) {
                    logger.error(e) { "Error executing event transformation" }
                } finally {
                    connection.ensureNoActiveTransformations()
                }


            }
        logger.info { "Transformation complete." }
    }

Executing this yields an error though:

io.grpc.StatusRuntimeException: CANCELLED: Invalid aggregate identifier: maghei, expecting cb6a0585-6191-4404-b741-412f66e2273f

Is it not possible to change aggregate identifiers using event transformation, or am I missing something?

If not, how can I do this using some other mechanism?

Thanks!

/Magnus

I am sad to say that the aggregate identifier is off-limits to be adjusted through the Event Transformation API. Since the aggregate identifiers combined with the aggregate sequence number form the aggregate event streams, they’re paramount for driving command models. Such a change is thus very rough to make in a live system, as the Event Transformation API allows you to.

Furthermore, although your use case is a mapping based on existing data in the event, an API that would allow changes to the aggregate identifier should also be able to split or merge an aggregate event stream. The Event Transformation API is just not tailored towards that.

Generating flexibility concerning command model stream changes is one reason why we’re working on the Dynamic Consistency Boundary for Axon Server and Axon Framework, actually.

For now though, your best course of action is to make an Axon Framework application reading from your old store and writing an adjusted Aggregate event stream to another. Definitely doable to make something like this. It is just a bit more custom work on your end I am afraid.

Lastly, sorry for the inconvenience here, @daysleeper75; know we’re working on this at AxonIQ.