Axon is unable to connect to Azure Event Hubs using the Kafka protocol

java.io.IOException: An existing connection was forcibly closed by the remote host
at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_252]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_252]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_252]
at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_252]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377) ~[na:1.8.0_252]
at org.apache.kafka.common.network.SslTransportLayer.readFromSocketChannel(SslTransportLayer.java:205) ~[kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:528) ~[kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94) ~[kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) ~[kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) ~[kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) [kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) [kafka-clients-2.3.0.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:483) [kafka-clients-2.3.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) [kafka-clients-2.3.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:307) [kafka-clients-2.3.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.3.0.jar:na]

Events get published with a plain Kafka client program, but not with Axon. The configuration is as follows:

axon.kafka.client-id=producer
axon.kafka.default-topic=test
axon.kafka.producer.transaction-id-prefix=deafultTxPrefix
axon.kafka.bootstrap-servers=****.servicebus.windows.net:9093
axon.kafka.properties.security.protocol=SASL_SSL
axon.kafka.properties.sasl.mechanism=PLAIN
axon.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://****.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessPolicy;SharedAccessKey=***********;EntityPath=test";
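
For comparison, the same connection settings can be expressed as plain Kafka client properties. The following is only a minimal sketch: the class name `EventHubsProps`, the `build` helper, and the namespace and connection-string values are hypothetical placeholders, and it only assembles a `java.util.Properties` object rather than creating a producer. It sets no `transactional.id`; whether the working plain client program did so is not stated above.

```java
import java.util.Properties;

public class EventHubsProps {

    // Builds Kafka client properties mirroring the axon.kafka.* settings above.
    // "namespace" and "connectionString" are placeholders supplied by the caller.
    static Properties build(String namespace, String connectionString) {
        Properties props = new Properties();
        props.put("client.id", "producer");
        props.put("bootstrap.servers", namespace + ".servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // The user name is the literal string $ConnectionString;
        // the password carries the actual connection string.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"$ConnectionString\" "
                        + "password=\"" + connectionString + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-namespace", "<connection-string>");
        // Print the keys only, so that no secret ends up in the output.
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

The resulting `Properties` object is the kind of input a plain `KafkaProducer` constructor accepts, which makes it easier to compare, key by key, with what Axon passes to the client.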

Other dependencies:

org.springframework.boot:spring-boot-starter-parent:2.1.4.RELEASE
org.apache.kafka:kafka-clients:2.3.0

Hi again Ravinda (Stack Overflow),

Could you share a bit more context? Which versions of Axon Framework and the Kafka extension are you using? Some logging from before the exception would also help.

Hi Gerard,
Thanks for the reply. Please find the versions below:

	'org.springframework.boot' version '2.7.3'
	'io.spring.dependency-management' version '1.0.13.RELEASE'
	sourceCompatibility = '11'
	'org.axonframework', name: 'axon-kafka', version: '3.4'
	'org.axonframework', name: 'axon-spring-boot-starter', version: '3.3.3'
	implementation 'org.springframework.boot:spring-boot-starter-actuator'
	implementation 'org.springframework.kafka:spring-kafka'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'

LOGS:

2022-09-21 15:00:28.708  INFO 3368 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Instantiated a transactional producer.
2022-09-21 15:00:29.047  INFO 3368 --- [           main] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2022-09-21 15:00:29.227 DEBUG 3368 --- [           main] o.a.k.c.s.ssl.DefaultSslEngineFactory    : Created SSL context with keystore null, truststore null, provider SunJSSE.
2022-09-21 15:00:29.272 DEBUG 3368 --- [ExampleProducer] o.a.k.clients.producer.internals.Sender  : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Starting Kafka producer I/O thread.
2022-09-21 15:00:29.275 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Initialize connection to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null) for sending metadata request
2022-09-21 15:00:29.276  INFO 3368 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-09-21 15:00:29.276  INFO 3368 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-09-21 15:00:29.276  INFO 3368 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1663752629272
2022-09-21 15:00:29.277 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.ClientUtils     : Resolved host ame-nonprod-eventhubs.servicebus.windows.net as 20.50.201.85
2022-09-21 15:00:29.277 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Initiating connection to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null) using address ame-nonprod-eventhubs.servicebus.windows.net/20.50.201.85
2022-09-21 15:00:29.279 DEBUG 3368 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Kafka producer started
2022-09-21 15:00:29.282 DEBUG 3368 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Transition from state UNINITIALIZED to INITIALIZING
2022-09-21 15:00:29.282  INFO 3368 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Invoking InitProducerId for the first time in order to acquire a producer ID
2022-09-21 15:00:29.328 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to SEND_APIVERSIONS_REQUEST
2022-09-21 15:00:29.330 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Creating SaslClient: client=null;service=kafka;serviceHostname=ame-nonprod-eventhubs.servicebus.windows.net;mechs=[PLAIN]
2022-09-21 15:00:29.476 DEBUG 3368 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Enqueuing transactional request InitProducerIdRequestData(transactionalId='deafultTxPrefix0', transactionTimeoutMs=60000, producerId=-1, producerEpoch=-1)
2022-09-21 15:00:29.478 DEBUG 3368 --- [ExampleProducer] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Enqueuing transactional request FindCoordinatorRequestData(key='deafultTxPrefix0', keyType=1, coordinatorKeys=[])
2022-09-21 15:00:29.479 DEBUG 3368 --- [ExampleProducer] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Enqueuing transactional request InitProducerIdRequestData(transactionalId='deafultTxPrefix0', transactionTimeoutMs=60000, producerId=-1, producerEpoch=-1)
2022-09-21 15:00:29.480 DEBUG 3368 --- [ExampleProducer] o.apache.kafka.common.network.Selector   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2022-09-21 15:00:29.513 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Completed connection to node -1. Fetching API versions.
2022-09-21 15:00:29.946 DEBUG 3368 --- [ExampleProducer] o.a.k.common.network.SslTransportLayer   : [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=ame-nonprod-eventhubs.servicebus.windows.net/20.50.201.85:9093], selector=sun.nio.ch.WindowsSelectorImpl@82afcb15, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'ame-nonprod-eventhubs.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net, O=Microsoft Corporation, L=Redmond, ST=WA, C=US' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2022-09-21 15:00:30.048 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2022-09-21 15:00:30.262 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to SEND_HANDSHAKE_REQUEST
2022-09-21 15:00:30.263 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2022-09-21 15:00:30.422 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to INITIAL
2022-09-21 15:00:30.429 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to INTERMEDIATE
2022-09-21 15:00:30.591 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Set SASL client state to COMPLETE
2022-09-21 15:00:30.592 DEBUG 3368 --- [ExampleProducer] o.a.k.c.s.a.SaslClientAuthenticator      : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Finished authentication with no session expiration and no session re-authentication
2022-09-21 15:00:30.593 DEBUG 3368 --- [ExampleProducer] o.apache.kafka.common.network.Selector   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Successfully authenticated with ame-nonprod-eventhubs.servicebus.windows.net/20.50.201.85
2022-09-21 15:00:30.594 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Initiating API versions fetch from node -1.
2022-09-21 15:00:30.596 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=KafkaExampleProducer, correlationId=0) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.1.1')
2022-09-21 15:00:30.759 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=KafkaExampleProducer, correlationId=0): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
2022-09-21 15:00:30.760 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Initiating API versions fetch from node -1.
2022-09-21 15:00:30.760 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=KafkaExampleProducer, correlationId=1) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.1.1')
2022-09-21 15:00:30.923 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=KafkaExampleProducer, correlationId=1): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=3, maxVersion=7), ApiVersion(apiKey=1, minVersion=4, maxVersion=6), ApiVersion(apiKey=2, minVersion=0, maxVersion=2), ApiVersion(apiKey=3, minVersion=0, maxVersion=5), ApiVersion(apiKey=8, minVersion=0, maxVersion=3), ApiVersion(apiKey=9, minVersion=0, maxVersion=3), ApiVersion(apiKey=10, minVersion=0, maxVersion=1), ApiVersion(apiKey=11, minVersion=0, maxVersion=4), ApiVersion(apiKey=12, minVersion=0, maxVersion=1), ApiVersion(apiKey=13, minVersion=0, maxVersion=1), ApiVersion(apiKey=14, minVersion=0, maxVersion=1), ApiVersion(apiKey=15, minVersion=0, maxVersion=1), ApiVersion(apiKey=16, minVersion=0, maxVersion=1), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=1), ApiVersion(apiKey=19, minVersion=0, maxVersion=2), ApiVersion(apiKey=20, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=1), ApiVersion(apiKey=23, minVersion=0, maxVersion=0), ApiVersion(apiKey=32, minVersion=0, maxVersion=2), ApiVersion(apiKey=36, minVersion=0, maxVersion=1), ApiVersion(apiKey=37, minVersion=0, maxVersion=1), ApiVersion(apiKey=42, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
2022-09-21 15:00:30.937 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Node -1 has finalized features epoch: -1, finalized features: [], supported features: [], API versions: (Produce(0): 3 to 7 [usable: 7], Fetch(1): 4 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): UNSUPPORTED, OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 2 [usable: 2], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): UNSUPPORTED, CreateAcls(30): UNSUPPORTED, DeleteAcls(31): UNSUPPORTED, DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): UNSUPPORTED, AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): 0 [usable: 0], ElectLeaders(43): UNSUPPORTED, IncrementalAlterConfigs(44): UNSUPPORTED, AlterPartitionReassignments(45): UNSUPPORTED, ListPartitionReassignments(46): UNSUPPORTED, OffsetDelete(47): UNSUPPORTED, DescribeClientQuotas(48): UNSUPPORTED, AlterClientQuotas(49): UNSUPPORTED, DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, AlterIsr(56): UNSUPPORTED, UpdateFeatures(57): UNSUPPORTED, DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED, DescribeTransactions(65): UNSUPPORTED, ListTransactions(66): UNSUPPORTED, AllocateProducerIds(67): UNSUPPORTED).
2022-09-21 15:00:30.939 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null)
2022-09-21 15:00:30.940 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=5, clientId=KafkaExampleProducer, correlationId=2) and timeout 30000 to node -1: MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
2022-09-21 15:00:30.942 DEBUG 3368 --- [ExampleProducer] o.a.k.clients.producer.internals.Sender  : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Sending transactional request FindCoordinatorRequestData(key='deafultTxPrefix0', keyType=1, coordinatorKeys=[]) to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null) with correlation ID 3
2022-09-21 15:00:30.942 DEBUG 3368 --- [ExampleProducer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=KafkaExampleProducer, correlationId=3) and timeout 30000 to node -1: FindCoordinatorRequestData(key='deafultTxPrefix0', keyType=1, coordinatorKeys=[])
2022-09-21 15:00:31.206 DEBUG 3368 --- [ExampleProducer] o.apache.kafka.common.network.Selector   : [Producer clientId=KafkaExampleProducer, transactionalId=deafultTxPrefix0] Connection with ame-nonprod-eventhubs.servicebus.windows.net/20.50.201.85 disconnected

java.io.IOException: An existing connection was forcibly closed by the remote host
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:na]
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:na]
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[na:na]
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245) ~[na:na]
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) ~[na:na]
	at org.apache.kafka.common.network.SslTransportLayer.readFromSocketChannel(SslTransportLayer.java:228) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:560) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) ~[kafka-clients-3.1.1.jar:na]
	at java.base/java.lang.Thread.run(Thread.java:866) ~[na:na]
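
The "API versions" line in the log above lists which Kafka APIs the broker reports as UNSUPPORTED, but it is long enough to be hard to scan by eye. The sketch below pulls those names out of such a line with a regular expression. The class name `UnsupportedApis` and the `find` helper are hypothetical, and the excerpt in `main` is a shortened copy of the logged line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UnsupportedApis {

    // Matches entries such as "EndTxn(26): UNSUPPORTED" and captures the API name.
    private static final Pattern UNSUPPORTED =
            Pattern.compile("(\\w+)\\((\\d+)\\):\\s*UNSUPPORTED");

    // Returns the names of all APIs marked UNSUPPORTED, in the order they appear.
    static List<String> find(String logLine) {
        List<String> names = new ArrayList<>();
        Matcher matcher = UNSUPPORTED.matcher(logLine);
        while (matcher.find()) {
            names.add(matcher.group(1));
        }
        return names;
    }

    public static void main(String[] args) {
        // Shortened excerpt of the "API versions" log line above.
        String excerpt = "InitProducerId(22): 0 to 1 [usable: 1], "
                + "AddPartitionsToTxn(24): UNSUPPORTED, "
                + "AddOffsetsToTxn(25): UNSUPPORTED, "
                + "EndTxn(26): UNSUPPORTED";
        System.out.println(find(excerpt)); // prints [AddPartitionsToTxn, AddOffsetsToTxn, EndTxn]
    }
}
```

In this log the transactional APIs AddPartitionsToTxn, AddOffsetsToTxn and EndTxn are reported as UNSUPPORTED, while the producer is transactional (transactionalId=deafultTxPrefix0) and the disconnect follows a FIND_COORDINATOR request with keyType=1. That may be worth checking first, although nothing in this thread confirms it as the cause.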