Overlapping subcriptions trigger exponentially on flowables

Edit: At first I assumed the wildcard subscription was shadowing the specific one, with further testing and a test project I noticed that the shadowing was actually just the specific messages arriving late, there is another problem though.

Hi,

I saw this behaviour in the 1.3.4 and 1.3.12 version of the client library.
I use Kotlin 2.2.x and create and connect the client like this

val mqttClient = MqttClient
  .builder()
  .useMqttVersion5()
  <credentials>
  .buildRx()
  .connect(<Connect Builder>)

After that I have two subscriptions:

The wildcard generic one:

mqttClient
  .client.subscribePublishes(
    Mqtt5Subscribe.builder()
      .topicFilter("#")
      .qos(MqttQos.EXACTLY_ONCE)
      .build()
  )
  .subscribe { println("Generic") }

And a more specific one:

mqttClient
  .client.subscribePublishes(
    Mqtt5Subscribe.builder()
      .topicFilter("test")
      .qos(MqttQos.EXACTLY_ONCE)
      .build()
  )
  .subscribe { println("Specific") }

What do I expect to happen?
When publishing on test I expect to have “Generic” and “Specific” printed once in any order.
When publishing on foo I expect to have “Generic” printed once, but not “Specific”.

What actually happens?
When publishing on test “Generic” and “Specific” are printed twice, so 4 prints.
When publishing on foo “Generic” is printed once.

When 3 topics are matching I get 9 prints, 4 result in 16 and so on.

Notice that I decided against using one “master subscription” at just # and separating messages in the flowables on purpose.
Also through debugging I see that both subscriptions are made at the cient.
In the broker logs I can also see that two messages/publishes are sent to the client on one message to the test topic.

Is this expected behaviour? Do I have to configure the client/subscription differently?
I really don’t want one client per subscription.

Hi @splitframe!

Happy to have you here. Starting with MQTT and HiveMQ can be a learning curve, but you’re not alone — we’re here to help.

Thank you for the question.

This is caused by the order in which the subscriptions and connection are set up. The wildcard subscription (#) triggers the broker to send matching messages immediately, before the specific subscription (test) has been registered in the client’s topic tree.

Try the following and confirm whether it shows different results:

  1. Swap the subscription order — subscribe to test first, then #
  2. Move the connect call after both subscriptions are set up

val client = MqttClient.builder()
    .useMqttVersion5()
    // <credentials>
    .buildRx()

// 1. Subscribe to the specific topic first
client.subscribePublishes(
    Mqtt5Subscribe.builder()
        .topicFilter("test")
        .qos(MqttQos.EXACTLY_ONCE)
        .build()
)
.subscribe { println("Specific") }

// 2. Then subscribe to the wildcard
client.subscribePublishes(
    Mqtt5Subscribe.builder()
        .topicFilter("#")
        .qos(MqttQos.EXACTLY_ONCE)
        .build()
)
.subscribe { println("Generic") }

// 3. Connect last
client.connect(<Connect Builder>)

Let us know the outcome
Best regards,
Dasha from The HiveMQ Team

Hello Dasha,

thank you for the fast reply, changing the subscription order and
putting connect at the end sadly did not help.
In the mean time I found this issue and am currently reading through it: How to use subscription identifier - Duplicate messages with overlapping topics · Issue #596 · hivemq/hivemq-mqtt-client · GitHub

Edit: It seems that indeed the missing mapping of subscription identifiers to in-code subscriptions is the culprit.

Edit2:

This is caused by the order in which the subscriptions and connection are set up. The wildcard subscription (# ) triggers the broker to send matching messages immediately, before the specific subscription (test ) has been registered in the client’s topic tree.

Also, this is not a retained issue, I trigger the messages from hand via mqtt explorer.