I have the following client that subscribes to a topic, and I would like to create a persistent subscription, but I could not find a suitable builder method in version 1.3.3 of the Java client:
def subscribe(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
  val client = asyncMqttClient(mqttCfg, isPersistentConn = true, id = topic)
  // Define a custom wrapper type to represent the result of the publish operation
  sealed trait PublishResult
  case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
  case class FailedPublish(error: Throwable) extends PublishResult
  client.toAsync
    .subscribeWith()
    .topicFilter(topic)
    .qos(mqttQos) //.cleanStart(false)
    .callback(publish => println(publish.getTopic)) // TODO: proper callback
    .send().asScala.map(_ => ())
}
The compiler says cleanStart(…) is not available on this builder, and the documentation did not help me here. How can I create a persistent subscription?
Thank you very much for expressing your interest in MQTT and the HiveMQ broker. We’re genuinely excited to welcome you to the HiveMQ Community!
Regarding your query about creating a persistent session for the MQTT client, allow me to provide some guidance. To establish a persistent session, the client needs to connect with cleanStart=false and sessionExpiryInterval>0. Here’s an example snippet illustrating this:
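A minimal sketch of such a connect, written in Scala against the blocking API of the HiveMQ MQTT Client. The object name, host, port, client identifier, and the one-hour expiry are placeholder assumptions, not values taken from this thread:

```scala
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect

object PersistentSessionSketch {
  // CONNECT message that asks the broker for a persistent session:
  // cleanStart = false resumes an existing session if there is one, and a
  // non-zero session expiry interval (in seconds) keeps the session alive
  // after the client disconnects.
  val connect: Mqtt5Connect = Mqtt5Connect.builder()
    .cleanStart(false)
    .sessionExpiryInterval(3600) // assumption: keep the session for one hour
    .build()

  def connectBlocking(): Unit = {
    val client = Mqtt5Client.builder()
      .identifier("my-persistent-client") // placeholder identifier
      .serverHost("localhost")            // placeholder broker address
      .serverPort(1883)
      .buildBlocking()

    // Blocks until the broker answers with CONNACK. The fluent equivalent is
    // client.connectWith().cleanStart(false).sessionExpiryInterval(3600).send()
    client.connect(connect)
  }
}
```

The broker associates the session with the client identifier, so the same identifier has to be used on every reconnect for the session to be resumed.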
Please keep in mind that only publish packets with QoS 1 and QoS 2 are persisted. Therefore, it’s essential to ensure that your client is using either QoS 1 or QoS 2 when subscribing or publishing.
I trust this information proves helpful to you. If you have any further questions or need additional assistance, please don’t hesitate to reach out.
I appreciate that you are trying to help me, but as I pointed out in my question, I need to use the async API, and there I do not see the cleanStart method. So how do I set cleanStart using the async API? Here is how I'm using the builder:
def asyncMqttClient(mqttCfg: MqttConfig, isPersistentConn: Boolean, id: String): Mqtt5AsyncClient = {
  Mqtt5Client.builder()
    .identifier(s"${mqttCfg.appName}-$id")
    .serverHost(mqttCfg.serverHost)
    .serverPort(mqttCfg.serverPort)
    .automaticReconnectWithDefaultConfig()
    .transportConfig()
    .mqttConnectTimeout(5, TimeUnit.SECONDS) // TODO: Get this value from the config
    .socketConnectTimeout(5, TimeUnit.SECONDS) // TODO: Get this value from the config
    .applyTransportConfig()
    //.simpleAuth()
    //.username(mqttCfg.user.getOrElse(""))
    //.password(mqttCfg.pass.getOrElse("").getBytes("UTF-8"))
    //.applySimpleAuth()
    .buildAsync()
}
Why should I use join(), and where am I subscribing with a callback in your code? Can you show me that, please? I would also appreciate it if this information were documented in the HiveMQ Java client docs.
But we sadly do not have unlimited resources. The MQTT client is an open-source project, so if you feel the documentation could be improved, I would encourage you to create an issue or even a pull request. We would be very happy about a contribution.
OK, so as I understand it, I call send() twice: once for the connection and once for the subscription. That information was missing from the documentation.
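That two-step flow might look roughly like the sketch below with the async API. It assumes Scala 2.13 or later (for `scala.jdk.FutureConverters`) and an already built `Mqtt5AsyncClient`. The object name, the helper name `connectAndSubscribe`, and the one-hour session expiry are illustrative choices, not something prescribed in this thread:

```scala
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._ // assumption: Scala 2.13+

object ConnectThenSubscribeSketch {
  // Hypothetical helper: first send() is for CONNECT, second send() is for SUBSCRIBE.
  def connectAndSubscribe(client: Mqtt5AsyncClient, topic: String, qos: MqttQos)
                         (implicit ec: ExecutionContext): Future[Unit] =
    client.connectWith()
      .cleanStart(false)           // resume an existing session if the broker has one
      .sessionExpiryInterval(3600) // assumption: keep the session for one hour
      .send()                      // first send(): CONNECT, completes with the CONNACK
      .asScala
      .flatMap { _ =>
        // Runs only after the connect succeeded.
        client.subscribeWith()
          .topicFilter(topic)
          .qos(qos)                // use QoS 1 or 2 so messages are queued while offline
          .callback(publish => println(publish.getTopic))
          .send()                  // second send(): SUBSCRIBE, completes with the SUBACK
          .asScala
      }
      .map(_ => ())
}
```

Here cleanStart belongs to the connect builder, not the subscribe builder, which is why it does not show up after subscribeWith(). If the connect fails, the returned Future fails and the subscribe is never sent.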