When I first read about persistent streams (PS), I thought I could finally get rid of tracking tokens in the application.
Unfortunately, PS can only be used with subscribing processors, which means all event handlers are invoked in the same thread, so it is not an efficient way of consuming events.
What I don't understand is why PS cannot be used with tracking processors. It would be very convenient to keep tracking tokens on the Axon Server side rather than in the application, just like it is done with Kafka offsets.
To clarify, persistent streams are implemented as subscribing processors, but they do not run in the same thread.
But I have tested it.
I created two different processing groups (subscribing mode) and assigned them to two different PS. I printed the current thread from each event handler, and all logs showed the same thread.
What did I do wrong?
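For readers who want to see why such a test prints a single thread name, here is a minimal sketch using only JDK classes. It is not Axon code: the class and method names are made up for illustration, and it simply assumes that both handlers are dispatched on one shared scheduler. Each task records its thread name and then waits briefly for the other task, so a pool with enough threads is forced to use two of them, while a single-threaded scheduler runs the tasks one after another on the same thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration; the names below are hypothetical and not part of Axon Framework.
class SharedSchedulerDemo {

    // Runs `handlers` tasks on the given scheduler and returns the names
    // of the threads that executed them. Shuts the scheduler down afterwards.
    static Set<String> threadsUsed(ScheduledExecutorService scheduler, int handlers) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(handlers);
        CountDownLatch allDone = new CountDownLatch(handlers);
        for (int i = 0; i < handlers; i++) {
            scheduler.execute(() -> {
                names.add(Thread.currentThread().getName());
                allStarted.countDown();
                try {
                    // Wait (bounded) for the other tasks to start, so that a pool
                    // cannot finish one task before the next one is picked up.
                    allStarted.await(200, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    allDone.countDown();
                }
            });
        }
        try {
            allDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } finally {
            scheduler.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // One shared thread: both "handlers" report the same thread name.
        System.out.println(threadsUsed(Executors.newSingleThreadScheduledExecutor(), 2));
        // A pool of two threads: the two "handlers" run on different threads.
        System.out.println(threadsUsed(Executors.newScheduledThreadPool(2), 2));
    }
}
```

Seeing one thread name in the logs is therefore consistent with a single-threaded scheduler shared by all streams, not necessarily with a configuration mistake.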
You can override persistentStreamScheduler if you want to change the threading behavior. You can use virtual threads as well; just override the bean:
@Bean
public ScheduledExecutorService persistentStreamScheduler() {
    return ...
}
@rsobies you are right, it's an oversight in the framework: there is only one thread.
We will create a patch release soon, but you can work around this issue by defining your own thread pool:
@Bean("persistentStreamScheduler")
public ScheduledExecutorService myPersistentStreamScheduler() {
    // Ten platform threads shared by the persistent streams
    return Executors.newScheduledThreadPool(10,
            new AxonThreadFactory("persistent-streams"));
}
Or with virtual threads:
@Bean
public ScheduledExecutorService persistentStreamScheduler() {
    return Executors.newScheduledThreadPool(10, Thread.ofVirtual()
            .name("persistent-streams-", 0)
            .factory());
}
Thank you for this tip. It works.
I can see another problem with PS. Each processing group must be assigned to its own PS, so there is a lot of manual configuration in the application. Maybe in the future this could be automated?
To me it seems that the concept of a processing group is similar to a Kafka consumer group. Isn't that an accurate analogy? If the state of a consumer group is managed by the Kafka server, maybe Axon should take the same approach, so that all management of tracking tokens is shifted to Axon Server. The less configuration on the client side, the more convenient it is. This should be the default solution.
We appreciate the feedback. Do you have a proposal for how to automate this and make the “default” switch? The concept is similar, yes: Axon Server takes care of the progress and sends you new events as you go, so there is no need to keep track yourself.
PS could be configured in the same way as processing groups (PG): the PG name is the same as the package, so there is no need to define each PG in application.properties. Why not take the same approach?
If the application is connected to Axon Server, then PS could be the default.
Interesting suggestions, @rsobies!
As @stefand already shared, thanks for the feedback.
We (Stefan and I) have already been chatting about how to simplify the configuration of the Persistent Streams further because of this thread.
As a clear result, I’ve already made this pull request concerning the threading of the Persistent Stream.
We need to take some caveats into account regarding your last request to default to Persistent Streams when Axon Server is present.
For one, this could be regarded as a breaking change from a behavioral point of view.
If we introduced this, users would no longer get the (default) TrackingEventProcessor with a TokenStore based on the used database, but they would switch to a SubscribingEventProcessor without any tokens.
Although I agree with your view that such a shift has merit, I am afraid I need to take a hard stance that we would not do this for Axon Framework 4.
That, obviously, doesn’t say anything about Axon Framework 5.
Furthermore, given your suggestion, another idea that’s been crossing my mind is to switch the “if-statement” somewhat. What if we switch to the SubscribingEventProcessor with a Persistent Stream when (1) Axon Server is present and (2) there are no dependencies/configurations in place to construct a persistent TokenStore?
Although that would yet again be a breaking change on the behavioral level, I think this might be a feasible change to make.
Disregarding the above, I do agree it would be nice to have a more straightforward configuration solution to switch the default to SubscribingEventProcessors with Persistent Streams. Hence, we will be diving into this subject.
Thanks for your answer.
To avoid a breaking change, you could introduce a new property, for example: axon.persistant-stream.default=true. Setting this property would make PS the default on demand.
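To make the options discussed in this thread concrete, here is a small sketch of how the opt-in property and the "Axon Server present, no persistent TokenStore" rule might combine into one decision. This is purely illustrative: the class, enum, and parameter names are hypothetical, none of them are Axon Framework APIs, and the framework team has not committed to any of this behavior.

```java
// Hypothetical decision rule; none of these names exist in Axon Framework.
class DefaultProcessorRule {

    enum Mode {
        TRACKING_WITH_TOKEN_STORE,          // the default in Axon Framework 4, per this thread
        SUBSCRIBING_WITH_PERSISTENT_STREAM  // the default proposed in this thread
    }

    /**
     * @param axonServerPresent              whether the application is connected to Axon Server
     * @param persistentTokenStoreConfigured whether a persistent TokenStore could be constructed
     * @param optInProperty                  value of the suggested opt-in property (assumed false if unset)
     */
    static Mode choose(boolean axonServerPresent,
                       boolean persistentTokenStoreConfigured,
                       boolean optInProperty) {
        // Persistent streams are an Axon Server feature, so without it nothing changes.
        if (!axonServerPresent) {
            return Mode.TRACKING_WITH_TOKEN_STORE;
        }
        // Explicit opt-in: the user asked for persistent streams as the default.
        if (optInProperty) {
            return Mode.SUBSCRIBING_WITH_PERSISTENT_STREAM;
        }
        // Implicit switch: no persistent TokenStore is available to track progress locally.
        if (!persistentTokenStoreConfigured) {
            return Mode.SUBSCRIBING_WITH_PERSISTENT_STREAM;
        }
        // Otherwise keep today's behavior.
        return Mode.TRACKING_WITH_TOKEN_STORE;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true, false));
        System.out.println(choose(true, true, true));
        System.out.println(choose(true, false, false));
    }
}
```

With the property unset and a TokenStore configured, existing applications would keep their current behavior, which is the point of making the switch opt-in.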