Configuration persistent streams

Hi,
is there any detailed example on how to configure an event processor to use persistent streams?
Right now I am using PooledStreamingEventProcessors for all processing groups and I want to switch to persistent streams. But the docs under Subscribing Event Processor do tell much about the details on how to configure using the Java API.

Are there any examples how to do this?

Thanks
Klaus

Hi again,
after some searching in the Axon code I found out how to configure a persistent streams processor. Maybe interesting for others (beware, it’s Kotlin:-):

    val persistentStreamProperties = PersistentStreamProperties(
      processingGroup,
      initialSegmentCount,
      PersistentStreamSequencingPolicyProvider.SEQUENTIAL_PER_AGGREGATE_POLICY,
      emptyList(),
      HEAD_POSITION,
      null
    )

    eventProcessingConfigurer.registerSubscribingEventProcessor(processingGroup) { configuration ->
      PersistentStreamMessageSource(
        processingGroup,
        configuration,
        persistentStreamProperties,
        Executors.newScheduledThreadPool(workerThreads, AxonThreadFactory("WorkPackage")),
        batchSize
      )

Thanks for answering your own question, @klauss42! I am confident it is virtually always helpful if all topics have some form of an answer, even if that comes from the one starting the topic.

Just wondering though, but what exactly were you missing from the description referred to? We can always make adjustments to the docs to make them clearer, of course.

Hey Steven,
If I remember correctly, I think what was missing in the docs was an example of how to construct the PersistentStreamMessageSource. That was not obvious to me.

The documentation currently only includes a comment like “* create/return PersistentStreamMessageSource *”.

Klaus

1 Like

Thanks for that, @klauss42. I’ve added that part (in this commit) just now for future users.

@klauss42
since AF 4.11.2 you can set the property

axon.axonserver.auto-persistent-streams-enable=true

this will configure PS for all processing groups, check the latest
axon docs