Processor not work with kafka + mongodb

Hello people,

I would like to ask for help because I’m suffering from problems with projections that are not executed.
In our project we decided to use axon + kafka + mongodb being kafka and mongodb that we already use.
What happens is that we have 3 projections of our aggregate in a query project separate from the command project, what happens is that of the 3 projections for the same aggregate sometimes not all are executed, simply nothing is done.
And our query base is totally out of sync, it just doesn’t work, we’ve tried everything, it only works again if we delete the record referring to the processorName of the trackingtokens, no error log is shown.
We are using version 4.5 of kafka extension + 4.5 of mongo + 4.5.3 of axon.
Does anyone have any light for us to help us on how to solve this problem?

Our configuration in the query project is as below:

axon:
event-handling:
processors:
stock-projection:
source: streamableKafkaMessageSource
mode: POOLED
thread-count: 6
batch-size: 20
initial-segment-count: 6
sequencing-policy: sequentialPerAggregatePolicy
notification-projection:
source: streamableKafkaMessageSource
mode: POOLED
thread-count: 6
batch-size: 4
initial-segment-count: 4
sequencing-policy: sequentialPerAggregatePolicy
stock-replay-projection:
source: streamableKafkaMessageSource
mode: POOLED
thread-count: 8
batch-size: 15
initial-segment-count: 4
sequencing-policy: sequentialPerAggregatePolicy
log-projection:
source: streamableKafkaMessageSource
mode: POOLED
thread-count: 4
batch-size: 20
initial-segment-count: 2
serializer:
general: JACKSON
events: JACKSON
messages: JACKSON
kafka:
clientid: stock-query-events-consumer
defaulttopic: stockservice.stock.events
consumer:
enable-auto-commit: true
event-processor-mode: POOLED_STREAMING
bootstrapservers: ${kafka.server}:${kafka.port}
properties:
group-id: “stockservice.stock.events.query-consumer”
auto-offset-reset: earliest
properties:
security.protocol: PLAINTEXT
distributed:
enabled: true

Hi Renato,

There are some improvements in 4.5.4 Kafka extension, that might help a little. Since you are used the streaming event processors, I would advise to set enable-auto-commit to false, since the offsets are stored in an Axon way. Do you have the Mongo Token store configured for this?

Could you maybe check the logs for warnings or errors? If I understand correctly, the main problem is that the projections are update correctly sometimes, and recreating the whole projection is the only way to recover?

Yes, I have mongo store configured.
I updated today’s axon version to 4.5.14 and kafka to 4.5.4 today, removing the enable-auto-commit.
the strange thing is that it works for 2 of the projections and screw on one is random

But nothing in the logs related to the failed projection?

nothing, it just doesn’t work.
I think it has to do with the mongo extension,
I’m not sure how to proceed, thinking about trying to migrate to the axon server because I don’t know how to proceed anymore.
Following the log you see it running the projections, and you see that it skips some, then you go and run it again, there it goes and runs them all.
it’s random

Hello @Gerard ,
I can’t understand how work error handler when an exception is thrown from EventProcessor.
My configuration is
source: streamableKafkaMessageSource
mode: POOLED
And the documentation says the error handler rethrow exception to processor catch, and when using
PooledStreamingEventProcessor the documentation says: " The PooledStreamingEventProcessor simply aborts the failed part of the process".
What happens if in my listener I have trouble?
How do retries work if in my listener I try to save in DB and my db is out?

It’s complicated, but by default errors from event handling just get logged. You can however configure a different ListenerInvocationErrorHandler and use a @ExceptionHandler explained here to retry if you want.

@Gerard , I notice one issue with kafka + mode POOLED.
I have a scenario stock update, where I have a stock that are formed by others (KIT),
For example
Product stock XPTO - 10 qty
Product stock KIT from XPTO - 2 qty (where this is calculated 10 / 5 → 5 per kit, result 2 items)
When I run an update on aggregate XPTO my EventHandler identifies that I have stocks associated and trigger a new update to stock kit.
But, the problem is: My aggregate is trigged for all aggregate (XPTO and XPTO KIT) but the event handler is not, only some events are triggered.
In my case I have a Stock parent that has 9 stocks kits, sometimes 3 event handlers are trigged, sometimes six, sometimes one.

And if I have:

And if I have:

axon:
  eventhandling:
    processors:
      validation-stock:
        source: streamableKafkaMessageSource
        mode: POOLED
        thread-count: 6
        batch-size: 1000
        initial-segment-count: 6
        sequencing-policy: sequentialPerAggregatePolicy

mode: POOLED I have this problem, if mode: TRACKING all working fine.
Why this behaviour?