Inconsistent Message Distribution with Shared Subscriptions in HiveMQ CE on Kubernetes

I’m using HiveMQ CE (latest version) on Kubernetes and implementing the shared subscription feature, as described in the HiveMQ documentation, with the paho.mqtt.golang library.
When scaling my service, I observe irregular message distribution. Instead of the expected round-robin distribution, only a fraction of my services receive a significant amount of data (~2-3k msg/s), while others barely get messages (~0-10 msg/s). For instance, only 2 out of 3, 3 out of 5, or 3 out of 7 services receive the bulk of messages.

Any insights would be appreciated.

Regards
Ferdinand

Hello @FLinnenberg ,

First off, welcome to the HiveMQ Community Forum! We are always happy to see new users.

The first thing I would like to note is that message distribution is typically designed to minimize traffic and avoid overloading a subscribing client. In practice, this means that clients that consume messages faster are prioritized for delivery.
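
For illustration only, here is a generic sketch using paho.mqtt.golang (since you mentioned that library); the broker address, client ID, and topic filter are placeholders rather than anything taken from your setup. The point it shows: keeping the message callback lightweight and handing the actual processing off to a worker lets a subscriber consume faster, which in turn increases the share of messages it receives from a shared subscription.

package main

import (
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// Placeholder broker address and client ID, for illustration only.
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://hivemq.example:1883").
		SetClientID("shared-sub-consumer-1")

	// By default the subscription callback is invoked synchronously and in
	// order, so any time spent inside it directly lowers the rate at which
	// this client consumes from the shared subscription.
	opts.SetOrderMatters(false) // allow callbacks to run concurrently

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Hand the payload off to a worker instead of processing it inline, so
	// the callback returns quickly and the client keeps receiving messages.
	work := make(chan []byte, 1024)
	go func() {
		for payload := range work {
			time.Sleep(10 * time.Millisecond) // stand-in for real processing
			_ = payload
		}
	}()

	handler := func(_ mqtt.Client, msg mqtt.Message) {
		work <- msg.Payload()
	}

	if token := client.Subscribe("$share/example-group/example/topic/#", 1, handler); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	fmt.Println("consuming...")
	select {} // keep the process alive for the example
}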

Since you’ve mentioned that this is a HiveMQ Community Edition broker, this should not apply here. It is still worth mentioning that, when deployed in a clustered environment with multiple HiveMQ broker nodes, additional consideration is given to reducing intra-cluster traffic, which can also impact delivery rates to subscribing clients.

With that in mind, in order to review further I’d like to ask for some additional details about your deployment - namely, your broker configuration file, the structure of your shared subscription topic, how these clients subscribe to this topic, and additional client details such as their MQTT version, persistence configuration, or anything else you can share. Please bear in mind that information such as usernames, passwords, and addresses can be obfuscated or removed as necessary.

Best,
Aaron from the HiveMQ Team

Hi Aaron,

We use HiveMQ (2023.7) inside Kubernetes with the following configuration:

config.xml:

<?xml version="1.0"?>
<hivemq
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<listeners>
		<tcp-listener>
			<port>1883</port>
			<bind-address>0.0.0.0</bind-address>
			<name>hivemq-mqtt</name>
		</tcp-listener>
		<tls-tcp-listener>
			<port>8883</port>
			<bind-address>0.0.0.0</bind-address>
			<name>hivemq-secure-mqtt</name>
			<tls>
				<protocols>
					<protocol>TLSv1.3</protocol>
					<protocol>TLSv1.2</protocol>
				</protocols>
				<keystore>
					<path>/stores/keystore.jks</path>
					<password>redacted</password>
					<private-key-password>redacted</private-key-password>
				</keystore>
				<truststore>
					<path>/stores/truststore.jks</path>
					<password>redacted</password>
				</truststore>
				<client-authentication-mode>REQUIRED</client-authentication-mode>
				<cipher-suites>
					<cipher-suite>TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384</cipher-suite>
					<cipher-suite>TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256</cipher-suite>
					<cipher-suite>TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256</cipher-suite>
					<cipher-suite>TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA</cipher-suite>
					<cipher-suite>TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA</cipher-suite>
					<cipher-suite>TLS_RSA_WITH_AES_128_GCM_SHA256</cipher-suite>
					<cipher-suite>TLS_RSA_WITH_AES_128_CBC_SHA</cipher-suite>
					<cipher-suite>TLS_RSA_WITH_AES_256_CBC_SHA</cipher-suite>
				</cipher-suites>
				<prefer-server-cipher-suites>true</prefer-server-cipher-suites>
			</tls>
		</tls-tcp-listener>
	</listeners>
	<persistence>
		<client-session>
			<general>
				<mode>file</mode>
			</general>
			<queued-messages>
				<max-queued-messages>1800000</max-queued-messages>
				<queued-messages-strategy>discard-oldest</queued-messages-strategy>
				<mode>file</mode>
			</queued-messages>
			<subscriptions>
				<mode>file</mode>
			</subscriptions>
		</client-session>
		<message-flow>
			<incoming>
				<mode>file</mode>
			</incoming>
			<outgoing>
				<mode>file</mode>
			</outgoing>
		</message-flow>
		<retained-messages>
			<mode>file</mode>
		</retained-messages>
		<publish-payloads>
			<mode>file</mode>
		</publish-payloads>
		<attribute>
			<mode>file</mode>
		</attribute>
		<client-group>
			<mode>file</mode>
		</client-group>
	</persistence>
	<security>
		<allow-empty-client-id>
			<enabled>false</enabled>
		</allow-empty-client-id>
		<utf8-validation>
			<enabled>true</enabled>
		</utf8-validation>
	</security>
	<mqtt-addons>
		<dead-messages-topic>
			<enabled>true</enabled>
		</dead-messages-topic>
		<expired-messages-topic>
			<enabled>true</enabled>
		</expired-messages-topic>
		<dropped-messages-topic>
			<enabled>true</enabled>
		</dropped-messages-topic>
	</mqtt-addons>
</hivemq>

ENV vars:

HIVEMQ_ALLOW_ALL_CLIENTS: true
JAVA_OPTS: -Xmx750m -Xms750m

We also use the following extensions for this test:

  • hivemq-allow-all-extension
  • hivemq-prometheus-extension
  • hivemq-heartbeat-extension

Our topic structure is based on our new data model (Data Modeling in the Unified Namespace: From Topic Hierarchies over Payload Schemas to MQTT/Kafka)
and therefore follows this schema: umh/v1/enterprise/site/area/productionLine/workCell/originID/_usecase/tag

Our clients subscribe using:
$share/DATA_BRIDGE_<GROUP_ID>/<TOPIC>

Where GROUP_ID is the SHA-256 hash of the instance serial number, so that when we scale up our replicas on a single instance, the GROUP_ID is always the same.
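
To make that concrete, this is roughly how the filter is built (a simplified sketch; the helper name and the serial number below are illustrative, the real implementation is in the repository linked below):

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// buildSharedSubscriptionFilter derives the consumer group ID from the
// instance serial number via SHA-256 and prefixes the topic filter with
// $share/DATA_BRIDGE_<GROUP_ID>/.
func buildSharedSubscriptionFilter(instanceSerial, topicFilter string) string {
	sum := sha256.Sum256([]byte(instanceSerial))
	groupID := hex.EncodeToString(sum[:])
	return fmt.Sprintf("$share/DATA_BRIDGE_%s/%s", groupID, topicFilter)
}

func main() {
	// All replicas on the same instance see the same serial number, so they
	// all join the same shared-subscription group.
	filter := buildSharedSubscriptionFilter(
		"example-serial-number",
		"umh/v1/enterprise/site/area/productionLine/workCell/originID/_usecase/tag",
	)
	fmt.Println(filter)
}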

The source code of one of our services can be found at: https://github.com/united-manufacturing-hub/united-manufacturing-hub/blob/cd3f5f2e7b2446992718b81fdce5c0db48f2bf9e/golang/cmd/data-bridge/mqtt.go. It uses paho.mqtt.golang as its MQTT library.
The library only supports MQTT 3.1/3.1.1.
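
For completeness, this is roughly how such a client connects and subscribes (a simplified sketch with placeholder broker address, client ID, and filter; the actual code is in the repository above):

package main

import (
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://hivemq.example:1883"). // placeholder broker address
		SetClientID("data-bridge-0").           // placeholder client ID
		SetProtocolVersion(4)                   // 4 = MQTT 3.1.1, 3 = MQTT 3.1

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	handler := func(_ mqtt.Client, msg mqtt.Message) {
		// forward msg.Payload() downstream (e.g. to Kafka)
		_ = msg
	}

	// Placeholder filter; in practice it is built as in the sketch above.
	if token := client.Subscribe("$share/DATA_BRIDGE_group/umh/v1/#", 1, handler); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	select {} // keep consuming
}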

Please let me know if you need any more details.

Regards
Ferdinand