How to Create a Java Client Persistent Subscription

I have the following client that subscribes to a topic, and I would like to create a persistent subscription, but I could not find a suitable builder method in the HiveMQ Java client, version 1.3.3.

  def subscribe(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
    val client = asyncMqttClient(mqttCfg, isPersistentConn = true, id = topic)
    // Define a custom wrapper type to represent the result of the publish operation
    sealed trait PublishResult
    case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
    case class FailedPublish(error: Throwable) extends PublishResult
    client.toAsync
      .subscribeWith()
        .topicFilter(topic)
        .qos(mqttQos) //.cleanStart(false)
        .callback(publish => println(publish.getTopic)) // TODO: proper callback
      .send().asScala.map(_ => ())
  }

It says cleanStart(…) is not available on the subscribe builder, and the documentation did not help me here. How can I create a persistent subscription?

Any ideas, anyone? I have been blocked by this for the last 2 days! I would be glad if someone could shed some light on this.

Hi @Joesan,

Thank you very much for expressing your interest in MQTT and the HiveMQ broker. We’re genuinely excited to welcome you to the HiveMQ Community!

Regarding your query about creating a persistent session for the MQTT client, allow me to provide some guidance. To establish a persistent session, the client needs to connect with cleanStart=false and sessionExpiryInterval>0. Here’s an example snippet illustrating this:

        client.connectWith()
                .simpleAuth()
                .username(username)
                .password(UTF_8.encode(password))
                .applySimpleAuth()
                .cleanStart(false)
                .sessionExpiryInterval(3600)
                .send();

Please keep in mind that only publish packets with QoS 1 and QoS 2 are persisted. Therefore, it’s essential to ensure that your client is using either QoS 1 or QoS 2 when subscribing or publishing.
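
To make these conditions concrete, here is a small sketch in plain Java. It only models the rule described above and does not use the HiveMQ client or talk to a broker. The class and method names (SessionRules, queuedWhileOffline and so on) are made up for illustration, and the assumption that delivery uses the lower of the publish QoS and the subscription QoS reflects standard MQTT behaviour rather than anything specific to this thread.

```java
// Simplified model of the MQTT 5 session rules; names are hypothetical.
final class SessionRules {

    /** The broker keeps session state after a disconnect only if the expiry interval is positive. */
    static boolean sessionOutlivesDisconnect(long sessionExpiryIntervalSeconds) {
        return sessionExpiryIntervalSeconds > 0;
    }

    /** A reconnect picks up a stored session only when cleanStart is false. */
    static boolean resumesStoredSession(boolean cleanStart, boolean brokerHasSession) {
        return !cleanStart && brokerHasSession;
    }

    /** Assumption: delivery QoS is the lower of the publish QoS and the subscription QoS. */
    static int effectiveQos(int publishQos, int subscriptionQos) {
        return Math.min(publishQos, subscriptionQos);
    }

    /** A message is queued for an offline subscriber only with a surviving session and QoS 1 or 2. */
    static boolean queuedWhileOffline(long sessionExpiryIntervalSeconds, int publishQos, int subscriptionQos) {
        return sessionOutlivesDisconnect(sessionExpiryIntervalSeconds)
                && effectiveQos(publishQos, subscriptionQos) >= 1;
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileOffline(3600, 1, 1));   // true
        System.out.println(queuedWhileOffline(3600, 1, 0));   // false: subscription QoS 0
        System.out.println(queuedWhileOffline(0, 2, 2));      // false: session ends on disconnect
        System.out.println(resumesStoredSession(true, true)); // false: cleanStart discards the session
    }
}
```

In this model, a QoS 1 subscription on a connection made with cleanStart=false and sessionExpiryInterval=3600 is the combination that keeps messages for a client while it is offline.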

I trust this information proves helpful to you. If you have any further questions or need additional assistance, please don’t hesitate to reach out.

Warm regards,
Dasha from HiveMQ Team

I appreciate that you are trying to help me, but as I pointed out in my question, I need to use the async API, and there I do not see the cleanStart method. So how do I set cleanStart using the async API? Here is how I’m using the builder:

 def asyncMqttClient(mqttCfg: MqttConfig, isPersistentConn: Boolean, id: String): Mqtt5AsyncClient = {
    Mqtt5Client.builder()
      .identifier(s"${mqttCfg.appName}-$id")
      .serverHost(mqttCfg.serverHost)
      .serverPort(mqttCfg.serverPort)
      .automaticReconnectWithDefaultConfig()
      .transportConfig()
        .mqttConnectTimeout(5, TimeUnit.SECONDS) // TODO: Get this value from the config
        .socketConnectTimeout(5,TimeUnit.SECONDS) // TODO: Get this value from the config
      .applyTransportConfig()

      //.simpleAuth()
        //.username(mqttCfg.user.getOrElse(""))
        //.password(mqttCfg.pass.getOrElse("").getBytes("UTF-8"))
      //.applySimpleAuth()
      .buildAsync()
  }

Is this how I do it?

 client.connectWith()
      .cleanStart(false)
      .sessionExpiryInterval(3600)
      .send();
    
    client.toAsync
      .subscribeWith()
        .topicFilter(topic)
        .qos(mqttQos)
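        // decode the payload bytes; calling toString on a byte array would only print its reference
        .callback(publish => sink ! new String(publish.getPayloadAsBytes, "UTF-8")) // TODO: proper callback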
      .send().asScala.map(_ => ())

So I first connect with cleanStart(false), and then I convert that client with toAsync and subscribe with the callback method, as shown above?

Hi Joesan,

That works. But you can also build the async client first and then connect:

    // build async client
    val mqtt5AsyncClient = Mqtt5Client.builder()
        .identifier("my-id")
        .serverHost("broker.hivemq.com")
        .serverPort(1883)
        .automaticReconnectWithDefaultConfig()
        .transportConfig()
        .mqttConnectTimeout(5, TimeUnit.SECONDS) 
        .socketConnectTimeout(5, TimeUnit.SECONDS) 
        .applyTransportConfig()
        .buildAsync()

    // send an async connect
    val sendFuture = mqtt5AsyncClient
        .connectWith()
        .cleanStart(false)
        .sessionExpiryInterval(3600) // > 0, so the broker keeps the session after a disconnect
        .send()
    
    //only the call to join is blocking
    sendFuture.join()

Why should I use join(), and where am I subscribing with a callback in your code? Can you show me that, please? I would also appreciate it if this information were documented in the HiveMQ Java client docs.

Hi Joesan,

The join was only used for demonstration purposes, to show that everything up to that point was completely asynchronous.
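
The difference between registering a callback and calling join() can be sketched with a plain CompletableFuture, which is the type the async send() calls return in this thread. This is only an illustration under stated assumptions: fakeConnect is a hypothetical stand-in for connectWith()...send(), and no MQTT library is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class JoinVersusCallback {

    // Hypothetical stand-in for connectWith()...send(): it completes on another
    // thread after a short delay, the way a real CONNACK arrives later.
    static CompletableFuture<String> fakeConnect() {
        return CompletableFuture.supplyAsync(
                () -> "CONNACK",
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        CompletableFuture<String> connectFuture = fakeConnect();

        // Non-blocking: register a reaction; this call returns immediately.
        CompletableFuture<String> reported = connectFuture.whenComplete((ack, error) ->
                System.out.println(error == null ? "connected: " + ack : "failed: " + error));

        // Blocking: join() parks the calling thread until the callback stage is done.
        // Here it only keeps the demo JVM alive long enough to see the output.
        reported.join();
    }
}
```

In a long-running application the join() can usually be dropped, since the callback fires on its own once the future completes.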

An example with a callback would look like this (now in Java, as my Scala is too weak to implement consumer functions):

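    import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
    import java.util.concurrent.TimeUnit;

    public static void main(String[] args) {
        // build async client
        var mqtt5AsyncClient = Mqtt5Client.builder()
                .identifier("my-id")
                .serverHost("broker.hivemq.com")
                .serverPort(1883)
                .automaticReconnectWithDefaultConfig()
                .transportConfig()
                .mqttConnectTimeout(5, TimeUnit.SECONDS)
                .socketConnectTimeout(5, TimeUnit.SECONDS)
                .applyTransportConfig()
                .buildAsync();

        // send an async connect; cleanStart(false) plus a session expiry > 0 keeps the session
        var sendFuture = mqtt5AsyncClient
                .connectWith()
                .cleanStart(false)
                .sessionExpiryInterval(3600)
                .send();

        // send an async subscribe with a registered callback;
        // chained so the SUBSCRIBE is not sent before the connect has completed
        sendFuture.thenCompose(connAck -> mqtt5AsyncClient
                .subscribeWith()
                .topicFilter("topic")
                .callback(System.out::println)
                .send());
    }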

Regarding your point about documentation ("I would also appreciate it if this information were documented in the HiveMQ Java client docs"):

There is an async connect example in the docs. We also have a blog post series on the MQTT client flavors.

But we sadly do not have unlimited resources. The MQTT client is an open source project, so if you feel the documentation could be improved, I would encourage you to create an issue or even a pull request. We would be very happy about a contribution.

All the best
Georg

OK, so the way I understand it, I can call send() separately: once for the connection and once for the subscription. That was the piece of information I could not find in the documentation.

Yes, the send is per MQTT control packet sent (so one per CONNECT, one per SUBSCRIBE, one per PUBLISH, …).
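
Since each send() returns its own future, the sends can be sequenced without blocking. The following is a minimal sketch of that idea using only the Java standard library. The send(packet, ack) helper is a hypothetical stand-in for the builders' send() methods and simply records what was "sent"; it is not the HiveMQ client API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class OneSendPerPacket {

    // Records the order in which the stand-in packets were "sent".
    static final List<String> sent = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for a builder's send(): records the packet and
    // completes asynchronously with the matching acknowledgement.
    static CompletableFuture<String> send(String packet, String ack) {
        return CompletableFuture.supplyAsync(() -> {
            sent.add(packet);
            return ack;
        });
    }

    // One send per control packet; thenCompose starts the second send
    // only after the first future has completed.
    static CompletableFuture<String> connectThenSubscribe() {
        return send("CONNECT", "CONNACK")
                .thenCompose(connAck -> send("SUBSCRIBE", "SUBACK"));
    }

    public static void main(String[] args) {
        String lastAck = connectThenSubscribe().join();
        System.out.println(sent + " -> " + lastAck); // [CONNECT, SUBSCRIBE] -> SUBACK
    }
}
```

The same shape applies to a PUBLISH: it would be one more send with its own future, chained or handled independently as needed.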

I can recommend the MQTT Essentials series ("MQTT Essentials - All Core Concepts Explained").

Cheers
Georg