MQTT CLI persistent session

Hi all,

I tried to start a persistent session using the MQTT CLI but couldn’t get it to work. The idea was to create a session that queues a couple of messages while my client is offline and delivers them once the client connects again.
Here is what I tried:
mqtt sub -t testtopic -q 1 --no-cleanStart -i testclient -h 127.0.0.1

--no-cleanStart is the only flag I found that looks remotely like something for creating a persistent session. But after I close the connection, the broker removes the client from the client list and does not queue messages.
But maybe I overlooked something.
Thanks in advance.
Best regards,
Zuendi

Hi @Zuendi

You need to add Session Expiry. For example, to use Session Expiry 1h:
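
The example command seems to be missing from the post above. As a minimal sketch, assuming the same flags Zuendi uses later in this thread (`--sessionExpiryInterval` takes seconds, so 1h is 3600), the subscribe call would look roughly like this. The variable names are only for illustration, and the script prints the command instead of running it:

```shell
#!/bin/sh
# Sketch only: topic, client id and host are taken from the original question.
TOPIC="testtopic"
CLIENT_ID="testclient"
HOST="127.0.0.1"

# Session expiry of 1 hour, expressed in seconds.
SESSION_EXPIRY=$((60 * 60))

# Assemble the subscribe command: no clean start plus a non-zero session expiry.
SUB_CMD="mqtt sub -t $TOPIC -q 1 --no-cleanStart -i $CLIENT_ID -h $HOST --sessionExpiryInterval $SESSION_EXPIRY"

# Print the command so it can be checked before it is run against a broker.
echo "$SUB_CMD"
```

Run the printed line to start the subscriber. With a non-zero session expiry, the broker should keep the session for `testclient` after it disconnects and queue QoS 1 messages until it reconnects with the same client id.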

I hope it helps

Regards,
Dasha from HiveMQ Team


Hi Dasha,

that helps a lot, thank you for the very fast response.
I tried it out and it looks perfect on the broker’s side. The session stays open and messages are being queued.
However, after starting the client via the CLI again, I get my first message and then an error. Any ideas?

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: What's different in 2.0 · ReactiveX/RxJava Wiki · GitHub | java.lang.UnsupportedOperationException: A publish must not be acknowledged if manual acknowledgement is not enabled
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:69)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: A publish must not be acknowledged if manual acknowledgement is not enabled
at com.hivemq.client.internal.mqtt.message.publish.MqttPublish.acknowledge(MqttPublish.java:170)
at com.hivemq.cli.mqtt.SubscribeMqtt5PublishCallback.accept(SubscribeMqtt5PublishCallback.java:85)
at com.hivemq.cli.mqtt.SubscribeMqtt5PublishCallback.accept(SubscribeMqtt5PublishCallback.java:33)
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:303)
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:288)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
... 6 more
Exception in thread "RxComputationThreadPool-1" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: What's different in 2.0 · ReactiveX/RxJava Wiki · GitHub | java.lang.UnsupportedOperationException: A publish must not be acknowledged if manual acknowledgement is not enabled
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:69)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: A publish must not be acknowledged if manual acknowledgement is not enabled
at com.hivemq.client.internal.mqtt.message.publish.MqttPublish.acknowledge(MqttPublish.java:170)
at com.hivemq.cli.mqtt.SubscribeMqtt5PublishCallback.accept(SubscribeMqtt5PublishCallback.java:85)
at com.hivemq.cli.mqtt.SubscribeMqtt5PublishCallback.accept(SubscribeMqtt5PublishCallback.java:33)
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:303)
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:288)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)

Hi

To help us diagnose the issue, could you please share the exact command you run when the error occurs?

Sure, this is what I did:
I have 2 clients. The first sends data like this:

mqtt pub -t sendertopic -m 'test3' -q 1

The second client subscribes to the topic:

mqtt sub -t sendertopic -q 1 --no-cleanStart -i testclient -h 127.0.0.1 --sessionExpiryInterval 3600

After client 1 sends data, client 2 receives it. Works like a charm. Now I kill client 2, keep sending data, and reconnect client 2 with the same command. Then the error happens: the client cannot consume the queued messages.

How do you kill the client? Can you share a video?

I kill the client via ^C. Have a look:
MQTT_Cli_Error

Hi @Zuendi,

Thank you for sharing your video!

I see that the client is currently receiving test3 and test4, but unfortunately, an exception occurs thereafter. It appears that this might be a client-related issue rather than a broker one.

As a workaround, I recommend trying an alternative client such as mosquitto_sub while we investigate the underlying client bug:

mosquitto_sub -t '#' -c -i CLI -q 1

If you have any further questions or need assistance, feel free to reach out.

Best regards,
Dasha from HiveMQ Team

Hello Daria,

yes I agree, it should be a problem with the client, or in this case with the mqtt CLI. That’s exactly what I was concerned about. I don’t want to test another CLI, I want to improve the mqtt CLI with my hint. It is developed by HiveMQ, correct? That’s why I wanted to share my finding.

Best regards
Zuendi.

@Zuendi

I appreciate your initiative to improve MQTT-CLI. For the best chance of a fix, I recommend opening an issue on the https://github.com/hivemq/hivemq-community-edition repository. Make sure the issue is clear and complete so that it is not overlooked for formal reasons.

Thank you,
Dasha from HiveMQ Team


Hi Dasha,

I did this three weeks ago but have not received an answer:

Best regards
Zuendi.

@Daria_H do you have any other idea what I can try?

Hi @Zuendi

Certainly! Try using the mosquitto_sub command instead of mqtt-cli sub.

Here’s a breakdown of the mosquitto_sub command and its arguments:

mosquitto_sub -t '#' -q 1 -x 600 -c --id SUB --host localhost --port 1884
  • mosquitto_sub: This is the command-line utility for subscribing to MQTT topics.
  • -t '#': Subscribe to topics using the wildcard #, which means subscribing to all topics.
  • -q 1: Set the Quality of Service (QoS) level to 1. QoS defines the message delivery guarantee.
  • -x 600: Set the session expiry interval to 600 seconds. This determines how long the broker should maintain information about the client after it disconnects.
  • -c: Enable a persistent session (non-clean session). This means that the broker will remember the client’s subscriptions and queued messages even if the client disconnects.
  • --id SUB: Specify the client identifier as “SUB”. This identifier is used to distinguish different MQTT clients.
  • --host localhost: Specify the MQTT broker’s host. This is the address of the MQTT broker to which the client is connecting.
  • --port 1884: Specify the MQTT broker’s port. This is the port number on which the MQTT broker is listening for incoming connections.

The overall command is used to subscribe to all topics (#) with QoS 1, set a session expiry interval of 600 seconds, use a persistent (non-clean) session, and connect to the MQTT broker with the specified client identifier, host, and port.

I hope it helps.

Best regards,
Dasha

Hi @Daria_H,

you already told me that the mosquitto cli could help. But I wanted to share a problem with your mqtt cli.

Hi @Zuendi,

Thank you for bringing this issue to our attention.

Our HiveMQ engineers are on it and will investigate the problem as soon as they can. We’re committed to addressing this issue, and we aim to include the fix in one of our upcoming releases.

In the meantime, you can try using an alternative MQTT client or revert to an older version of mqtt-cli; version 4.20.0 should work for you.

If you have any more questions or need further assistance, feel free to reach out. We appreciate your patience and understanding.

Best regards,
Dasha
