OutOfMemory Exceptions using the Kafka Extension

Good Morning,
We’re trying to get our environment production ready and, in doing so, encountered an OOME. The main concern is not necessarily its root cause (although I do have a question on that), but the fact that the Consumer API thread dies silently while the service continues to run. As a result, our PaaS does not detect that the service has crashed and does not attempt to restart it. Would it be possible to tie exceptions thrown from the KafkaConsumer/Producer threads more closely to the stability of the whole application, or to catch those exceptions?

The second question concerns the root cause. We noticed that the OOME occurs when there is no consumer lag on the service, meaning there are no outstanding events for it to consume, and yet it continues to crash with an OOME each time the service is restarted. If anyone has any clues as to why this might be happening, I would very much appreciate your input.

  • Michael

The OOME is swallowed by the following block of code:

`

try {
    eventStream = ensureEventStreamOpened(eventStream, segment);
    processBatch(segment, eventStream);
    errorWaitTime = 1;
} catch (UnableToClaimTokenException e) {
    logger.info("Segment is owned by another node. Releasing thread to process another segment...");
    releaseSegment(segment.getSegmentId());
} catch (Exception e) {
    // Make sure to start with a clean event stream. The exception may have caused an illegal state
    if (errorWaitTime == 1) {
        logger.warn("Error occurred. Starting retry mode.", e);
    }
    logger.warn("Releasing claim on token and preparing for retry in {}s", errorWaitTime);
    releaseToken(segment);
    closeQuietly(eventStream);
    eventStream = null;
    doSleepFor(SECONDS.toMillis(errorWaitTime));
    errorWaitTime = Math.min(errorWaitTime * 2, 60);
}

`

The generic Exception catch swallows the OOME and other potential exceptions instead of allowing the service to crash and restart.
We don’t completely understand why this is only logged as a warning.

Would it be possible, at a minimum, to add a specific catch for OOME above the generic one and rethrow it so that the app crashes?
At the moment the service stops consuming, and unless we are alerting on the log we would never know there’s a problem.
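
For illustration, here is a minimal sketch of the shape being asked for: a retry step that treats ordinary exceptions as recoverable but never retries on an OOME. All names (`RetryLoopSketch`, `attempt`, `Outcome`) are hypothetical and not part of the framework. One caveat worth stating: in Java, `catch (Exception e)` does not match `Error` subclasses such as `OutOfMemoryError`, so the explicit clause below mainly documents the intent.

`
import java.util.Objects;

// Hypothetical sketch, not the framework's code.
final class RetryLoopSketch {

    /** Outcome of a single processing attempt. */
    enum Outcome { OK, RETRY }

    /**
     * Runs one unit of work. Ordinary exceptions are reported as RETRY so the
     * caller can back off; an OutOfMemoryError is rethrown unchanged.
     */
    static Outcome attempt(Runnable work) {
        Objects.requireNonNull(work, "work");
        try {
            work.run();
            return Outcome.OK;
        } catch (OutOfMemoryError e) {
            // Never retry on OOME: let the caller (and ultimately the JVM) see it.
            throw e;
        } catch (Exception e) {
            // Recoverable failure: the caller may release resources and retry.
            return Outcome.RETRY;
        }
    }
}
`

With this shape, `attempt(() -> { throw new IllegalStateException("boom"); })` returns `RETRY`, while a work item that throws `OutOfMemoryError` propagates the error to the caller.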

I apologize for spamming this thread. The second location where the exception would need to be allowed to bubble up is here:

`

public void run() {
    try {
        processingLoop(segment);
    } catch (Throwable e) {
        logger.error("Processing loop ended due to uncaught exception. Pausing processor in Error State.", e);
        state.set(State.PAUSED_ERROR);

`

The main problem with OOMEs is that, in order to debug them effectively, we need a heap dump to be generated, which will not occur if the exception is swallowed.
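
To make the request concrete, here is a rough sketch of a worker whose `run()` still records the error state but rethrows any `Error` so that it leaves the thread. Everything in it (`WorkerSketch`, `runAndCapture`, the two-value `State` enum) is a hypothetical stand-in rather than the framework's actual classes, and the OOME in the usage note is simulated by constructing the error directly.

`
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the framework's code.
final class WorkerSketch implements Runnable {

    enum State { RUNNING, PAUSED_ERROR }

    final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Runnable processingLoop;

    WorkerSketch(Runnable processingLoop) {
        this.processingLoop = processingLoop;
    }

    @Override
    public void run() {
        try {
            processingLoop.run();
        } catch (Throwable e) {
            state.set(State.PAUSED_ERROR);
            if (e instanceof Error) {
                // Fatal JVM-level problems (e.g. OutOfMemoryError) leave the thread.
                throw (Error) e;
            }
            // Other throwables: stay paused in the error state (logging omitted here).
        }
    }

    /** Runs the loop on a new thread and returns whatever escaped it, or null. */
    static Throwable runAndCapture(Runnable loop) {
        AtomicReference<Throwable> escaped = new AtomicReference<>();
        Thread worker = new Thread(new WorkerSketch(loop));
        // A real service might log here and then terminate the process instead.
        worker.setUncaughtExceptionHandler((thread, error) -> escaped.set(error));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return escaped.get();
    }
}
`

Calling `WorkerSketch.runAndCapture(() -> { throw new OutOfMemoryError("simulated"); })` hands the error to the uncaught exception handler, whereas a loop that throws an `IllegalStateException` is contained and nothing escapes. Whether the handler should then exit the process is a deployment decision that this sketch leaves open.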

Hi Michael,

That last catch (Throwable) is indeed where OOMEs are caught and swallowed. We’ll schedule a fix for this, as I agree that fatal errors should be propagated to the JVM.

Cheers,