Axon Server Stream Error

Hi,
since yesterday I have been getting “Stream Error” exceptions in our production Axon Server.

Environment:

  • Axon Server SE v4.6.8 running on Kubernetes (EKS) as a single node on ARM nodes, deployed as a StatefulSet, with a max heap of 1G
  • Axon Server has been running successfully in this configuration for more than 2 years now.
  • Axon Server configuration:
    axoniq.axonserver.command-threads=5
    axoniq.axonserver.query-threads=5
    axoniq.axonserver.initial-nr-of-permits=5000
    axoniq.axonserver.nr-of-new-permits=2500
    axoniq.axonserver.new-permits-threshold=2500
    axoniq.axonserver.keep-alive-timeout=10000
  • Grafana monitoring shows no abnormalities

Starting yesterday we have been getting warnings (and errors) when running queries. Clients often get disconnected from Axon Server, and the client operation fails after these errors. The errors do not always lead to client-side failures, so the behavior is not fully deterministic. Reconnection to Axon Server succeeds.
In the past we have not seen these log entries in Axon Server, at least not in the last month for which I have logs available.

I already tried the following without success so far:

  • upgraded Axon Server from 4.6.7 to 4.6.8
  • upgraded the EKS nodes to the latest operating system (to make sure the hosts are fresh)
  • increased axoniq.axonserver.max-message-size to 8MB (no success, so I rolled back to the default)

I see two exceptions in the Axon Server logs, both reported at WARN level:

An exception was thrown by io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$5.operationComplete()

java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@f18f4f8 rejected from java.util.concurrent.ThreadPoolExecutor@237226aa[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 86959]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
	at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.closedInternal(ServerImpl.java:930)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.closed(ServerImpl.java:888)
	at io.grpc.internal.AbstractServerStream$TransportState.closeListener(AbstractServerStream.java:339)
	at io.grpc.internal.AbstractServerStream$TransportState.access$100(AbstractServerStream.java:184)
	at io.grpc.internal.AbstractServerStream$TransportState$2.run(AbstractServerStream.java:311)
	at io.grpc.internal.AbstractServerStream$TransportState.deframerClosed(AbstractServerStream.java:241)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerStream$TransportState.deframerClosed(NettyServerStream.java:164)
	at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
	at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:198)
	at io.grpc.internal.AbstractServerStream$TransportState.complete(AbstractServerStream.java:315)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerStream$TransportState.complete(NettyServerStream.java:164)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$5.operationComplete(NettyServerHandler.java:685)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$5.operationComplete(NettyServerHandler.java:682)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.tryPromise(Http2CodecUtil.java:382)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:349)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:261)
	at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:52)
	at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
	at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
	at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
	at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel.doWriteBytes(AbstractEpollChannel.java:364)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytes(AbstractEpollStreamChannel.java:260)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteSingle(AbstractEpollStreamChannel.java:471)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:429)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.flush0(AbstractEpollChannel.java:557)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:2138)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:803)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.flush(SslHandler.java:780)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
	at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:147)
	at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:34)
	at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:46)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:391)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

Stream Error

io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:173)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:357)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1007)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:963)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:515)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:521)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:628)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead(DefaultHttp2ConnectionDecoder.java:444)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead(Http2InboundFrameLogger.java:80)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame(DefaultHttp2FrameReader.java:509)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:259)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1236)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

Any help is appreciated.

Klaus

Some more info:

  • Clients are running Axon Framework 4.6.3 with Spring Boot 2.7.8
  • All on JDK 17

Another update:
In the meantime I found out that I apparently also have to increase the max-message-size on the client side. To be honest, I did not find any docs on this.
Is this correct? If we increase axoniq.axonserver.max-message-size on the Axon Server side, do we also have to set axon.axonserver.max-message-size in all clients?
It also seems that in Axon Server I can configure “8MB”, while on the client side I cannot use “8MB” but have to use the byte count “8388608”.
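
If that is correct, the two settings would look roughly like this (a sketch based on my observations above; the values are illustrative):

# Axon Server side (e.g. axonserver.properties) – accepts a size suffix
axoniq.axonserver.max-message-size=8MB

# Client side (e.g. application.properties of an Axon Framework app) – plain byte count
axon.axonserver.max-message-size=8388608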

Anyway, after increasing the message size on both the server and the client side, I see fewer “Stream Error” warnings, but I still get some.


The RejectedExecutionException in the first warning is thrown because a task submitted to a java.util.concurrent.ThreadPoolExecutor was rejected. The executor has been terminated: its pool size and number of active threads are 0, there are 0 queued tasks, and a large number of tasks have completed. This suggests that the thread pool is exhausted, or has been shut down, and is unable to process any further tasks.
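
For illustration, this is the generic JDK behavior behind that warning (a standalone sketch, not Axon Server code): once an executor has been shut down or terminated, any further task submission is rejected with exactly this exception type.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedTaskDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.shutdown(); // the pool stops accepting new tasks and moves towards Terminated

        try {
            pool.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            // Matches the shape of the log line:
            // "Task ... rejected from java.util.concurrent.ThreadPoolExecutor[Terminated, pool size = 0, ...]"
            System.out.println("rejected: " + e);
        }
    }
}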

There are a few possible reasons for the thread pool exhaustion:

  • High volume of requests that cannot be handled by the thread pool within the given configuration.
  • The system is running low on resources, such as memory or CPU, and is unable to process tasks efficiently.
  • A bug or memory leak within the application is preventing the thread pool from operating effectively.

To resolve the issue, you can try the following steps:

  • Increase the heap size to 2GB (see the sketch after this list)
  • Monitor the system’s resource usage and identify if there are any resource constraints.
  • Increase the number of command threads and query threads to see if that helps.
  • Review the application logs to see if there is any information about what is causing the thread pool exhaustion.
  • Check for any memory leaks or bugs within the application that may be preventing the thread pool from operating effectively.
  • Consider adding additional nodes to the Axon Server cluster to help distribute the workload and alleviate the pressure on a single node.
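
A minimal sketch of the heap change, assuming JVM options reach the Axon Server container through an environment variable or a similar hook (the exact mechanism depends on your image and deployment):

# JVM options for the Axon Server container (illustrative)
-Xms2g -Xmx2g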

Hi Stefan,
thanks for replying :-)

The mentioned exceptions are from Axon Server, so I do not suspect a memory or threading issue in the client application(s).

We are running a long-running query at the time the exception shows up. Could this be a reason?

How does axoniq.axonserver.query-threads=5 work in detail? I did not find meaningful docs on it. If it defines a thread pool of 5 threads in my Axon Server for handling all queries from all client applications, that could be an explanation. We are running quite a few queries at the time the exception occurs.

Are there any recommendations for query-threads sizing?

The axoniq.axonserver.query-threads setting determines the size of the thread pool for handling queries from client applications.

A value of 5 means that there will be 5 threads dedicated to handling queries. This is useful when you have many queries being executed simultaneously and want to control the level of concurrency.

As for recommendations on sizing, it depends on various factors such as the number of queries being executed simultaneously, the processing power of the machine running AxonServer, and the desired level of concurrency. You can start with a small number of threads and gradually increase it based on performance metrics and monitoring of your Axon Server instance.
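
For example, a cautious bump might look like this (15 is just an illustrative starting point, not a value derived from your workload; the analogous axoniq.axonserver.command-threads setting covers command handling):

# axonserver.properties – raise gradually while watching your metrics
axoniq.axonserver.query-threads=15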

I would recommend trying to increase the Axon Server heap size to 2G and seeing if the issue persists.

Hello again.
I increased the max heap to 2G and raised query-threads to 15. So far I have not seen any stream error exceptions in the logs with the same test that produced them before, so it feels much better now.
I will keep an eye on the logs and Grafana 🙂

Thanks for the quick help
