Expected behavior
I want the callback for a topic to fire only once per message, no matter how many times I have subscribed to that topic. For example, even if I subscribe to a topic 1000 times, I want to receive each incoming message just once.
I don't know whether I am doing something wrong (I guess I am).
Actual behavior
- I am developing a home security camera app.
- I have a list of cameras that I own.
- For every camera on the list, I subscribe to a topic.
- Every 30 s, I refresh the screen and subscribe to the topic for every camera again. This means the same topic can be subscribed many times.
- Every time I receive a message on a topic, the callback fires once for each time that topic was subscribed.
To Reproduce
Steps
- Have a topic camera/123
- Subscribe to the topic N times using the subscribeWith method shown below
- Publish a message to camera/123
- The message is received N times, once for each of the N subscriptions
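To make the steps above concrete without a broker, here is a toy model of what I think is happening. It is not the HiveMQ client: ToyClient and countDeliveries are made-up names, and the model simply assumes that each subscribe call registers one more callback for the topic.

```kotlin
// Toy model only: this is NOT the HiveMQ client, just an illustration of
// per-subscription callbacks piling up on one topic.
class ToyClient {
    private val callbacks = mutableMapOf<String, MutableList<(String) -> Unit>>()

    // Each call registers one more callback for the topic.
    fun subscribe(topic: String, callback: (String) -> Unit) {
        callbacks.getOrPut(topic) { mutableListOf() }.add(callback)
    }

    // Delivers the payload to every callback registered for the topic.
    fun deliver(topic: String, payload: String) {
        callbacks[topic].orEmpty().forEach { it(payload) }
    }
}

// Subscribes `times` times, delivers one message, and returns how many
// callback invocations that single message caused.
fun countDeliveries(times: Int): Int {
    val client = ToyClient()
    var received = 0
    repeat(times) { client.subscribe("camera/123") { received++ } }
    client.deliver("camera/123", "motion detected")
    return received
}
```

In this model, subscribing N times and delivering one message invokes the callback N times, which matches what I see in the app.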
Reproducer code
Just variables
// Properties
private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
// Constructor parameters of the enclosing class
private val serverHost: String,
private val serverPort: Int = 1883
Build the MQTT client
private fun build() {
        if (mqtt != null) return
        mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { Timber.d("On Connected") }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()
    }
Connect the MQTT client
fun connect(username: String, password: String) {
        build()
        this.username = username
        this.password = password
        mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username("abc")
                ?.password("123".toByteArray())
                ?.applySimpleAuth()
                ?.send()
    }
Then subscribe to a topic
Every time I subscribe to a topic, I use this function:
fun subscribeWith(topic: String) {
        mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_MOST_ONCE)
                ?.callback { t -> onConsumingTopic(t) } // I think this is the important part
                ?.send()
                ?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }
    }
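One possible workaround on the app side, as a minimal sketch: remember which topics have already been subscribed and skip the repeat calls made by the 30 s refresh. SubscriptionGuard, subscribeOnce, and forget are hypothetical names, not part of the HiveMQ client, and the sketch assumes the actual subscribe call is passed in as a lambda.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: remembers which topic filters were already subscribed.
class SubscriptionGuard {
    // Thread-safe set, since refreshes and MQTT callbacks may run on different threads.
    private val subscribed: MutableSet<String> = ConcurrentHashMap.newKeySet<String>()

    // Runs `subscribe` only the first time a topic is seen; returns true if it ran.
    fun subscribeOnce(topic: String, subscribe: (String) -> Unit): Boolean {
        if (!subscribed.add(topic)) return false
        subscribe(topic)
        return true
    }

    // Forgets a topic, e.g. after unsubscribing or if the subscribe call failed.
    fun forget(topic: String) {
        subscribed.remove(topic)
    }
}
```

With this, the refresh would call something like guard.subscribeOnce(topic) { subscribeWith(it) } instead of calling subscribeWith directly, so each topic registers its callback only once. The guard only tracks what this process has requested; it knows nothing about the broker's session state.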