Mqtt3Client Flowable Timeout Exception

Hello everyone

I'm working with the HiveMQ MQTT client (Mqtt3) on Android, and when I call the Flowable subscribe function I receive a TimeoutException.

I’ve tried setting the timeout property, but without success.

I call connect().andThen(subscribeTopic().ignoreElements()), and this is what happens:

1 - Connect: OK
2 - Subscribe: OK, but the chain then ends in onError() -> The source did not signal an event for 6 seconds and has been terminated.

Does anyone know how I can avoid this exception? It stops the chain, so the next .andThen in the sequence is never executed… Example: connect().andThen(subscribe()).andThen(disconnect())

Best
Virgilio Magalhaes

Hi Virgilio,

Nice to see your interest in MQTT and the HiveMQ MQTT client.
Can you please share a more complete snippet of your code, as well as the error you are seeing?

Thank you and regards,
Florian

Hello Florian,

Here is part of my code. Thank you.


provider.setup()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { Timber.d("App onSubscribe") }
    .timeout(6L, TimeUnit.SECONDS)
    .subscribe(
        { Timber.d("App onComplete") },
        { Timber.d("App onError ${it.message}") }
    ).apply { disposable.add(this) }

provider.setup() runs the init block in my Provider. The chain always ends in the onError callback with the exception message described above.

init {
    connect().andThen(subscribeDefault())
}
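
One possible explanation for the timeout, sketched with plain RxJava 2 and no MQTT involved: if the stream returned by the subscribe call stays open for as long as the subscription is active (an assumption here, not something confirmed in this thread), then ignoreElements() on it never completes, and a timeout placed on the whole chain fires once the configured period passes without a terminal event. The function name runChain and the stand-ins Completable.complete() and Flowable.never() are hypothetical and only model connect() and an open subscription.

```kotlin
import io.reactivex.Completable
import io.reactivex.Flowable
import java.util.concurrent.TimeUnit

// Hypothetical model of connect().andThen(subscribe().ignoreElements()).timeout(...).
// Returns the Throwable the chain terminated with, or null if it completed.
fun runChain(timeoutSeconds: Long): Throwable? =
    Completable.complete()                                   // stand-in for connect(): completes at once
        .andThen(Flowable.never<String>().ignoreElements())  // stand-in for an open stream: never terminates
        .timeout(timeoutSeconds, TimeUnit.SECONDS)           // the timeout covers the whole chain
        .blockingGet()

fun main() {
    // The chain is expected to end with a java.util.concurrent.TimeoutException.
    println(runChain(1L))
}
```

Under that assumption the timeout is not a failure of the subscribe itself; it only reports that the chain did not complete within the given time.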

fun connect(): Completable {
    if (this.client.state == MqttClientState.CONNECTED ||
        this.client.state == MqttClientState.CONNECTING) {
        Timber.i("MQTT BROKER >>> MqttClientState.CONNECTED")
        return Completable.complete()
    }

    return this.client.connect(mqttConnect)
        .doOnSuccess {
            Timber.i("MQTT BROKER >>> CONNECTED WITH HIVEMQ CLIENT::: ${it.returnCode}")
        }
        .doOnError {
            Timber.e("MQTT BROKER >>> DON'T CONNECTED WITH HIVEMQ CLIENT::: ${it.localizedMessage}")
        }
        .ignoreElement()
}

fun subscribe(topicFilter: String): Flowable<String> {
    return this.client.subscribeStreamWith()
        .topicFilter(topicFilter)
        .qos(MqttQos.AT_LEAST_ONCE)
        .applySubscribe()
        .doOnSingle { suback ->
            Timber.i("MQTT BROKER >>> TOPIC $topicFilter SUBSCRIBE COMPLETED WITH CODES: ${suback.returnCodes}")
        }
        .doOnNext { message ->
            Timber.i("MQTT BROKER >>> MESSAGE RECEIVED --- ${String(message.payloadAsBytes)}")
        }
        .doOnError {
            Timber.e("MQTT BROKER >>> SUBSCRIBE TOPIC ERROR")
        }
        .map {
            String(it.payloadAsBytes)
        }
        .onBackpressureBuffer(true)
}
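
A minimal sketch of one way a sequence such as connect, subscribe, disconnect could be allowed to move on, again using plain RxJava 2 stand-ins rather than the MQTT client: the timeout is applied to the connect step only, and the message stream is bounded (here with take) so that it completes by itself. The names fakeConnect, fakeMessages and connectThenCollect are hypothetical, and whether bounding the stream suits the real application is a design decision this thread does not settle.

```kotlin
import io.reactivex.Completable
import io.reactivex.Flowable
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for connect(): completes immediately.
fun fakeConnect(): Completable = Completable.complete()

// Hypothetical stand-in for an open message stream: emits forever, never completes.
fun fakeMessages(): Flowable<String> =
    Flowable.interval(100L, TimeUnit.MILLISECONDS).map { "message $it" }

fun connectThenCollect(count: Long): List<String> =
    fakeConnect()
        .timeout(6L, TimeUnit.SECONDS)  // bounds only the connect step
        .andThen(fakeMessages())        // Completable.andThen(Publisher) yields a Flowable
        .take(count)                    // completes the stream after `count` messages
        .toList()
        .blockingGet()

fun main() {
    println(connectThenCollect(3L))
}
```

Because take(count) completes the stream, a following step (for example a disconnect) would have a completion signal to run after; without some bound the stream in this sketch would stay open indefinitely.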