Expected behavior
I want to have a callback to listen to every topic I subscribe to just once per message sent. I mean, I want to subscribe to a topic 1000 times, but when a message is received, I want to listen to it just one time.
IDK if there is something I am doing wrong (I guess).
Actual behavior
- I am developing a home security camera app.
- I have a list of cameras that I own.
- For every camera on the list, I subscribe to a topic.
- Every 30s, I update the screen, and again I subscribe to a topic for every camera. This means a TOPIC could be subscribed many times.
- Every time I receive a message on a topic, the callback fires messages about how many times the same topi was subscribed.
To Reproduce
Steps
- haven a topic camera/123
- subscribe the topic N times with the below method called subscribeWith
- Send a message over camera/123
- You will receive the message N times because the N time you subscribed to the topic
Reproducer code
Just variables
private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883
Build the MQTT
private fun build() {
if (mqtt != null) return
mqtt = Mqtt5Client.builder()
.identifier(identifier())
.serverHost(serverHost)
.serverPort(serverPort)
.automaticReconnect()
.applyAutomaticReconnect()
.addConnectedListener { Timber.d("On Connected") }
.addDisconnectedListener { onMQTTDisconnected(it) }
.buildAsync()
}
Connecting the MQTT
fun connect(username: String, password: String) {
build()
this.username = username
this.password = password
mqtt?.connectWith()
?.keepAlive(30)
?.sessionExpiryInterval(7200)
?.cleanStart(false)
?.simpleAuth()
?.username("abc")
?.password("123".toByteArray())
?.applySimpleAuth()
?.send()
}
And then, subscribing a topic
Every time I subscribe a topic I use these fun
fun subscribeWith(topic: String) {
mqtt?.subscribeWith()
?.topicFilter(topic)
?.qos(MqttQos.AT_MOST_ONCE)
?.callback { t -> onConsumingTopic(t) } <- I THINK THIS IS THE IMPORTANT THING
?.send()
?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }
}