HiveMQ

How to use the correlationData - messages never arrive

I want to send a message which contains correlationData so we can match the request/replies.

I’m using the Java HiveMQ Maven library with following code to send a message:

client.publishWith()
                .topic(destinationTopic)
                .responseTopic(responseTopic)
                .correlationData(correlationData.getBytes())
                .payload(message.getBytes())
                .qos(MqttQos.AT_LEAST_ONCE)
                .send()
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        logger.info("Error sending to '{}': {} - {},", destinationTopic, message, throwable.getMessage());
                    } else {
                        logger.info("Message sent to '{}': {} - {}", destinationTopic, message, result);
                    }
                });

When commenting the line with .correlationData(correlationData.getBytes()) the message appears on the test page MQTT Websocket Client and at my subscribers.

When using that line, the message is successfully sent as shown by the logger in ‘whenComplete’ but never appears in the websocket-client, and doesn’t arrive at any subscriber.

So I guess I’m using the correlationData in the wrong way? Or did I just discover a bug? :wink:

Using the paho mqtt5 library behaves exactly the same…

with correlation data = no message appears in websocket client
without correlation data = message appears

        MqttProperties properties = new MqttProperties();
        //properties.setCorrelationData(correlationData.getBytes());
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(1);
        mqttMessage.setPayload(message.getBytes());
        mqttMessage.setProperties(properties);
        try {
            mqttConfig.getClient().publish(destinationTopic, mqttMessage);
            logger.info("Message was sent to '{}': {}", destinationTopic, message);
        } catch (MqttException ex) {
            logger.error("Error sending to '{}': {} - {}", destinationTopic, message, ex.getMessage());
        }

Unfortunately the example code for this post gives a 404: Request - Response Pattern - MQTT 5 Essentials Part 9

Hi @frank,

Have you checked out this example? hivemq-mqtt-client/RequestResponse.java at master · hivemq/hivemq-mqtt-client · GitHub

Regards,
Florian

Thanks Florian for the link, but I don’t really understand this example. It uses two simultaneous connections to HiveMQ Cloud? Anyhow this code, which looks now very similar to the sending part of that example has the same issue, need to disable the line with correlation data to see the message arriving…

mqttConfig.getClient().toAsync()
                .subscribeWith()
                .topicFilter(responseTopic)
                .callback(responsePublish -> System.out.println("received response"))
                .send()
                .thenCompose(subAck -> mqttConfig.getClient().toAsync()
                        .publishWith()
                        .topic(destinationTopic)
                        .responseTopic(responseTopic == null || responseTopic.isEmpty() ? null : responseTopic)
                        //.correlationData(correlationData == null || correlationData.isEmpty() ? null : correlationData.getBytes())
                        .qos(MqttQos.EXACTLY_ONCE)
                        .payload(message.getBytes())
                        .send()
                        .whenComplete((result, throwable) -> {
                            if (throwable != null) {
                                logger.info("Error sending to '{}': {} - {}", destinationTopic, message, throwable.getMessage());
                            } else {
                                logger.info("Message sent to '{}': {} - {}", destinationTopic, message, result);
                            }
                        }));

BTW this is my client connection code

    private static Mqtt5AsyncClient client;

    public MqttConfig() {
        client = MqttClient.builder()
                .useMqttVersion5()
                .identifier(UUID.randomUUID().toString())
                .serverHost(MQTT_SERVER)
                .serverPort(MQTT_PORT)
                .sslWithDefaultConfig()
                .buildAsync();

        client.connectWith()
                .simpleAuth()
                .username(MQTT_USER)
                .password(MQTT_PASSWORD.getBytes())
                .applySimpleAuth()
                .send()
                .whenComplete((connAck, throwable) -> {
                    if (throwable != null) {
                        logger.error("Could not connect to MQTT: {}", throwable.getMessage());
                    } else {
                        logger.info("Connected to MQTT: {}", connAck.getReasonCode());
                    }
                });
    }

Tried the same code and works OK with correlationData when pushing/subscribing to a local HiveMQ instance in a Docker (without SSL) which was started with

docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4

Also works OK with a Mosquitto broker on stackhero.io.

So that would mean that there is a problem with HiveMQ Cloud when using correlationData…

Hi @frank,

Thank you for your patience. We were able to verify this is something misbehaving on our cloud service.
Our devs are already on the case.
Will let you know as soon as we have a solution.

Regards,
Florian

1 Like

Hi @frank,

Delighted to let you know the issue has been fixed and deployed to HiveMQ Cloud!

Kind regards,
Florian

Confirmed! Data with correlationData is now also received on the websocket test page. Thanks for the fix!

Do I get a bug-bounty now? :wink: