Hi @psmedley,
welcome aboard. Exploring MQTT and HiveMQ can be a fun journey, and we’re here to help you along the way.
Your suspicion is very likely correct. The issue you’re describing is a classic symptom of an unhandled exception occurring within an asynchronous callback.
When you subscribe to a topic using the HiveMQ client, you provide a callback method (this::handleMessage). The client’s internal threads invoke this method whenever a message arrives. If your handleMessage method (or any method it calls) throws an exception that isn’t caught, the exception propagates up to the HiveMQ client’s executor thread. To prevent one faulty callback from crashing the entire client, the library often handles this by logging the error (sometimes only at a low level) and then ceasing to deliver further messages to that specific callback.
This explains why you can still publish: the publishing mechanism is separate and unaffected. The connection is still alive, but the subscription’s message delivery pipeline has been silently shut down due to the error.
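To make the failure mode concrete, here is a toy model of it in plain Java. ToyDispatcher is a hypothetical class written only for this illustration; it is not the HiveMQ client’s actual code and only mimics the behavior described above: once the callback throws, the error is recorded and later messages are no longer delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a client's delivery loop. NOT the HiveMQ implementation. */
final class ToyDispatcher<T> {
    private Consumer<T> callback; // set to null once the callback has thrown
    private final List<RuntimeException> swallowed = new ArrayList<>();

    ToyDispatcher(Consumer<T> callback) {
        this.callback = callback;
    }

    /** Hands one message to the callback, unless the callback was already dropped. */
    void deliver(T message) {
        if (callback == null) {
            return; // subscription is dead: the message is silently skipped
        }
        try {
            callback.accept(message);
        } catch (RuntimeException e) {
            swallowed.add(e); // error is recorded quietly ...
            callback = null;  // ... and the callback is never invoked again
        }
    }

    boolean isActive() {
        return callback != null;
    }

    List<RuntimeException> swallowedErrors() {
        return swallowed;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        ToyDispatcher<String> dispatcher = new ToyDispatcher<>(msg -> {
            if (msg.equals("boom")) {
                throw new IllegalStateException("unexpected message format");
            }
            received.add(msg);
        });
        for (String msg : List.of("1", "2", "boom", "4")) {
            dispatcher.deliver(msg);
        }
        // "4" never arrives, although nothing visibly crashed.
        System.out.println(received + " active=" + dispatcher.isActive());
        // prints: [1, 2] active=false
    }
}
```

From the outside this looks just like your symptom: no crash, no obvious error, and messages simply stop arriving after one bad payload.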
Let’s look at your handleMessage method. The most probable point of failure is when you delegate the processing to the child handler.
    // RoborockAccountHandler.java
    public void handleMessage(@Nullable Mqtt5Publish publish) {
        if (publish == null) {
            // ...
            return;
        }
        // ...
        for (Entry<Thing, RoborockVacuumHandler> entry : childDevices.entrySet()) {
            if (entry.getKey().getUID().getAsString().contains(destination)) {
                // ...
                byte[] payload = publish.getPayloadAsBytes();
                // VULNERABLE CALL: An exception here will kill the callback.
                entry.getValue().handleMessage(payload);
                return;
            }
        }
    }
Any RuntimeException (like a NullPointerException, ArrayIndexOutOfBoundsException, or a JSON parsing error) inside the RoborockVacuumHandler.handleMessage(payload) method will cause the behavior you’re seeing. The error could be triggered by an unexpected message format from the Roborock servers that appears only intermittently.
You must wrap the call to the child handler in a try-catch block to prevent any exceptions from escaping your callback method. This ensures that even if one message fails to process, the client will continue to deliver subsequent messages.
Here is the corrected version of your handleMessage method:
    // RoborockAccountHandler.java
    public void handleMessage(@Nullable Mqtt5Publish publish) {
        if (publish == null) {
            logger.debug("handleMessage - null publish received");
            return;
        }
        String receivedTopic = publish.getTopic().toString();
        String destination = receivedTopic.substring(receivedTopic.lastIndexOf('/') + 1);
        logger.debug("Received MQTT message for device {}", destination);
        lastMQTTMessageTimestamp = System.currentTimeMillis();
        for (Entry<Thing, RoborockVacuumHandler> entry : childDevices.entrySet()) {
            if (entry.getKey().getUID().getAsString().contains(destination)) {
                try {
                    logger.trace("Submit response to child {} -> {}", destination, entry.getKey().getUID());
                    byte[] payload = publish.getPayloadAsBytes();
                    // It's also good practice to check if the payload is not null or empty
                    if (payload != null && payload.length > 0) {
                        entry.getValue().handleMessage(payload);
                    } else {
                        logger.debug("Received message for {} with empty payload.", destination);
                    }
                } catch (Exception e) {
                    // LOG THE ERROR! This is critical for debugging.
                    logger.error("Unhandled exception processing MQTT message for device {}. Message will be discarded.", destination, e);
                }
                return; // Exit after finding the correct handler.
            }
        }
        // Optional: Log if no handler was found for the message
        logger.warn("Received MQTT message for unknown device destination: {}", destination);
    }
By adding this try-catch block with catch (Exception e), you’ll “swallow” any exception thrown by the downstream handleMessage call, log it for your own diagnostics, and allow the HiveMQ client to continue its work without interruption.
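If you register several callbacks, the same guard can be applied once at the subscription boundary instead of inside each handler. The following is a minimal sketch in plain Java; SafeCallbacks and guarded are hypothetical names made up for this example and are not part of the HiveMQ client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper for this example; not part of any library. */
final class SafeCallbacks {
    private SafeCallbacks() {
    }

    /**
     * Wraps a callback so that any Exception it throws is passed to onError
     * instead of escaping to the caller (for example, a client's executor thread).
     */
    static <T> Consumer<T> guarded(Consumer<T> delegate, Consumer<Exception> onError) {
        return message -> {
            try {
                delegate.accept(message);
            } catch (Exception e) {
                onError.accept(e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<Exception> failures = new ArrayList<>();

        Consumer<String> handler = guarded(payload -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            processed.add(payload);
        }, failures::add);

        handler.accept("first");
        handler.accept("");      // fails, but the failure is contained
        handler.accept("third"); // still processed

        System.out.println(processed + " failures=" + failures.size());
        // prints: [first, third] failures=1
    }
}
```

Assuming your subscription currently passes this::handleMessage as the callback, you could pass something like SafeCallbacks.guarded(this::handleMessage, e -> logger.error("MQTT callback failed", e)) instead. Note that this catches Exception only; an Error such as OutOfMemoryError would still propagate.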
Best,
Dasha from The HiveMQ Team