Restrictions in client

Hi!

I am working with the HiveMQ MQTT Client (https://github.com/hivemq/hivemq-mqtt-client) and I would like to set some restrictions, but I’m not sure I’m doing it right.

I am using the code like this:

final Mqtt5AsyncClient client = Mqtt5Client.builder()
        .serverHost("broker.hivemq.com")
        .automaticReconnectWithDefaultConfig()
        .buildAsync();

final Mqtt5ConnAck connAck = client.toBlocking().connectWith()
        .cleanStart(false)
        .sessionExpiryInterval(30)
        .restrictions()
            .receiveMaximum(20)
            .sendMaximum(15)
            .maximumPacketSize(10_240)
            .sendMaximumPacketSize(10_240)
        .applyRestrictions()
        .willPublish()
            .topic("a/b/c/isla")
            .qos(MqttQos.EXACTLY_ONCE)
            .payload("hello world".getBytes())
            .contentType("text/plain")
        .applyWillPublish()
        .send();
System.out.println("connected " + connAck.getRestrictions());

Printed to console:

connected MqttConnAckRestrictions{receiveMaximum=10, maximumPacketSize=268435460, topicAliasMaximum=5, maximumQos=EXACTLY_ONCE, retainAvailable=true, wildcardSubscriptionAvailable=true, sharedSubscriptionAvailable=true, subscriptionIdentifiersAvailable=true}

  1. The restrictions are set and used (I think) when sending the message to the broker, but when I print them with System.out.println("connected " + connAck.getRestrictions());, the restrictions are back to default values. Why are the restrictions I set not saved, so that I can use them later to send a new message with the same restrictions?
  2. Why is the receive maximum value 10 (see “printed to console” above)? According to this documentation: https://hivemq.github.io/mqtt-cli/docs/shell/connect.html the default value should be 65535. Where does the value 10 come from?
  3. This code sends one message, but I would like to send multiple messages. How can I connect with restrictions, without sending a message right away? Is there a solution to keep the restrictions I set when connecting, even outside the connect method?

Thanks in advance

Isla

Hi @Isla,

In step 1 you are printing the CONNACK restrictions. These are the restrictions that the broker sets for the client.

That is why you see different values, such as a receive maximum of 10: that is the default limit HiveMQ sets for the client (see https://www.hivemq.com/docs/hivemq/4.3/user-guide/configuration.html#server-receive).

Kind regards,
Michael from the HiveMQ team

Hello @michael_w

Thank you for the clarification!

If connAck.getRestrictions() prints the broker restrictions, how can I print the client restrictions?

If I add this code after my previous code:

           for(int i = 0; i < 1000; i++) {
               client.toBlocking().publishWith()
                        .topic("demo/topic/a")
                        .qos(MqttQos.EXACTLY_ONCE)
                        .payload(Integer.toString(i).getBytes())
                        .send();
           }

Will this code use the restrictions I set while connecting? If not, how can I send multiple messages with restrictions?

If I use the reactive API:

Completable connectScenario = client.connectWith()
        .restrictions()
            .sendMaximum(20)
        .applyRestrictions()
        .applyConnect()
        .doOnSuccess(connAck -> System.out.println("Connected, " + connAck.getReasonCode() + " restrictions: " + connAck.getRestrictions()))
        .doOnError(throwable -> System.out.println("Connection failed, " + throwable.getMessage()))
        .ignoreElement();

I guess that the printed restrictions here are the ones the broker sets, too?

If I later send messages with the reactive API:

        Flowable<Mqtt5Publish> messagesToPublish = Flowable.range(0, 2000)
                .map(i -> Mqtt5Publish.builder()
                        .topic("a/b/c")
                        .qos(MqttQos.AT_LEAST_ONCE)
                        .payload((i + " " + java.time.LocalTime.now()).getBytes())
                        .build());

        Completable publishScenario = client.publish(messagesToPublish)
                .ignoreElements();

        Completable disconnectScenario = client.disconnect().doOnComplete(() -> System.out.println("Disconnected"));

        connectScenario.andThen(publishScenario).andThen(disconnectScenario).blockingAwait();

Will the restrictions I set in Completable connectScenario be used when sending messages in publishScenario?

Kind regards

Isla

Hi @Isla,

With MQTT 5, the client can set restrictions that the broker MUST respect (sent in the CONNECT packet), and the broker can do the same for the client (via the CONNACK packet).

Using your first code snippet, I’ll explain it with “maximum packet size” as an example:

final Mqtt5ConnAck connAck = client.toBlocking().connectWith()
        .cleanStart(false)
        .sessionExpiryInterval(30)
        .restrictions()
            .maximumPacketSize(10_240)
        .applyRestrictions()
        .send();

connAck.getRestrictions().getMaximumPacketSize();

The client tells the broker (via the CONNECT packet) that it doesn’t want to receive any MQTT packet larger than 10,240 bytes:

 .restrictions()
       .maximumPacketSize(10_240) // client doesn't want to receive packets over this size

The broker MUST respect this limit for the client; otherwise it is a protocol error according to the MQTT 5 specification.

The broker then responds with a CONNACK packet in which it tells the client the broker’s own packet size limit. The client MUST respect this limit in turn.

connAck.getRestrictions().getMaximumPacketSize();  // broker doesn't want to receive packets over this size

Client maximumPacketSize(10_240) vs. sendMaximumPacketSize(10_240)
Now to the misunderstanding I think you have: the hivemq-mqtt-client also lets you set restrictions for the client -> broker direction. Keep in mind that the broker can override these with lower values via the restrictions in the CONNACK packet.

Again maximum packet size as an example:

.restrictions()
    .maximumPacketSize(10_240)      // client doesn't want to receive packets over this size
    .sendMaximumPacketSize(20_000)  // client is not allowed to send packets over this size
.applyRestrictions()

connAck.getRestrictions().getMaximumPacketSize(); // let's say the broker says its limit is 100_000 bytes

So what happens here is that the client won’t send packets larger than 20_000 bytes to the broker, even if the broker allows packets up to 100_000 bytes. This is because the hivemq-mqtt-client internally picks the smaller of sendMaximumPacketSize and connAck.getRestrictions().getMaximumPacketSize().
If the broker instead said its limit was 10_000 bytes, the client would use that as the maximum size of packets it sends to the broker.
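
As a rough illustration of that rule, here is a minimal sketch in plain Java. It only models the "pick the smaller value" behaviour described above and is not the client's actual internals; the class and method names (EffectivePacketSizeLimit, effectiveSendLimit) are made up for this example.

```java
// Illustrative model only: these names are hypothetical and not part of the hivemq-mqtt-client API.
final class EffectivePacketSizeLimit {

    // The client never sends packets larger than the smaller of its own
    // sendMaximumPacketSize and the maximum packet size announced in the CONNACK.
    static int effectiveSendLimit(int sendMaximumPacketSize, int brokerMaximumPacketSize) {
        return Math.min(sendMaximumPacketSize, brokerMaximumPacketSize);
    }

    public static void main(String[] args) {
        // Client limit is lower than the broker limit: the client limit applies.
        System.out.println(effectiveSendLimit(20_000, 100_000));
        // Broker limit is lower than the client limit: the broker limit applies.
        System.out.println(effectiveSendLimit(20_000, 10_000));
    }
}
```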

The same applies to all other restrictions whose names start with send, for example:

.restrictions()
    .receiveMaximum(20) // broker is allowed to send up to 20 unacknowledged messages to the client
    .sendMaximum(15)    // client is allowed to send up to 15 unacknowledged messages to the broker; the broker can lower this via the CONNACK packet
.applyRestrictions()
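
To make the effect of sendMaximum more concrete, here is a small sketch of such a send window in plain Java. It is a simplified model under the assumption that the lower of the two values bounds the number of unacknowledged QoS 1/2 messages; it is not how the hivemq-mqtt-client is implemented, and SendQuota with its methods is a hypothetical name.

```java
import java.util.concurrent.Semaphore;

// Illustrative model only: hypothetical names, not the hivemq-mqtt-client implementation.
final class SendQuota {
    private final Semaphore permits;

    SendQuota(int clientSendMaximum, int brokerReceiveMaximum) {
        // The lower of the two values bounds the number of unacknowledged QoS 1/2 publishes.
        permits = new Semaphore(Math.min(clientSendMaximum, brokerReceiveMaximum));
    }

    // Returns true if another QoS 1/2 publish may be sent now, false if the window is full.
    boolean tryAcquire() {
        return permits.tryAcquire();
    }

    // Called when the broker acknowledges a publish (e.g. PUBACK or PUBCOMP).
    void release() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        // sendMaximum(15) on the client, receive maximum 10 in the CONNACK.
        SendQuota quota = new SendQuota(15, 10);
        int inFlight = 0;
        while (quota.tryAcquire()) {
            inFlight++;
        }
        System.out.println("in flight before the first ack: " + inFlight);
    }
}
```

With the values from this thread (sendMaximum 15 on the client, receive maximum 10 from the broker), the model allows 10 messages in flight. The real client handles the waiting for you; the sketch only shows the arithmetic.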

Hope I could help,
Michael from the HiveMQ team

Hi @Isla,

Here are answers to some of your other questions.

From your first post, part of question 3 (“This code sends one message ....”):

“Is there a solution to keep the restrictions I set when connecting, even outside the connect method?”

You can build an Mqtt5ConnectRestrictions object once and pass it to the connect call:

final Mqtt5ConnectRestrictions connectRestrictions = Mqtt5ConnectRestrictions.builder()
        .maximumPacketSize(10_000)
        .receiveMaximum(10)
        .build();

final Mqtt5ConnAck connAck = client.toBlocking().connectWith()
        .cleanStart(false)
        .sessionExpiryInterval(30)
        .restrictions(connectRestrictions)
        .willPublish()
            .topic("a/b/c/isla")
            .qos(MqttQos.EXACTLY_ONCE)
            .payload("hello world".getBytes())
            .contentType("text/plain")
        .applyWillPublish()
        .send();

Next question:
You are asking how you can send messages while checking the restrictions.
There is no need for you to check, for example, the maximum packet size or how many unacknowledged messages you currently have, because the client does this internally for you. Taking one of your code snippets as an example:

           for(int i = 0; i < 1000; i++) {
               client.toBlocking().publishWith()
                        .topic("demo/topic/a")
                        .qos(MqttQos.EXACTLY_ONCE)
                        .payload(Integer.toString(i).getBytes())
                        .send();
           }

The client will check internally whether this PUBLISH message is above the maximum packet size limit the broker set, and will throw an error if it is.
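
If you are curious how large such a PUBLISH packet roughly is, the following plain-Java sketch estimates it from the MQTT 5 packet layout. It is only an estimate under the assumption that the PUBLISH carries no properties and no topic alias; PublishSizeEstimate and its methods are hypothetical helpers, not part of the hivemq-mqtt-client API, and the 10_240 limit is just an example value.

```java
import java.nio.charset.StandardCharsets;

// Illustrative estimate only: hypothetical helper, not part of the hivemq-mqtt-client API.
// It assumes a PUBLISH without properties and without a topic alias.
final class PublishSizeEstimate {

    // Number of bytes needed to encode a value as an MQTT variable byte integer.
    static int variableByteIntegerLength(int value) {
        if (value < 128) return 1;
        if (value < 16_384) return 2;
        if (value < 2_097_152) return 3;
        return 4;
    }

    // Total encoded size of an MQTT 5 PUBLISH packet in bytes.
    static int packetSize(String topic, int qos, byte[] payload) {
        int topicBytes = topic.getBytes(StandardCharsets.UTF_8).length;
        int remainingLength = 2 + topicBytes   // topic name with its 2-byte length prefix
                + (qos > 0 ? 2 : 0)            // packet identifier, only for QoS 1 and 2
                + 1                            // property length field (0, no properties)
                + payload.length;              // application payload
        // Fixed header: 1 byte for type and flags, plus the encoded remaining length.
        return 1 + variableByteIntegerLength(remainingLength) + remainingLength;
    }

    public static void main(String[] args) {
        byte[] payload = Integer.toString(999).getBytes(StandardCharsets.UTF_8);
        int size = packetSize("demo/topic/a", 2, payload);
        int brokerMaximumPacketSize = 10_240; // example limit, as if taken from the CONNACK
        System.out.println(size + " bytes, fits: " + (size <= brokerMaximumPacketSize));
    }
}
```

The small messages in the loop above are therefore far below typical limits; the check only matters for large payloads.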

Next question:
You are asking how to print the client restrictions. Just ask the client (after connecting to the broker):

client.getConfig().getConnectionConfig().ifPresent(connectionConfig -> {
    System.out.println("Client -> Broker restrictions: " + connectionConfig.getRestrictionsForClient());
    System.out.println("Broker -> Client restrictions: " + connectionConfig.getRestrictionsForServer());
});

Greetings,
Michael from the HiveMQ team
