Client stops receiving messages

Hi,

I’m using the HiveMQ Java client in an openHAB binding to talk to the Roborock MQTT servers.

I am able to connect, subscribe to the server, and publish commands.

After a seemingly random period of time, MQTT messages stop being received/handled by my code.

The MQTT connection and handling code is in openhab-addons/bundles/org.openhab.binding.roborock/src/main/java/org/openhab/binding/roborock/internal/RoborockAccountHandler.java (roborock branch of psmedley/openhab-addons on GitHub).

Sending commands still succeeds, so the MQTT connection seems to be in place; the received messages just aren’t being passed to the callback.

Edit: I wonder if it could be this: “What happens if exceptions are thrown in callbacks?” (Issue #575 in hivemq/hivemq-mqtt-client on GitHub)

Any thoughts appreciated!

Cheers,

Paul

Hi @psmedley,
welcome aboard. Exploring MQTT and HiveMQ can be a fun journey, and we’re here to help you along the way.

Your suspicion is very likely correct. The issue you’re describing is a classic symptom of an unhandled exception occurring within an asynchronous callback.

When you subscribe to a topic using the HiveMQ client, you provide a callback method (this::handleMessage). The client’s internal threads invoke this method whenever a message arrives. If handleMessage (or any method it calls) throws an exception that isn’t caught, the exception propagates up to the HiveMQ client’s executor thread. To prevent one faulty callback from crashing the entire client, the library may log the error (sometimes only at a low log level) and then stop delivering further messages to that specific callback.

This explains why you can still publish: the publishing mechanism is separate and unaffected. The connection is still alive, but the subscription’s message delivery pipeline has been silently shut down due to the error.

Let’s look at your handleMessage method. The most probable point of failure is when you delegate the processing to the child handler.

// RoborockAccountHandler.java

public void handleMessage(@Nullable Mqtt5Publish publish) {
    if (publish == null) {
        // ...
        return;
    }
    // ...
    for (Entry<Thing, RoborockVacuumHandler> entry : childDevices.entrySet()) {
        if (entry.getKey().getUID().getAsString().contains(destination)) {
            // ...
            byte[] payload = publish.getPayloadAsBytes();

            // VULNERABLE CALL: An exception here will kill the callback.
            entry.getValue().handleMessage(payload);

            return;
        }
    }
}

Any RuntimeException (like a NullPointerException, ArrayIndexOutOfBoundsException, or a JSON parsing error) inside the RoborockVacuumHandler.handleMessage(payload) method will cause the behavior you’re seeing. The error could be triggered by an unexpected message format from the Roborock servers that appears only intermittently.

You must wrap the call to the child handler in a try-catch block to prevent any exceptions from escaping your callback method. This ensures that even if one message fails to process, the client will continue to deliver subsequent messages.

Here is the corrected version of your handleMessage method:

// RoborockAccountHandler.java

public void handleMessage(@Nullable Mqtt5Publish publish) {
    if (publish == null) {
        logger.debug("handleMessage - null publish received");
        return;
    }

    String receivedTopic = publish.getTopic().toString();
    String destination = receivedTopic.substring(receivedTopic.lastIndexOf('/') + 1);
    logger.debug("Received MQTT message for device {}", destination);
    lastMQTTMessageTimestamp = System.currentTimeMillis();

    for (Entry<Thing, RoborockVacuumHandler> entry : childDevices.entrySet()) {
        if (entry.getKey().getUID().getAsString().contains(destination)) {
            try {
                logger.trace("Submit response to child {} -> {}", destination, entry.getKey().getUID());
                byte[] payload = publish.getPayloadAsBytes();

                // It's also good practice to check that the payload is neither null nor empty
                if (payload != null && payload.length > 0) {
                    entry.getValue().handleMessage(payload);
                } else {
                    logger.debug("Received message for {} with empty payload.", destination);
                }

            } catch (Exception e) {
                // LOG THE ERROR! This is critical for debugging.
                logger.error("Unhandled exception processing MQTT message for device {}. Message will be discarded.", destination, e);
            }
            return; // Exit after finding the correct handler.
        }
    }
    // Optional: Log if no handler was found for the message
    logger.warn("Received MQTT message for unknown device destination: {}", destination);
}

By adding this try-catch (Exception e) block, you’ll “swallow” any errors from the downstream handleMessage call, log them for your own diagnostics, and allow the HiveMQ client to continue its work without interruption.
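The same guard can be written once and reused for every subscription. The following is only a minimal sketch in plain Java: SafeCallbacks and guarded are hypothetical names that exist neither in the HiveMQ client nor in the binding, and String stands in for the real message type so that the example runs without any MQTT dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of the HiveMQ client or the binding.
final class SafeCallbacks {

    private SafeCallbacks() {
    }

    /**
     * Wraps a callback so that no RuntimeException it throws reaches the caller.
     * A failure is handed to onError together with the message that caused it.
     */
    static <T> Consumer<T> guarded(Consumer<T> delegate, BiConsumer<T, RuntimeException> onError) {
        return message -> {
            try {
                delegate.accept(message);
            } catch (RuntimeException e) {
                onError.accept(message, e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();

        // Stand-in for handleMessage: fails on an empty payload.
        Consumer<String> callback = guarded(
                payload -> {
                    if (payload.isEmpty()) {
                        throw new IllegalArgumentException("empty payload");
                    }
                    processed.add(payload);
                },
                (payload, error) -> System.err.println("Discarding message: " + error));

        callback.accept("status");
        callback.accept("");        // throws inside the delegate, but is contained
        callback.accept("battery"); // still delivered

        System.out.println(processed); // prints [status, battery]
    }
}
```

Since callback(...) in the subscription accepts a lambda or method reference, a wrapper like this could presumably be passed in place of this::handleMessage, with the error handler writing to the binding's logger.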

Best,
Dasha from The HiveMQ Team

Thank you! Testing this now, and will report back!

Unfortunately, it doesn’t seem to have helped. Messages still stopped being processed after a period of time :(

To satisfy my curiosity, I’ve replaced the handler function with:

        client.subscribeWith().topicFilter(topic).callback(publish -> {
            logger.info("Received MQTT Message");
        }).send();

to see if it keeps receiving messages. This can’t produce an exception :)