Hi.
I would like to change the aggregate identifier on one of my aggregates (from a UUID to a username).
With
- Event Transformation
- Blog: Help! I want to change my event store
- axon-event-transformation-example/src/main/kotlin/infrastructure/EventCleanup.kt at 40acd438d51df2c5f423576d5fc2d01269758afa · holixon/axon-event-transformation-example · GitHub
as my source of truth, I came up with this code:
/**
 * Rewrites revision "1.0" `AnvandareSkapadEvent`, `MarkeradSomFavoritEvent` and
 * `AvmarkeradSomFavoritEvent` events so that their payload `id` and their aggregate
 * identifier carry the user name instead of the original id.
 *
 * The mapping from old id to new id is collected from each `AnvandareSkapadEvent` as it
 * streams past, so the creation event of an aggregate must be inside the token range and
 * precede that aggregate's other events.
 *
 * Failures are logged, not rethrown; "Transformation complete." is only logged when the
 * transformation finished without an exception.
 *
 * NOTE(review): Axon Server has been observed to reject such a replacement with
 * `CANCELLED: Invalid aggregate identifier: <new>, expecting <old>`, so changing the
 * aggregate identifier through event transformation may not be supported — confirm
 * against the Axon Server documentation.
 *
 * @param firstToken token of the first event to consider.
 * @param lastToken token of the last event to consider; `-1` means "use the event
 * channel's current last token".
 */
fun transformAnvandareAggregateIdentifier(firstToken: Long = 0, lastToken: Long = -1L) {
    // Old aggregate id -> new aggregate id, filled while AnvandareSkapadEvents are processed.
    val newIdByOldId: MutableMap<String, AnvandareId> = mutableMapOf()
    val anvandareSkapadEvent = "...some-package..AnvandareSkapadEvent"
    val markeradSomFavoritEvent = "...some-package..MarkeradSomFavoritEvent"
    val avmarkeradSomFavoritEvent = "...some-package..AvmarkeradSomFavoritEvent"
    val transformedTypes = setOf(anvandareSkapadEvent, markeradSomFavoritEvent, avmarkeradSomFavoritEvent)
    val oldRevision = "1.0"
    val newRevision = "2.0"

    val succeeded = AutoClosableAxonServerConnection
        .connect(connect.hostName, connect.grpcPort, this.javaClass.simpleName, this.context)
        .use { connection ->
            // define the range of events
            val last = if (lastToken == -1L) {
                connection.eventChannel().lastToken.get()
            } else {
                lastToken
            }
            try {
                logger.info { "Start transforming" }
                EventSources
                    .range({ connection.eventChannel() }, firstToken, last)
                    .filter { eventWithToken ->
                        val payload = eventWithToken.event.payload
                        logger.debug { "Event: ${payload.type}" }
                        payload.revision == oldRevision && payload.type in transformedTypes
                    }
                    .transform("Transform") { eventWithToken, appender ->
                        val original = eventWithToken.event
                        val payloadType = original.payload.type
                        val intermediateRepresentation = InitialEventRepresentation(
                            GrpcBackedDomainEventData(original),
                            eventSerializer,
                        )
                        val event = intermediateRepresentation.getData(JsonNode::class.java).data
                        // Read the old id once, before the payload node is mutated by the upcast.
                        val oldId = event.get("id").asText()

                        when (payloadType) {
                            anvandareSkapadEvent -> {
                                logger.debug { "Store upcast context ${event.get("id")} -> ${event.get("anvandarnamn")}" }
                                newIdByOldId[oldId] = AnvandareId(event.get("anvandarnamn").asText())
                            }
                            markeradSomFavoritEvent, avmarkeradSomFavoritEvent -> Unit
                            else -> throw UnsupportedOperationException()
                        }

                        // Fail fast instead of silently storing the literal string "null" as identifier.
                        val newId = checkNotNull(newIdByOldId[oldId]) {
                            "No new identifier recorded for aggregate $oldId; " +
                                "its AnvandareSkapadEvent must be processed before this event (type $payloadType)"
                        }.toString()

                        val upcastedEvent = intermediateRepresentation.upcastPayload(
                            SimpleSerializedType(payloadType, newRevision),
                            JsonNode::class.java,
                        ) { node ->
                            (node as ObjectNode).put("id", newId)
                            // Only the creation event carries the user name as a separate field.
                            if (payloadType == anvandareSkapadEvent) {
                                node.remove("anvandarnamn")
                            }
                            node
                        }

                        // NOTE(review): only the payload data is replaced, so the stored payload keeps
                        // its original revision — confirm whether it should become newRevision.
                        val upcastedPayload = SerializedObject.newBuilder()
                            .mergeFrom(original.payload)
                            .setData(
                                ByteString.copyFrom(
                                    eventSerializer.serialize(
                                        upcastedEvent.data,
                                        ByteArray::class.java
                                    ).data
                                )
                            )
                            .build()
                        val replacement = Event.newBuilder()
                            .mergeFrom(original)
                            .setAggregateIdentifier(newId)
                            .setPayload(upcastedPayload)
                            .build()

                        appender.replaceEvent(eventWithToken.token, replacement)
                    }
                    .execute { connection.eventTransformationChannel() }
                    .get()
                true
            } catch (e: Exception) {
                logger.error(e) { "Error executing event transformation" }
                false
            } finally {
                connection.ensureNoActiveTransformations()
            }
        }

    if (succeeded) {
        logger.info { "Transformation complete." }
    }
}
Executing this yields an error though:
io.grpc.StatusRuntimeException: CANCELLED: Invalid aggregate identifier: maghei, expecting cb6a0585-6191-4404-b741-412f66e2273f
Is it not possible to change aggregate identifiers using event transformation, or am I missing something?
If not, how can I do this using some other mechanism?
Thanks!
/Magnus