HiveMQ + Kafka Extension cannot connect to Confluent

Hello! Here is my setup:
As a POC, I set up a HiveMQ cluster in AWS, enabled the Kafka Extension, and expected it to send messages to Confluent Cloud using the API key for the cluster. The extension initializes successfully, but it cannot connect to the cluster to get the brokers. To check the credentials, I ran the quick start Python snippet from Confluent with the same broker, username, and password (the API key in my case). It successfully writes messages to the specified topic from the machine HiveMQ is running on, so this is not a connectivity or security group issue.
Config for the extension:


Here’s a sample of the error log:

2023-04-12 09:24:04,155 WARN  - [AdminClient clientId=adminclient-1] Connection to node -1 (CLUSTER_ID.eu-central-1.aws.confluent.cloud/52.57.115.71:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
2023-04-12 09:24:04,574 DEBUG - Reading file /proc/stat
2023-04-12 09:24:04,574 DEBUG - Reading file /proc/stat
2023-04-12 09:24:04,968 WARN  - [Producer clientId=hivemq-kafka-extension-producer-6h8zjp5cnh] Connection to node -1 (CLUSTER_ID.eu-central-1.aws.confluent.cloud/35.157.17.253:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
2023-04-12 09:24:04,972 WARN  - [Producer clientId=hivemq-kafka-extension-producer-6h8zjp5cnh] Bootstrap broker CLUSTER_ID.eu-central-1.aws.confluent.cloud:9092 (id: -1 rack: null) disconnected
2023-04-12 09:24:05,153 DEBUG - Reading file /proc/uptime
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/uptime
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/2919/io
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/2919/io
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/2919/status
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/2919/status
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/2919/stat
2023-04-12 09:24:05,154 DEBUG - Reading file /proc/2919/stat
2023-04-12 09:24:05,225 DEBUG - Not able to fetch Kafka broker count, reason: Timed out waiting for a node assignment. Call: listNodes
2023-04-12 09:24:05,253 DEBUG - sendText("109431|for...resources" : {}}]",org.atmosphere.container.version.JSR356WebSocket$WriteResult@60d4fa56)
2023-04-12 09:24:05,253 DEBUG - Queuing TEXT[len=109438,fin=true,rsv=...,masked=false]
2023-04-12 09:24:05,253 DEBUG - Processing TEXT[len=109438,fin=true,rsv=...,masked=false]
2023-04-12 09:24:05,253 DEBUG - Queuing TEXT[len=109438,fin=true,rsv=...,masked=false]
2023-04-12 09:24:05,253 DEBUG - Processing TEXT[len=109438,fin=true,rsv=...,masked=false]
2023-04-12 09:24:05,253 DEBUG - Compressing TEXT[len=109438,fin=true,rsv=...,masked=false]: 109438 bytes in 109438 bytes chunk
2023-04-12 09:24:05,253 DEBUG - Supplied 109438 input bytes: Deflater[finished=false,read=12842691,written=581415,in=12842691,out=581415]
2023-04-12 09:24:05,255 DEBUG - Wrote org.eclipse.jetty.io.ByteBufferAccumulator@6e13183c bytes to output buffer
2023-04-12 09:24:05,255 DEBUG - Wrote org.eclipse.jetty.io.ByteBufferAccumulator@6e13183c bytes to output buffer
2023-04-12 09:24:05,255 DEBUG - compressed[] bytes = HeapByteBuffer@242d64c8[p=0,l=3570,c=109568,r=3570]={<<<\xEc\x9d_o\xA3H\x16\xC5\xBf\n\xE2iWrk\r\xE6\x8f\xE9~\x9b\xD6\xF4d5...]\xD8\xDf\x94g\xAe\x9a\xBb\xEb\xFd\xA2:\xB3Lmu\xBa\xFd\x0f\x00\x00\xFf\xFf>>>\xFd\xD7<\xDc\x94\xEb\xFa\xC8R...\x00\x00\x00\x00\x00\x00\x00}
--
2023-04-12 09:24:05,255 DEBUG - Flusher@60f0705c[PROCESSING][queueSize=0,aggregateSize=-1,terminated=null] flushing 1 frames: [FrameEntry[TEXT[len=3566,fin=true,rsv=1..,masked=false],org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension$Flusher$1@6a2c9126,OFF,null]]
2023-04-12 09:24:05,255 DEBUG - write: WriteFlusher@7de16d15{IDLE}->null [DirectByteBuffer@8250c7b[p=0,l=4,c=1024,r=4]={<<<\xC1~\r\xEe>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00},HeapByteBuffer@242d64c8[p=0,l=3566,c=109568,r=3566]={<<<\xEc\x9d_o\xA3H\x16\xC5\xBf\n\xE2iWrk\r\xE6\x8f\xE9~\x9b\xD6\xF4d5...~^\xFe\xB1]\xD8\xDf\x94g\xAe\x9a\xBb\xEb\xFd\xA2:\xB3Lmu\xBa\xFd\x0f>>>\x00\x00\xFf\xFf\xFd\xD7<\xDc\x94...\x00\x00\x00\x00\x00\x00\x00}]
2023-04-12 09:24:05,255 DEBUG - update WriteFlusher@7de16d15{WRITING}->null:IDLE-->WRITING
2023-04-12 09:24:05,255 DEBUG - flushed 3570 SocketChannelEndPoint@2b92830d{l=/10.1.11.247:8080,r=/10.1.13.19:37868,OPEN,fill=FI,flush=W,to=5009/300000}{io=1/1,kio=1,kro=1}->WebSocketServerConnection@7f40d6b5[s=ConnectionState@773b69ab[OPENED],f=Flusher@60f0705c[PROCESSING][queueSize=0,aggregateSize=-1,terminated=null],g=Generator[SERVER,validating,+rsv1],p=Parser@7b096f4f[ExtensionStack,s=START,c=0,len=204,f=null]]
2023-04-12 09:24:05,255 DEBUG - Flushed=true written=3570 remaining=0 WriteFlusher@7de16d15{WRITING}->null
2023-04-12 09:24:05,255 DEBUG - update WriteFlusher@7de16d15{IDLE}->null:WRITING-->IDLE

Could you please help me understand what’s wrong with the configuration?
Thank you!

Hi! Whenever I run sudo rm /opt/hivemq-4.13.0/extensions/hivemq-kafka-extension/DISABLED, I start seeing the following errors in the HiveMQ log right away:

2023-04-17 13:19:57,848 INFO  - No license found for enterprise extension with name "HiveMQ Enterprise Extension for Kafka". Entering extension trial mode. "HiveMQ Enterprise Extension for Kafka" will be disabled in 5 hours.
2023-04-17 13:19:57,921 INFO  - Starting extension with id "hivemq-kafka-extension" at /opt/hivemq/extensions/hivemq-kafka-extension
2023-04-17 13:19:58,518 INFO  - mappingStyle: FIELD
2023-04-17 13:19:58,522 INFO  - loading GuavaAddOns ...
2023-04-17 13:19:58,559 INFO  - loading JodaAddOns ...
2023-04-17 13:19:58,634 INFO  - using fake InMemoryRepository, register actual Repository implementation via JaversBuilder.registerJaversRepository()
2023-04-17 13:19:58,665 INFO  - JaVers instance started in 212 ms
2023-04-17 13:19:58,666 INFO  - mappingStyle: FIELD
2023-04-17 13:19:58,666 INFO  - loading JodaAddOns ...
2023-04-17 13:19:58,670 INFO  - loading GuavaAddOns ...
2023-04-17 13:19:58,674 INFO  - using fake InMemoryRepository, register actual Repository implementation via JaversBuilder.registerJaversRepository()
2023-04-17 13:19:58,679 INFO  - JaVers instance started in 14 ms
2023-04-17 13:19:59,618 WARN  - [AdminClient clientId=adminclient-1] Connection to node -1 (pkc-zpjg0.eu-central-1.aws.confluent.cloud/3.74.146.0:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
2023-04-17 13:19:59,722 WARN  - [AdminClient clientId=adminclient-1] Connection to node -1 (pkc-zpjg0.eu-central-1.aws.confluent.cloud/35.157.17.253:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

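Confluent Cloud uses SSL/TLS by default, so TLS must be enabled for the cluster in the <kafka-clusters> section of the extension configuration. Without it, the extension most likely attempts a plaintext connection, which would explain the "terminated during authentication" warnings in your log.

Example: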

    <kafka-clusters>
        <kafka-cluster>
            <id>cluster01</id>
            <bootstrap-servers>pkc-1...j.westeurope.azure.confluent.cloud:9092</bootstrap-servers>
            <tls>
                <enabled>true</enabled>
                <hostname-verification>false</hostname-verification>
            </tls>
            <authentication>
                <plain>
                    <username>MS.....FSNT</username>
                    <password>wRRLftO5j......hpIlMQom</password>
                </plain>
            </authentication>
        </kafka-cluster>
    </kafka-clusters>
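
If you want to catch a missing <tls> block before enabling the extension, a small check along these lines may help. This is only a sketch: the element names are taken from the example above, the function name clusters_without_tls and the sample broker address are made up for illustration, and it assumes the config file uses no XML namespace on these elements.

```python
import xml.etree.ElementTree as ET


def clusters_without_tls(xml_text):
    """Return the ids of <kafka-cluster> entries whose <tls><enabled> is not 'true'."""
    root = ET.fromstring(xml_text.strip())
    missing = []
    # iter() finds <kafka-cluster> at any depth, so the input can be the
    # <kafka-clusters> fragment or a larger document that contains it.
    for cluster in root.iter("kafka-cluster"):
        cluster_id = cluster.findtext("id", default="(no id)")
        enabled = cluster.findtext("tls/enabled", default="false")
        if enabled.strip().lower() != "true":
            missing.append(cluster_id)
    return missing


# Hypothetical sample that reproduces the problem: no <tls> block at all.
SAMPLE = """
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>broker.example.invalid:9092</bootstrap-servers>
    </kafka-cluster>
</kafka-clusters>
"""

if __name__ == "__main__":
    print(clusters_without_tls(SAMPLE))
```

Any cluster id it reports is one where the extension would connect without TLS.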

Also be sure to create an API key for the resource (e.g. the cluster ID):
    confluent api-key create --resource lkc-...dq
and use the generated API key and API secret as the username and password, respectively.
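
For comparison, this is roughly how the same settings look on the Python client side, which may be why the quick start snippet worked while the extension did not. It is a sketch under assumptions: the original Python snippet was not posted, the helper name and the placeholder values are hypothetical, and the keys are the librdkafka-style properties that the confluent-kafka Python client accepts.

```python
# Hypothetical placeholders; substitute the values for your own cluster.
BOOTSTRAP = "CLUSTER_ID.eu-central-1.aws.confluent.cloud:9092"
API_KEY = "<api-key>"
API_SECRET = "<api-secret>"


def confluent_cloud_client_config(bootstrap, api_key, api_secret):
    """Build client settings for SASL/PLAIN authentication over TLS."""
    return {
        "bootstrap.servers": bootstrap,
        # TLS transport: the counterpart of <tls><enabled>true</enabled></tls>
        "security.protocol": "SASL_SSL",
        # The counterpart of <authentication><plain> in the extension config
        "sasl.mechanisms": "PLAIN",
        # The API key goes in as the username, the API secret as the password
        "sasl.username": api_key,
        "sasl.password": api_secret,
    }


if __name__ == "__main__":
    config = confluent_cloud_client_config(BOOTSTRAP, API_KEY, API_SECRET)
    # Print everything except the secret.
    for key, value in config.items():
        print(key, "=", "***" if key == "sasl.password" else value)
```

The point of the comparison is that the client sets the security protocol explicitly, whereas in the extension TLS has to be switched on in the <tls> block.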

Thank you, this worked