Client stops receiving messages

Hi,

I’m using the HiveMQ Java client in an openHAB binding to talk to the Roborock MQTT servers.

I am able to connect to the server, subscribe, and publish commands.

After a seemingly random period of time, MQTT messages stop being received/handled by my code.

The MQTT connection and handling code is in RoborockAccountHandler.java: openhab-addons/bundles/org.openhab.binding.roborock/src/main/java/org/openhab/binding/roborock/internal/RoborockAccountHandler.java on the roborock branch of psmedley/openhab-addons (GitHub).

Sending commands still succeeds, so the MQTT connection seems to still be in place; only the received messages aren’t being passed to the callback.

Edit: I wonder if it could be this: “What happens if exceptions are thrown in callbacks?” (hivemq/hivemq-mqtt-client issue #575 on GitHub).

Any thoughts appreciated!

Cheers,

Paul

Hi @psmedley,
welcome aboard. Exploring MQTT and HiveMQ can be a fun journey, and we’re here to help you along the way.

Your suspicion is very likely correct. The issue you’re describing is a classic symptom of an unhandled exception occurring within an asynchronous callback.

When you subscribe to a topic using the HiveMQ client, you provide a callback method (this::handleMessage). The client’s internal threads invoke this method whenever a message arrives. If your handleMessage method (or any method it calls) throws an exception that isn’t caught, the exception propagates up to the HiveMQ client’s executor thread. To prevent one faulty callback from crashing the entire client, the library often handles this by logging the error (sometimes only at a low level) and then ceasing to deliver further messages to that specific callback.

This explains why you can still publish: the publishing mechanism is separate and unaffected. The connection is still alive, but the subscription’s message delivery pipeline has been silently shut down due to the error.

Let’s look at your handleMessage method. The most probable point of failure is when you delegate the processing to the child handler.

// RoborockAccountHandler.java

public void handleMessage(@Nullable Mqtt5Publish publish) {
    if (publish == null) {
        // ...
        return;
    }
    // ...
    for (Entry<Thing, RoborockVacuumHandler> entry : childDevices.entrySet()) {
        if (entry.getKey().getUID().getAsString().contains(destination)) {
            // ...
            byte[] payload = publish.getPayloadAsBytes();

            // VULNERABLE CALL: An exception here will kill the callback.
            entry.getValue().handleMessage(payload);

            return;
        }
    }
}

Any RuntimeException (like a NullPointerException, ArrayIndexOutOfBoundsException, or a JSON parsing error) inside the RoborockVacuumHandler.handleMessage(payload) method will cause the behavior you’re seeing. The error could be triggered by an unexpected message format from the Roborock servers that appears only intermittently.

You must wrap the call to the child handler in a try-catch block to prevent any exceptions from escaping your callback method. This ensures that even if one message fails to process, the client will continue to deliver subsequent messages.

Here is the corrected version of your handleMessage method:

// RoborockAccountHandler.java

public void handleMessage(@Nullable Mqtt5Publish publish) {
    if (publish == null) {
        logger.debug("handleMessage - null publish received");
        return;
    }

    String receivedTopic = publish.getTopic().toString();
    String destination = receivedTopic.substring(receivedTopic.lastIndexOf('/') + 1);
    logger.debug("Received MQTT message for device {}", destination);
    lastMQTTMessageTimestamp = System.currentTimeMillis();

    for (Entry<Thing, RoborockVacuumHandler> entry : childDevices.entrySet()) {
        if (entry.getKey().getUID().getAsString().contains(destination)) {
            try {
                logger.trace("Submit response to child {} -> {}", destination, entry.getKey().getUID());
                byte[] payload = publish.getPayloadAsBytes();

                // getPayloadAsBytes() returns an empty array (not null) when there is no payload
                if (payload.length > 0) {
                    entry.getValue().handleMessage(payload);
                } else {
                    logger.debug("Received message for {} with empty payload.", destination);
                }

            } catch (Exception e) {
                // LOG THE ERROR! This is critical for debugging.
                logger.error("Unhandled exception processing MQTT message for device {}. Message will be discarded.", destination, e);
            }
            return; // Exit after finding the correct handler.
        }
    }
    // Optional: Log if no handler was found for the message
    logger.warn("Received MQTT message for unknown device destination: {}", destination);
}

By adding this try-catch (Exception e) block, you’ll “swallow” any errors from the downstream handleMessage call, log them for your own diagnostics, and allow the HiveMQ client to continue its work without interruption.
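If the same guard is needed in more than one place, it could be factored into a reusable wrapper around any `Consumer`. This is a minimal sketch that assumes nothing beyond the JDK: `SafeCallback` and `guard` are hypothetical names, and `java.util.logging` stands in for the binding’s own logger. Because the HiveMQ subscribe builder’s `callback(...)` takes a `Consumer<Mqtt5Publish>`, something like `.callback(SafeCallback.guard("roborock", this::handleMessage))` should fit, although that wiring is not part of the runnable example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: keeps one bad message from escaping into the caller's delivery thread. */
final class SafeCallback {
    private static final Logger LOG = Logger.getLogger(SafeCallback.class.getName());

    private SafeCallback() {
    }

    /** Wraps a handler so any RuntimeException it throws is logged and swallowed. */
    static <T> Consumer<T> guard(String name, Consumer<T> delegate) {
        return message -> {
            try {
                delegate.accept(message);
            } catch (RuntimeException e) {
                // A Consumer cannot throw checked exceptions, so RuntimeException covers the rest.
                LOG.log(Level.SEVERE, "Handler " + name + " failed; message discarded", e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        Consumer<String> handler = guard("demo", msg -> {
            if (msg.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            handled.add(msg);
        });
        // Simulated delivery loop: the second message throws, the third is still delivered.
        for (String msg : List.of("first", "", "third")) {
            handler.accept(msg);
        }
        System.out.println(handled); // [first, third]
    }
}
```

The simulated loop shows the point of the pattern: the failing message is logged and dropped, and delivery continues with the next one.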

Best,
Dasha from The HiveMQ Team

Thank you! Testing this now, and will report back!

Unfortunately, it doesn’t seem to have helped :frowning: Messages still stopped being processed after a period of time :frowning:

To satisfy my curiosity, I’ve replaced the handler function with:

        client.subscribeWith().topicFilter(topic).callback(publish -> {
            logger.info("Received MQTT Message");
        }).send();

to see if it keeps receiving messages. This can’t produce an exception :slight_smile:

Unfortunately, after an hour or so, this also stopped producing log messages, so it appears the callback isn’t causing the problem.

Is it possible that the client disconnects and reconnects and the subscription is lost?
Should I be adding .addConnectedListener() to handle the subscribe operation to ensure the subscription is maintained?

Edit: FWIW, I tried this, and it seems the connection isn’t being dropped, as the addConnectedListener callback is only triggered once.
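Since `handleMessage` already stores `lastMQTTMessageTimestamp`, one way to tell “connection alive, subscription silent” apart from genuine inactivity is a small watchdog. The sketch below uses only the JDK and is hypothetical throughout: `SilenceWatchdog`, the silence window, and the `onSilence` action (logging, resubscribing, or forcing a reconnect) are assumptions, and a sensible window depends on how often the Roborock servers actually publish, which the thread does not say.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper: fires an action when no MQTT message has arrived for too long. */
final class SilenceWatchdog {
    private final LongSupplier clockMillis;   // injectable clock, e.g. System::currentTimeMillis
    private final long maxSilenceMillis;
    private final Runnable onSilence;         // assumption: log, resubscribe, or reconnect
    private final AtomicLong lastMessageMillis;

    SilenceWatchdog(LongSupplier clockMillis, long maxSilenceMillis, Runnable onSilence) {
        this.clockMillis = clockMillis;
        this.maxSilenceMillis = maxSilenceMillis;
        this.onSilence = onSilence;
        this.lastMessageMillis = new AtomicLong(clockMillis.getAsLong());
    }

    /** Call at the top of the MQTT callback, before any processing that could throw. */
    void recordMessage() {
        lastMessageMillis.set(clockMillis.getAsLong());
    }

    /** Fires onSilence and returns true if the silence window has been exceeded. */
    boolean check() {
        long now = clockMillis.getAsLong();
        if (now - lastMessageMillis.get() > maxSilenceMillis) {
            // Restart the window so the action fires once per silent period, not on every check.
            lastMessageMillis.set(now);
            onSilence.run();
            return true;
        }
        return false;
    }

    /** Runs check() periodically; the caller must shut the executor down on dispose. */
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                check();
            } catch (RuntimeException e) {
                // An exception escaping here would silently suppress all future runs.
                e.printStackTrace();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

The clock is injected so the logic can be checked without sleeping; in the binding it would be `System::currentTimeMillis`, with `recordMessage()` called from the subscription callback.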

Hi @psmedley

You are correct in your assessment: whether a broker keeps subscriptions across a connection interruption depends on the session, and when the client reconnects with a clean start (clean session in MQTT 3.1.1), the broker discards them. This is a common scenario in distributed messaging systems, where network instability or broker restarts can occur.

Your approach of using .addConnectedListener() to handle re-subscription logic is the recommended best practice.

We suggest implementing the re-subscription logic to maintain subscription continuity. This pattern provides resilience against various failure scenarios including broker failovers.
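One way to implement this is to keep a client-side registry of topic filters and callbacks and replay it from the connected listener, which should fire on the first connection and again after every successful automatic reconnect. The sketch below is an assumption, not HiveMQ API: `SubscriptionRegistry` is hypothetical, and the actual subscribe call is injected as a `BiConsumer` so the example runs with the JDK alone. In the binding, that function would presumably wrap `client.subscribeWith().topicFilter(topic).callback(cb).send()`, and the replay would be triggered via something like `addConnectedListener(context -> registry.onConnected())` on the client builder.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical client-side record of subscriptions, replayed on every (re)connect. */
final class SubscriptionRegistry<M> {
    // topic filter -> callback; the client's own source of truth, independent of broker state
    private final Map<String, Consumer<M>> subscriptions = new ConcurrentHashMap<>();
    // performs the real subscribe; injected so this sketch needs only the JDK
    private final BiConsumer<String, Consumer<M>> subscribeCall;

    SubscriptionRegistry(BiConsumer<String, Consumer<M>> subscribeCall) {
        this.subscribeCall = subscribeCall;
    }

    /** Records a subscription; it is sent to the broker on the next onConnected(). */
    void register(String topicFilter, Consumer<M> callback) {
        subscriptions.put(topicFilter, callback);
    }

    /** Call from the connected listener: re-issues every recorded subscription. */
    void onConnected() {
        subscriptions.forEach(subscribeCall);
    }

    int size() {
        return subscriptions.size();
    }
}
```

Replaying a subscription the broker still holds should be harmless: in MQTT, a SUBSCRIBE whose topic filter is identical to an existing subscription replaces it rather than adding a duplicate. Using the async client’s `send()` inside the listener also avoids blocking whichever thread the client uses to invoke it.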

To provide more specific guidance, could you please share which MQTT broker you are using and its version? Additionally, if you are using HiveMQ, please specify the version as broker behavior can vary between different implementations and versions.

For a more comprehensive review and specific implementation guidance, please share your client's code or, ideally, the complete project structure. This will allow us to provide more targeted recommendations based on your particular setup and requirements.

Best,

Dasha from The HiveMQ Team

The full code is in openhab-addons/bundles/org.openhab.binding.roborock on the roborock branch of psmedley/openhab-addons (GitHub), but all the MQTT handling code is in openhab-addons/bundles/org.openhab.binding.roborock/src/main/java/org/openhab/binding/roborock/internal/RoborockAccountHandler.java on the same branch.

I’m not sure what software the MQTT server is running, as it is provided by Roborock.

However, mqtt-us.roborock.com resolves to mqtt-slb-1st-1913472363.us-east-1.elb.amazonaws.com

FWIW, I’ve also been playing around with the Paho MQTT client.

Some learnings that I need to apply to my HiveMQ implementation:

  1. I needed to use setCleanSession(false) so that it didn’t use a clean session when restarting.
  2. I had to publish messages with qos(1); otherwise, it seemed I didn’t consistently get a response
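Carrying item 1 over to the HiveMQ client may need an extra step if the connection uses MQTT 5 (the binding’s callback takes `Mqtt5Publish`, so it appears to). In MQTT 5, Clean Start = false only resumes a session that still exists, and how long the session outlives the connection is set separately by the Session Expiry Interval, which defaults to 0 when absent, meaning the session ends as soon as the network connection closes. Paho’s `setCleanSession(false)` belongs to the MQTT 3.1.1 API, which has no such interval. The JDK-only model below encodes those protocol rules; `SessionModel` and `subscriptionsSurvive` are hypothetical names, and it ignores any broker-side limits, which for the Roborock server are unknown. On the HiveMQ side, the corresponding settings should be `cleanStart(false)` and `sessionExpiryInterval(...)` on `connectWith()`.

```java
/** Hypothetical model of MQTT 5 session resumption rules (broker-side limits ignored). */
final class SessionModel {
    /** A Session Expiry Interval of 0xFFFFFFFF means the session never expires. */
    static final long NEVER_EXPIRES = 0xFFFFFFFFL;

    private SessionModel() {
    }

    /**
     * Returns true if the broker should still hold the previous session, and therefore
     * its subscriptions, when the client reconnects.
     *
     * @param previousExpirySeconds Session Expiry Interval sent on the previous CONNECT
     * @param offlineSeconds        time between the connection closing and the reconnect
     * @param cleanStartOnReconnect Clean Start flag on the new CONNECT
     */
    static boolean subscriptionsSurvive(long previousExpirySeconds, long offlineSeconds,
            boolean cleanStartOnReconnect) {
        if (cleanStartOnReconnect) {
            return false; // Clean Start = 1 discards any existing session
        }
        if (previousExpirySeconds == NEVER_EXPIRES) {
            return true;
        }
        // An interval of 0 ends the session as soon as the network connection closes.
        return offlineSeconds < previousExpirySeconds;
    }

    public static void main(String[] args) {
        // Clean start with the protocol default interval of 0: session lost
        System.out.println(subscriptionsSurvive(0, 5, true));     // false
        // Clean Start = false alone is not enough while the interval stays 0
        System.out.println(subscriptionsSurvive(0, 5, false));    // false
        // Clean Start = false plus a 3600 s interval, back online after 5 s
        System.out.println(subscriptionsSurvive(3600, 5, false)); // true
    }
}
```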

With commit psmedley/openhab-addons@49484c1 (“Modify connection parameters to re-use sessions, and publish with qos=1”), a connection has been running for more than 6 hours with no issues :slight_smile:

@psmedley Neither QoS nor the session clean start parameter affects the duration of the connection between the MQTT client and the broker. The outcome you observed was likely coincidental.

Perhaps a poor choice of words on my part. I believe the session clean start parameter helped ensure that, when the connection did drop, existing subscriptions were honoured and messages continued to be received.