Events other than first event created by ContextAwareEventMultiUpcaster are not processed by @EventHandler

Hi.

I have an implementation of ContextAwareEventMultiUpcaster that handles EventA_1.0 and returns EventA_2.0 and EventB_1.0

The ContextAwareEventMultiUpcaster does return both events.

If I send a command to an aggregate, its @EventSourcingHandler methods are called with both new events created by the ContextAwareEventMultiUpcaster.

If I call eventStore.readEvents for the aggregate, I see both events created by the ContextAwareEventMultiUpcaster

My @EventHandler is the problem. The ContextAwareEventMultiUpcaster returns a stream of EventA_2.0, EventB_1.0 when upcasting, but the @EventHandler for EventB_1.0 is never called. If I instead return EventB_1.0, EventA_2.0, then the @EventHandler for EventB_1.0 is called…

Is this a bug? It seems as if only the first event returned from the ContextAwareEventMultiUpcaster is sent to my @EventHandlers. Or is there something I am missing when creating the events in the upcaster? I basically call upcastPayload twice in doUpcast: once removing one attribute to create EventA_2.0, and once adding the attribute to create EventB_1.0.

Hi @daysleeper75, welcome to the forum!

Would you be able to share how you have configured your Event Processors?
So, the components that provide the events to your @EventHandler annotated methods, aka the “Event Handling Components.”

I am asking the above as I’m wondering whether there might be something amiss with the configuration of the query-side of your application. So, on that note, would you be able to share how you start your application?
Do you have a separation in microservices, for example?

Let’s try to tackle this issue!

This is a rather small application. Everything is currently separated in code but not in deployment units; everything is deployed to OpenShift as one Spring Boot jar, running in 2 pods.

I have pasted all the code that I think might be relevant below:

@EventDescription("Applikationsinformation uppdaterad")
@Revision("2.0")
data class ApplikationsinformationUppdateradEvent(
    override val id: ApplikationId,
    val namn: String,
    val beskrivning: String,
	// This attribute was here in 1.0
	// val behorighetsansvarig: Anvandarnamn, 
    val alias: List<String>,
    val dokumentation: List<String>
) : ApplikationEvent<ApplikationId> {
    companion object
}

@EventDescription("Behörighetsansvarig uppdaterad")
@Revision("1.0")
data class BehorighetsansvarigUppdateradEvent(
    override val id: ApplikationId,
    val behorighetsansvarig: Anvandarnamn,
) : ApplikationEvent<ApplikationId> {
    companion object
}

class ApplikationsinformationUppdateradEvent1To2Upcaster() :
    ContextAwareEventMultiUpcaster<MutableMap<String, String?>>() {

    companion object {
        private val logger = KotlinLogging.logger {}
    }

    private val context: MutableMap<String, String?> = mutableMapOf()

    private val SKAPAD_TARGET_TYPE = SimpleSerializedType(ApplikationSkapadEvent::class.qualifiedName, "1.0");
    private val UPPDATERAD_TARGET_TYPE =
        SimpleSerializedType(ApplikationsinformationUppdateradEvent::class.qualifiedName, "1.0");
    private val BEHORIGHETSANSVARIG_TARGET_TYPE =
        SimpleSerializedType(BehorighetsansvarigUppdateradEvent::class.qualifiedName, "1.0");

    override fun canUpcast(
        intermediateRepresentation: IntermediateEventRepresentation,
        context: MutableMap<String, String?>
    ): Boolean {
        return listOf(SKAPAD_TARGET_TYPE, UPPDATERAD_TARGET_TYPE).contains(
            intermediateRepresentation.type
        )
    }

    override fun doUpcast(
        intermediateRepresentation: IntermediateEventRepresentation,
        context: MutableMap<String, String?>
    ): Stream<IntermediateEventRepresentation> {
        return when (intermediateRepresentation.type) {
            SKAPAD_TARGET_TYPE -> {
                val event = intermediateRepresentation.getData(JsonNode::class.java).data
                logger.debug { "Store upcast context ${event.get("id")} -> ${event.get("behorighetsansvarig")}" }
                context[event.get("id").asText()] = event.get("behorighetsansvarig").asText()

                Stream.of(intermediateRepresentation)
            }
            UPPDATERAD_TARGET_TYPE -> {
                var behorighetsansvarigEvent: IntermediateEventRepresentation? = null
                val event = intermediateRepresentation.getData(JsonNode::class.java).data
                val cached = context[event.get("id").asText()]
                val current = event.get("behorighetsansvarig").asText()
                if (cached != current) {
                    logger.debug { "Store upcast context ${event.get("id")} -> ${event.get("behorighetsansvarig")}" }
                    context[event.get("id").asText()] = event.get("behorighetsansvarig").asText()

                    behorighetsansvarigEvent = intermediateRepresentation.upcastPayload(
                        SimpleSerializedType(BEHORIGHETSANSVARIG_TARGET_TYPE.name, "1.0"),
                        JsonNode::class.java
                    ) { node ->
                        val id = node.get("id").asText()
                        val behorighetsansvarig = node.get("behorighetsansvarig").asText()
                        (node as ObjectNode).removeAll()
                        node.replace("id", JsonNodeFactory.instance.textNode(id))
                        node.replace(
                            "behorighetsansvarig",
                            JsonNodeFactory.instance.textNode(behorighetsansvarig)
                        )
                        node
                    }
                }

                val uppdateradEvent = intermediateRepresentation.upcastPayload(
                    SimpleSerializedType(UPPDATERAD_TARGET_TYPE.name, "2.0"),
                    JsonNode::class.java
                ) { node ->
                    (node as ObjectNode).remove("behorighetsansvarig")
                    node
                }

                return Stream.of(*listOfNotNull(uppdateradEvent, behorighetsansvarigEvent).toTypedArray())
            }
            else -> {
                throw RuntimeException("Failed to upcast '${intermediateRepresentation.type}'")
            }
        }
    }

    override fun buildContext(): MutableMap<String, String?> {
        return context
    }

}



@Component
@ProcessingGroup(ProcessingGroups.PROJECTIONS)
class ApplikationEventHandler(
    private val applikationViewRepository: ApplikationViewRepository
) {
    @EventHandler
    fun handle(event: ApplikationsinformationUppdateradEvent, @Timestamp senastUppdaterad: Instant) {
        val entity = applikationViewRepository.findById(event.id).orElseThrow().also {
            it.namn = event.namn
            it.beskrivning = event.beskrivning
            it.alias = event.alias
            it.dokumentation = event.dokumentation
            it.senastUppdaterad = OffsetDateTime.ofInstant(senastUppdaterad, ZoneId.systemDefault())
        }

        applikationViewRepository.save(entity)
    }

// This is never called
    @EventHandler
    fun handle(event: BehorighetsansvarigUppdateradEvent, @Timestamp senastUppdaterad: Instant) {
        val entity = applikationViewRepository.findById(event.id).orElseThrow().also {
            it.behorighetsansvarig = event.behorighetsansvarig
            it.senastUppdaterad = OffsetDateTime.ofInstant(senastUppdaterad, ZoneId.systemDefault())
        }

        applikationViewRepository.save(entity)
    }
}

@Bean
fun eventUpcasterChain(): EventUpcasterChain {
	return EventUpcasterChain(
		ForvaltningsobjektUppdateratEvent1Upcaster(),
		ApplikationUppdateradEvent1Upcaster(),
		KomponentUppdateradEvent1To2Upcaster(),
		HalsokontrollUppdateradEvent1To2Upcaster(),
		SlaSkapatEvent1To2Upcaster(),
		ApplikationsinformationUppdateradEvent1To2Upcaster()
		// Next change here...
	)
}


axon:
  serializer:
    events: jackson
    general: xstream
    messages: xstream
  eventhandling:
    processors:
      lookup:
        mode: subscribing
      projections:
        mode: pooled
        thread-count: 8
        initial-segment-count: 16
      jmspublisher:
        mode: pooled
        thread-count: 4
        initial-segment-count: 8

Some code where it seems to work:

// Aggregate looks like this, and both @EventSourcingHandler's are called when some command is about to be processed

@Aggregate
class Applikation {

    @EventSourcingHandler
    fun on(event: ApplikationsinformationUppdateradEvent) {
        this.beskrivning = event.beskrivning
        this.namn = event.namn
        this.alias = event.alias.toMutableList()
        this.dokumentation = event.dokumentation.toMutableList()
    }

    @EventSourcingHandler
    fun on(event: BehorighetsansvarigUppdateradEvent) {
        this.behorighetsansvarig = event.behorighetsansvarig
    }
}

// And I have a controller that returns kind of an auditlog per Aggregate, code looks like this
val mapper = jacksonObjectMapper().registerModule(JavaTimeModule())

        val nodes = this.eventStore.readEvents(applikationView.id.toString()).asStream().toList().map {
            val userId = it.metaData["userId"] as String?
            val eventDescription = it.payloadType.getAnnotation(EventDescription::class.java)

            Applikationshandelse(
                HandelseId(DigestUtils.md5DigestAsHex((it.identifier + mapper.writeValueAsString(it.payload)).toByteArray())),
                OffsetDateTime.ofInstant(it.timestamp, ZoneId.systemDefault()),
                if (userId != null) Anvandarnamn(userId) else null,
                eventDescription?.value,
                mapper.writeValueAsString(it.payload)
            )
        }.asReversed()
		
// Events from the ContextAwareEventMultiUpcaster are processed and working just fine in these cases

Could this somehow be related to mode: pooled? I added that not so long ago to get some speed when processing all events (I usually copy the production axon data-file to my desktop when developing).

Oh yes. I commented out the pooled settings in application.yaml and now the @EventHandler is being called. (Not as many times as I expected, but maybe there is some logic missing in my upcaster; I didn’t test it out since I ran into this problem first…)

Is this expected?

Verified. I fixed the logic error in my upcaster and it will now produce the correct number of events.

@EventHandler is not called when this config is active:

axon:
  serializer:
    events: jackson
    general: xstream
    messages: xstream
  eventhandling:
    processors:
      lookup:
        mode: subscribing
      projections:
        mode: pooled
        thread-count: 8
        initial-segment-count: 16

“projections” is the @ProcessingGroup of the @Component that has the @EventHandler.

I change the config to:

axon:
  serializer:
    events: jackson
    general: xstream
    messages: xstream
  eventhandling:
    processors:
      lookup:
        mode: subscribing

Now the @EventHandler is called.

The type of EventProcessor shouldn’t (famous last words) have any impact on how your Upcaster Chain acts. As I got a little confused from your previous comment, where you stated you fixed a logic error in your upcaster, I’d like to confirm again:

If you define a mode through a properties file for your projections processing group, the:

  1. upcaster isn’t invoked?
  2. event handler isn’t invoked?

In either case, would you be able to verify if the same problem occurs if you use the code configuration instead of a properties file?
More specifically, that would be the following:

@Autowired
fun configProcessor(processingConfigurer: EventProcessingConfigurer) {
    processingConfigurer.registerPooledStreamingEventProcessor("projections")
}

Just asking to rule out possibilities, really.
If there, however, is a bug in the configuration of the PooledStreamingEventProcessor when using context-aware multi upcasters, we’d have a bug on our hands that would require resolution. :slight_smile:

I can only come to the conclusion that there must be a bug hiding out somewhere…

With this in application.yaml

axon:
  serializer:
    events: jackson
    general: xstream
    messages: xstream
  eventhandling:
    processors:
      projections:
        mode: pooled
        thread-count: 8
        initial-segment-count: 16

Only the first event returned in the stream from my ContextAwareEventMultiUpcaster is passed on to the @EventHandler. (I tested by changing the order of events returned in the stream by the ContextAwareEventMultiUpcaster).

With pooled-config present, methods in @Aggregates annotated with @EventSourcingHandler receive the events created by the ContextAwareEventMultiUpcaster.

With pooled-config present, code like this eventStore.readEvents(id).asStream().toList() sees all events created by the ContextAwareEventMultiUpcaster.

It’s only the calls to @EventHandler that are missing.

If I remove the pooled config,

axon:
  serializer:
    events: jackson
    general: xstream
    messages: xstream

then all @EventHandlers are called as expected.

If I add the config code that you suggested (and make sure pooled is not in application.yaml):

@Autowired
fun configProcessor(processingConfigurer: EventProcessingConfigurer) {
    processingConfigurer.registerPooledStreamingEventProcessor("projections")
}

I get an error:

Event processor with name projections already exists

Is that because my @Component with the @EventHandler is configured with @ProcessingGroup("projections") and has already been created?

My confusion about ContextAwareEventMultiUpcaster was about when ContextAwareSingleEntryMultiUpcaster::buildContext is called.

If I do

eventStore.readEvents(id).asStream().toList()

I would expect ContextAwareSingleEntryMultiUpcaster::buildContext to be called once, since I am reading one aggregate. That’s not the case.

I have a stream of events:

A -> A -> A

I implement ContextAwareSingleEntryMultiUpcaster to split the stream into

A -> B -> A -> B -> A -> B

But, I would really like the stream of new events to be

A -> B -> A -> A

Since the information in B hasn’t changed in this case. I thought I could keep this info, the latest value of B, in the context, but buildContext seems to be called more often than once per aggregate?

My confusion about this has nothing to do with the bug. Right now my stream looks like:

A -> B -> A -> B -> A -> B

But B never reaches the @EventHandler when the config is pooled.

Framework 4.5.5 with server 4.5.9 (not enterprise)

Thanks!

Regarding my confusion about when and how ContextAwareEventMultiUpcaster::buildContext is called:

Am I supposed to create the context once, and that context will be used for the processing of all events passed to the ContextAwareEventMultiUpcaster per event? Like this:

override fun buildContext(): MutableMap<String, String> {
        return mutableMapOf()
    }

Let’s say I have this and pass a command to an @Aggregate. The context above should be created once and used for the processing of all events passed to the @EventSourcingHandler methods of this aggregate, correct?

If I then do something like this:

this.eventStore.readEvents(id).asStream()

then it seems as if the events are passed to the upcasting chain one by one, rather than as a stream of events? The upcaster’s buildContext seems to be called for every event processed…? So there isn’t really a context in this scenario, since it’s recreated for every event by the semantics of the asStream method.

?

…and just to be sure that this confusion is not the thing that makes the @EventHandler not being called, I removed all context logic from my upcaster.

pooled is now configured and my upcaster returns this

return Stream.of(*listOfNotNull(uppdateradEvent, behorighetsansvarigEvent).toTypedArray())

I see all upcasted events when I call this.eventStore.readEvents(id). The @EventHandler for uppdateradEvent (returned first from the upcaster) is called, but the @EventHandler for behorighetsansvarigEvent (returned second from the upcaster) is not called.

I change return order in upcaster:

return Stream.of(*listOfNotNull(behorighetsansvarigEvent, uppdateradEvent).toTypedArray())

Now @EventHandler for behorighetsansvarigEvent is called, but not @EventHandler for uppdateradEvent.

Only the first event is passed to its @EventHandler.

I remove the pooled settings from application.yaml. I see all upcasted events when I call this.eventStore.readEvents(id), and all @EventHandlers are called, as expected.

Any comment @Steven_van_Beelen ?

The odd thing is, that I would expect your context-aware upcaster to not work for both Tracking (the default event processor) and Pooled Streaming.
My reasoning for this is, that both StreamingEventProcessor implementations retrieve events in batches.
And, for each batch, the UpcasterChain is invoked.

Furthermore, the default batch size is 1. Thus, there are no events to fill the context with, as the stream of events consists of one event.

Regardless of whether this is specific to pooled streaming or tracking, I would like to ask you to adjust the batchSize of your processor. You can either choose to change this through the EventProcessingConfigurer or through an application.properties file:

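// Imports assumed for a Spring Boot application using Axon Framework 4.5.x
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.springframework.beans.factory.annotation.Autowired;

public class AxonConfig {
    //...
    @Autowired
    public void configureBatchSize(EventProcessingConfigurer processingConfigurer) {
        // Register the processor as pooled streaming and raise its batch size from the default of 1
        processingConfigurer.registerPooledStreamingEventProcessor(
                "[your-processor-name-here]",
                Configuration::eventStore,
                (config, builder) -> builder.batchSize(1000)
        );
    }
}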
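
For the properties-file route, a minimal sketch in the application.yaml style used earlier in this thread could look as follows. It assumes that your Axon Spring Boot version exposes a `batch-size` setting next to `mode` and `thread-count` for each processor; please verify that against the version you run. The processor name `projections` and the other values are taken from the config shared above.

```yaml
axon:
  eventhandling:
    processors:
      projections:
        mode: pooled
        thread-count: 8
        initial-segment-count: 16
        # Assumed property name; the default batch size is 1
        batch-size: 1000
```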

So, let me know whether increasing the batchSize does the job for you!


Let me provide a note on one of your posts here for added clarity:

Regarding my confusion about when and how ContextAwareEventMultiUpcaster::buildContext is called:
Am I supposed to create the context once, and that context will be used for the processing of all events passed to the ContextAwareEventMultiUpcaster per event?

Axon invokes the buildContext method at the start of upcasting a stream. Thus, if the stream consists of a complete set of events (like the aggregate event stream when event sourcing), you’d have N events at your disposal. Before the upcaster chain traverses the stream, it will invoke buildContext(). After this, canUpcast and doUpcast (if canUpcast returned true) are invoked.

If your buildContext method reads from some form of common storage, then you could reuse it for the entirety of reading events in an Event Processor. In most cases, however, the context-aware solution reads events that aren’t spaced far apart. Hence, a sample that constructs an in-memory map works in most scenarios.

I realize that these specifics aren’t clear from the JavaDoc or Reference Guide, having just gone through both to find material for you. My apologies for the lack of clarity here.
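
To make the batching point concrete, here is a small framework-free sketch in plain Java. It is not Axon code: `Stored`, `upcastBatch`, `upcastInBatches`, and `sampleEvents` are hypothetical names, and the `HashMap` merely stands in for whatever buildContext() returns. It only illustrates the assumption that a fresh context is created for every batch handed to the upcaster, so a batch size of 1 leaves nothing in the context to compare against.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ContextPerBatchSketch {

    /** Hypothetical stored event: the aggregate id plus the field that is split into its own event. */
    static final class Stored {
        final String id;
        final String owner;

        Stored(String id, String owner) {
            this.id = id;
            this.owner = owner;
        }
    }

    /** Simulates one upcasting pass over a batch: a fresh context, then "A" always and "B" only on change. */
    static List<String> upcastBatch(List<Stored> batch) {
        Map<String, String> context = new HashMap<>(); // stands in for buildContext()
        List<String> out = new ArrayList<>();
        for (Stored event : batch) {
            out.add("A");
            if (!event.owner.equals(context.get(event.id))) {
                context.put(event.id, event.owner);
                out.add("B");
            }
        }
        return out;
    }

    /** Splits the events into batches of the given size and upcasts every batch separately. */
    static List<String> upcastInBatches(List<Stored> events, int batchSize) {
        List<String> out = new ArrayList<>();
        for (int from = 0; from < events.size(); from += batchSize) {
            int to = Math.min(from + batchSize, events.size());
            out.addAll(upcastBatch(events.subList(from, to)));
        }
        return out;
    }

    /** Three events of one aggregate whose owner never changes. */
    static List<Stored> sampleEvents() {
        return List.of(new Stored("1", "anna"), new Stored("1", "anna"), new Stored("1", "anna"));
    }

    public static void main(String[] args) {
        System.out.println(upcastInBatches(sampleEvents(), 3)); // [A, B, A, A]
        System.out.println(upcastInBatches(sampleEvents(), 1)); // [A, B, A, B, A, B]
    }
}
```

With all three events in one batch, the second and third events find the owner in the context and no extra B is produced. With a batch size of 1, every event starts from an empty context and is always split in two.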

Ok, pooled-issue first:

I have done some debugging…

When pooled is enabled (batch size was 1000 as requested by you), the event is “lost” here:

By returning false there, it is never inserted in the processingQueue, and never appears in the events processed later here

Is this a bug in the pooled-code, or am I doing something wrong in my Upcaster?

Here is my current upcaster:

class ApplikationsinformationUppdateradEvent1To2Upcaster :
    ContextAwareEventMultiUpcaster<MutableMap<String, String>>() {

    companion object {
        private val logger = KotlinLogging.logger {}
    }

    private val UPPDATERAD_TARGET_TYPE =
        SimpleSerializedType(ApplikationsinformationUppdateradEvent::class.qualifiedName, "1.0")
    private val BEHORIGHETSANSVARIG_TARGET_TYPE =
        SimpleSerializedType(BehorighetsansvarigUppdateradEvent::class.qualifiedName, "1.0")

    override fun canUpcast(
        intermediateRepresentation: IntermediateEventRepresentation,
        context: MutableMap<String, String>
    ): Boolean {
        return listOf(UPPDATERAD_TARGET_TYPE).contains(
            intermediateRepresentation.type
        )
    }

    override fun doUpcast(
        intermediateRepresentation: IntermediateEventRepresentation,
        context: MutableMap<String, String>
    ): Stream<IntermediateEventRepresentation> {
        return when (intermediateRepresentation.type) {
            UPPDATERAD_TARGET_TYPE -> {
                var behorighetsansvarigEvent: IntermediateEventRepresentation? = null
                val event = intermediateRepresentation.getData(JsonNode::class.java).data
                val cached = context[event.get("id").asText()]
                val current = event.get("behorighetsansvarig")

                if (!current.isNull && !current.asText().equals(cached, true)) {
                    behorighetsansvarigEvent = intermediateRepresentation.upcastPayload(
                        SimpleSerializedType(BEHORIGHETSANSVARIG_TARGET_TYPE.name, "1.0"),
                        JsonNode::class.java
                    ) { node ->
                        val id = node.get("id").asText()
                        val behorighetsansvarig = node.get("behorighetsansvarig").asText()
                        (node as ObjectNode).removeAll()
                        node.replace("id", JsonNodeFactory.instance.textNode(id))
                        node.replace(
                            "behorighetsansvarig",
                            JsonNodeFactory.instance.textNode(behorighetsansvarig)
                        )
                        node
                    }

                    logger.debug { "Store upcast context ${event.get("id")} -> ${event.get("behorighetsansvarig")}" }
                    context[event.get("id").asText()] = event.get("behorighetsansvarig").asText()
                }

                val uppdateradEvent = intermediateRepresentation.upcastPayload(
                    SimpleSerializedType(UPPDATERAD_TARGET_TYPE.name, "2.0"),
                    JsonNode::class.java
                ) { node ->
                    (node as ObjectNode).remove("behorighetsansvarig")
                    node
                }

                return Stream.of(*listOfNotNull(uppdateradEvent, behorighetsansvarigEvent).toTypedArray())
            }
            else -> {
                throw RuntimeException("Failed to upcast '${intermediateRepresentation.type}'")
            }
        }
    }

    override fun buildContext(): MutableMap<String, String> {
        return mutableMapOf()
    }

}

The upcaster does work as expected when I remove pooled.

Second issue, buildContext confusion:

Regarding my confusion with buildContext(), this upcaster now seems to work for the aggregate event stream when event sourcing and when used in the TrackingEventProcessor when configured like this:

@Bean
    fun eventUpcasterChain(): EventUpcasterChain {
        return EventUpcasterChain(
            ForvaltningsobjektUppdateratEvent1Upcaster(),
            ApplikationUppdateradEvent1Upcaster(),
            KomponentUppdateradEvent1To2Upcaster(),
            HalsokontrollUppdateradEvent1To2Upcaster(),
            SlaSkapatEvent1To2Upcaster(),
            ApplikationsinformationUppdateradEvent1To2Upcaster()
            // Next upcaster here...
        )
    }

In my application I also present to the user all the events of the aggregate that the user is currently browsing, like an audit log. Here, I read the events like this:

val nodes = this.eventStore.readEvents(applikationView.id.toString()).asStream().toList().map {...}

In this case it seems as if the events are processed one by one, and not as a stream, as is done for the aggregate event stream when event sourcing and in the TrackingEventProcessor. This one-by-one behaviour causes the upcaster’s buildContext() to be called for every event, and not once for the stream. Is there a way to read the events from the store programmatically like I want to, but have it done as a stream in such a way that one buildContext() result is used for the whole stream?

I’ve done some more debugging and found what’s causing my problem with val nodes = this.eventStore.readEvents(applikationView.id.toString()).asStream().toList().map {...}

Here:

input contains all messages, but the call to flatMap causes them to be processed by a new upcaster one by one, resulting in a call to ContextAwareEventMultiUpcaster::buildContext for each and every message, not for the stream as a whole… bug or intended? Or is there some other way?

@Steven_van_Beelen any comments to my 2 latest posts?

My apologies, @daysleeper75.
I expected my Discuss settings to nudge me correctly, but clearly enough, they haven’t…
Nonetheless, let’s move to the points at hand.

1. Multi-Upcaster and Pooled Streaming Processor

Given the debug effort you’ve put in, I’ve become relatively confident that we indeed are talking about a bug.
I will try to introduce a test case to validate the behavior.
I’ll update this ticket accordingly as soon as I have a test case that consistently shows the faulty behavior.

The easiest workaround is to use the Tracking Event Processor for the time being.
Note that switching between Tracking and Pooled Streaming is painless.

2. Multi-Upcaster with Direct EventStore invocation

No offense, but this seems off at first glance.
When Event Sourcing with Axon Server, the same method you’re sharing above is used.
Hence, if Event Sourcing works as expected, how come this doesn’t work as expected?

Regardless, you’re experiencing this behavior, so there must be something to it.
I do spot within the implementation a different path to reading events.
However, this will only trigger if you’re using snapshots.
As such, I assume you’re using snapshots.
If that’s correct, please do share so.

Now, let’s move back to the other path.
The other path touches the EventStore#readEvents(String, long) method that can only lead to the AbstractEventStorageEngine:

As you can see, that method will first upcast instead of performing the aforementioned flat map.
So, maybe, you spotted two predicaments.

An easy way to figure this out is to change your code to invoke readEvents(String, long) instead of readEvents(String).
To keep reading the entire stream in that case, though, you should provide 0 as the sequence to start from.
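
The difference between the two read paths can be sketched without the framework. The plain Java below is only an illustration under assumptions: `upcast`, `upcastWholeStream`, `upcastPerEvent`, and `contextsBuilt` are hypothetical names, and the events are reduced to the value of the field being split out. It contrasts handing the whole stream to the upcaster once with wrapping every event in its own single-element stream via flatMap.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ReadPathSketch {

    /** Counts how often a context is created, mirroring calls to buildContext(). */
    static final AtomicInteger contextsBuilt = new AtomicInteger();

    /** Hypothetical upcaster: one context per call; emits "A" always and "B" when the owner changed. */
    static Stream<String> upcast(Stream<String> owners) {
        contextsBuilt.incrementAndGet();
        Map<String, String> context = new HashMap<>();
        return owners.flatMap(owner -> {
            String previous = context.put("owner", owner);
            return owner.equals(previous) ? Stream.of("A") : Stream.of("A", "B");
        });
    }

    /** Path 1: the whole stream is upcast in one go, so one context spans all events. */
    static List<String> upcastWholeStream(List<String> owners) {
        return upcast(owners.stream()).collect(Collectors.toList());
    }

    /** Path 2: every event is upcast as its own one-element stream, so each gets a new context. */
    static List<String> upcastPerEvent(List<String> owners) {
        return owners.stream()
                     .flatMap(owner -> upcast(Stream.of(owner)))
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> owners = List.of("anna", "anna", "berit");
        System.out.println(upcastWholeStream(owners)); // [A, B, A, A, B]
        System.out.println(upcastPerEvent(owners));    // [A, B, A, B, A, B]
    }
}
```

In the first path the context is built once and the unchanged second event produces no B. In the second path the context is built three times and every event is split, which matches the behaviour described for readEvents(String).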

Keep me posted on what the outcome is here!
Be sure to @smcvb me.
That way, I should definitely get a timely response.

Thanks @Steven_van_Beelen, I’m really thankful for all this support. Hopefully we will be on enterprise somewhere in the not too distant future.

Regarding multi-Upcaster with Direct EventStore invocation:

I want a list of events that I present to the user, kind of like an audit log. I have annotated my events with a human-readable description (EventDescription.class in the code below) and create the list like this:

val nodes = this.eventStore.readEvents(applikationView.id.toString()).asStream().map {
            val userId = it.metaData["userId"] as String?
            val eventDescription = it.payloadType.getAnnotation(EventDescription::class.java)

            Applikationshandelse(
                HandelseId(DigestUtils.md5DigestAsHex((it.identifier + mapper.writeValueAsString(it.payload)).toByteArray())),
                OffsetDateTime.ofInstant(it.timestamp, ZoneId.systemDefault()),
                if (userId != null) Anvandarnamn(userId) else null,
                eventDescription?.value,
                mapper.writeValueAsString(it.payload)
            )
        }.toList().asReversed()

If I put a breakpoint in one of my upcasters I see this stacktrace:

buildContext:161, KomponentUppdateradEvent1To2Upcaster (se.lantmateriet.applikationslistan.coreapi.applikation.komponent)
buildContext:62, KomponentUppdateradEvent1To2Upcaster (se.lantmateriet.applikationslistan.coreapi.applikation.komponent)
upcast:38, ContextAwareSingleEntryMultiUpcaster (org.axonframework.serialization.upcasting)
upcast:58, GenericUpcasterChain (org.axonframework.serialization.upcasting)
upcast:58, GenericUpcasterChain (org.axonframework.serialization.upcasting)
upcastAndDeserialize:109, EventStreamUtils (org.axonframework.eventsourcing)
upcastAndDeserializeDomainEvents:71, EventStreamUtils (org.axonframework.eventsourcing)
upcastAndDeserializeDomainEvent:569, AxonServerEventStore$AxonIQEventStorageEngine (org.axonframework.axonserver.connector.event.axon)
lambda$readEvents$8:558, AxonServerEventStore$AxonIQEventStorageEngine (org.axonframework.axonserver.connector.event.axon)
apply:-1, 1560339588 (org.axonframework.axonserver.connector.event.axon.AxonServerEventStore$AxonIQEventStorageEngine$$Lambda$3814)
accept:271, ReferencePipeline$7$1 (java.util.stream)
accept:442, ReferencePipeline$11$1 (java.util.stream)
accept:195, ReferencePipeline$3$1 (java.util.stream)
tryAdvance:75, AggregateEventStream$1 (io.axoniq.axonserver.connector.event)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1574873161 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$209)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:169, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:54, IteratorBackedDomainEventStream (org.axonframework.eventsourcing.eventstore)
hasNext:76, ConcatenatingDomainEventStream (org.axonframework.eventsourcing.eventstore)
forEachRemaining:132, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
copyInto:484, AbstractPipeline (java.util.stream)
wrapAndCopyInto:474, AbstractPipeline (java.util.stream)
evaluateSequential:913, ReduceOps$ReduceOp (java.util.stream)
evaluate:234, AbstractPipeline (java.util.stream)
collect:578, ReferencePipeline (java.util.stream)
toList:72, StreamsKt (kotlin.streams.jdk8)
handelser:247, ApplikationDataFetcher (se.lantmateriet.applikationslistan.app.query.applikation)

As you can see, AxonServerEventStore.java (AxonFramework/AxonFramework on GitHub, master branch) is in that stack trace, and as I understand it, that line causes the upcaster context in ContextAwareSingleEntryMultiUpcaster.java (axon-4.5.x branch) to be rebuilt for each and every event…?

Is it that I want to materialize the event stream as a list that is causing the problem? Is there some other clever way to do this?

Sure thing, @daysleeper75!
As you’ve noticed, the response time is somewhat slower, but we still make it a point to help out our users. :slight_smile:

Multi-Upcasting and the PSEP

Concerning the predicament on the PooledStreamingEventProcessor, I did spot the issue. As you shared, it had to do with the validation in the WorkPackage, which didn’t include events with the same TrackingToken. The token would be the same in case an EventMultiUpcaster is used to move from one-to-many events.
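
For readers who want to picture the problem, here is a simplified sketch in plain Java. It is not the WorkPackage code: `Tracked`, `scheduleStrict`, `scheduleGrouped`, and `sample` are hypothetical names, and the token is reduced to a plain number. The second method is only an illustrative lenient variant and should not be read as the fix that went into the framework.

```java
import java.util.ArrayList;
import java.util.List;

final class SameTokenSketch {

    /** Hypothetical tracked event: a payload name plus the position of the stored event it came from. */
    static final class Tracked {
        final String payload;
        final long token;

        Tracked(String payload, long token) {
            this.payload = payload;
            this.token = token;
        }
    }

    /** Strict check: an event is queued only if its token lies beyond the last queued token. */
    static List<String> scheduleStrict(List<Tracked> events) {
        List<String> queue = new ArrayList<>();
        long lastToken = -1;
        for (Tracked event : events) {
            if (event.token <= lastToken) {
                continue; // looks like a duplicate, so it never reaches an event handler
            }
            lastToken = event.token;
            queue.add(event.payload);
        }
        return queue;
    }

    /** Illustrative lenient check: events sharing the last queued token are queued as well. */
    static List<String> scheduleGrouped(List<Tracked> events) {
        List<String> queue = new ArrayList<>();
        long lastToken = -1;
        for (Tracked event : events) {
            if (event.token < lastToken) {
                continue; // only events from earlier positions are skipped
            }
            lastToken = event.token;
            queue.add(event.payload);
        }
        return queue;
    }

    /** Two stored events, each upcast into two events that share the original token. */
    static List<Tracked> sample() {
        return List.of(
                new Tracked("EventA_2.0", 0), new Tracked("EventB_1.0", 0),
                new Tracked("EventA_2.0", 1), new Tracked("EventB_1.0", 1));
    }

    public static void main(String[] args) {
        System.out.println(scheduleStrict(sample()));  // [EventA_2.0, EventA_2.0]
        System.out.println(scheduleGrouped(sample())); // [EventA_2.0, EventB_1.0, EventA_2.0, EventB_1.0]
    }
}
```

The strict variant reproduces what was observed in this thread: whichever event the upcaster returns first is queued, and the one that follows with the same token is dropped.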

I have made tests and a fix in the framework for this.
It still needs to be reviewed and approved, by the way.
If you’re interested though, you can see it here.

Directly reading from the Event Store

I don’t necessarily think it’s something you’re doing wrong in this case, @daysleeper75.
Rather another (minor) bug.

As a general rule of thumb, we don’t recommend sharing the entire event stream of an Aggregate in this fashion.

If you’re using Axon Server, Axon Server’s query language can already provide you with all the events for an aggregate.
If you weren’t using Axon Server, then the query language of the database in question could do a similar step for you.
Put differently, it’s something that operators tend to use.
In addition, operators can directly access the infrastructure, so why would the client application provide a means to share that data?

If a client application does need something like this, you are actually dealing with a type of Query Model.
Furthermore, from the intent of DDD and the Bounded Context, you should be careful with sharing the “internals of your application.”
More specifically, you would consciously share the right parts of your API instead of everything.

Exposing the entire list of events in actuality is exposing the specifics of your applications. Ergo, it is something to be careful with, as (end-)users might just abuse this info.

Apart from the theoretical bit, I did point out we might be dealing with a (again minor) bug.
Just to be certain, would you mind trying out the following snippet when reading events:

// By specifying the sequence number to start reading from, we enter a different path through the `EventStore`
val aggregateSequenceNumber = 0L // Long literal, since Kotlin does not widen an Int to the long parameter

val nodes = this.eventStore.readEvents(applikationView.id.toString(), aggregateSequenceNumber).asStream().map {
            val userId = it.metaData["userId"] as String?
            val eventDescription = it.payloadType.getAnnotation(EventDescription::class.java)

            Applikationshandelse(
                HandelseId(DigestUtils.md5DigestAsHex((it.identifier + mapper.writeValueAsString(it.payload)).toByteArray())),
                OffsetDateTime.ofInstant(it.timestamp, ZoneId.systemDefault()),
                if (userId != null) Anvandarnamn(userId) else null,
                eventDescription?.value,
                mapper.writeValueAsString(it.payload)
            )
        }.toList().asReversed()

Let me know whether the above works out.
That allows me to narrow down the issue a bit more.

@Steven_van_Beelen first thank you for the release with the fix for PooledStreamingEventProcessor. It’s now in production and working just fine :slight_smile:

I changed the code by your example to call eventStore.readEvents with aggregateSequenceNumber, and indeed it does work now. The ContextAwareEventMultiUpcaster::buildContext-method is now called as expected so that the upcaster can maintain the correct context. So there is a minor bug just as you anticipated.

I totally agree with your reasoning that I probably should not read events like this in the first place, and that this should be a query model of its own. It probably will be soon, too; this first implementation was more of a quick PoC to see if the information was useful. I do return a human-readable string, not the actual event, so it is not actually exposing the specifics, but I guess it is pretty close… :wink:

If you’re using Axon Server, Axon Server’s query language can already provide you with all the events for an aggregate.

Do you mean that I could use the same query language as used in the Axon dashboard? If so, do you have any pointers to where to look in the code for some kind of usage example? I struggle to find it… I do find the axonserver-query-parser module in the AxonIQ/axon-server-se repository on GitHub (master branch), but not the usage of it.
