Message expiry time/TTL issue

Hello,
Currently we are having difficulty setting a TTL for published messages.
Tried on broker versions:

  • HiveMQ Enterprise Server / HiveMQ version: 4.7.5
  • HiveMQ Enterprise Server / HiveMQ version: 4.6.4

And client version:

  • hivemq-mqtt-client:1.3.0

Here is the scenario:
Let’s have two clients, one publisher and one subscriber. They are both working on the same topic. They both set a unique client identifier while connecting.

  1. Client A: Publishes a message with a TTL value to a topic, here is publisher code:
public class Mqtt5AsyncPublisher extends AbstractAsyncHiveMqClient {

    private static final Integer MESSAGE_EXPIRE_TIME_SECONDS = 15;
    private Mqtt5AsyncClient client;

    /**
     * build an mqtt 5 compliant client
     */
    @PostConstruct
    private void buildClient() {
        client = MqttClient.builder()
          .useMqttVersion5()
          .serverHost(hivemqHost)
          .serverPort(hivemqPort)
          .identifier(clientIdMqtt5)
          .automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT)
          .buildAsync();

        log.info("Built mqtt v5 client..");

        client.connectWith()
          .cleanStart(false)
          .noSessionExpiry()
          .send()
          .whenComplete((mqtt5ConnAck, throwable) -> {
              if (throwable != null) {
                  log.error("Unable to connect to broker!", throwable);
              } else {
                  log.info("Connected to hivemq broker with mqtt v5 client..");
              }
          });
    }

    /**
     * @param topic   target hivemq topic
     * @param vin    
     * @param payload kafka to hivemq message payload
     * @param headers kafka headers which will be passed as UserPropertiesto hivemq mesage
     */
    public void publishToHivemqBroker(String topic, String vin, byte[] payload, Headers headers) {

        // set kafka headers as hivemq UserProperties
        Mqtt5UserPropertiesBuilder userPropertiesBuilder = Mqtt5UserProperties.builder();

        Arrays.stream(headers.toArray())
          .map(header -> Mqtt5UserProperty.of(String.valueOf(header.key()), new String(header.value())))
          .collect(Collectors.toCollection(LinkedList::new))
          .stream()
          .map(userPropertiesBuilder::add);

        client.publishWith()
          .topic(topic)
          .messageExpiryInterval(MESSAGE_EXPIRE_TIME_SECONDS)
          .payload(payload)
          .userProperties(userPropertiesBuilder.build())
          .qos(MqttQos.AT_LEAST_ONCE)
          .send()
          .whenComplete((mqtt5Publish, throwable) -> {
              if (throwable != null) {
                  log.warn("Publish failed!! Details: " + throwable.getMessage());
              } else {
                  log.info("Published successfully to hivemq topic: " + topic);
              }
          });
    }

}
  1. Client B: Here is full subscriber code:
public class InterruptedSubscriber {

    private static final String hivemqHost = "localhost";
    private static final Integer hivemqPort = 1883;

    private static final Integer SESSION_EXPIRE_TIME_SECONDS = 60 * 30;
    private static final String INTERRUPTED_CLIENT_ID = "interrupted_client2";
    private static final String TOPIC = "EU/vin_1234567890/ALERT_RESPONSE";

    private Mqtt5AsyncClient client;

    public static void main(String[] args) {
        InterruptedSubscriber subscriber = new InterruptedSubscriber();
        subscriber.buildClient();
    }

    public void buildClient() {
        client = MqttClient.builder()
          .useMqttVersion5()
          .serverHost(hivemqHost)
          .serverPort(hivemqPort)
          .identifier(INTERRUPTED_CLIENT_ID)
          .automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT)
          .buildAsync();

        log.info("Built client: " + client);

        client.connectWith()
          .cleanStart(false)
          .sessionExpiryInterval(SESSION_EXPIRE_TIME_SECONDS)
          .send()
          .whenComplete((mqtt5ConnAck, throwable) -> {
              if (throwable != null) {
                  log.info("Unable to connect to broker.", throwable);
              } else {
                  log.info("Connected.");
                  subscribe();
              }
          });

        System.out.println("Here");
    }

    private void subscribe() {

        client.subscribeWith()
          .topicFilter(TOPIC)
          .qos(MqttQos.AT_LEAST_ONCE)
          .callback(mqtt5Publish -> {
              log.info("Received: \"" + new String(mqtt5Publish.getPayloadAsBytes()) + "\" from topic: " + mqtt5Publish.getTopic());
          })
          .send()
          .whenComplete((mqtt5SubAck, throwable) -> {
              if (throwable != null) {
                  log.error("Failed to subscribe.", throwable);
                  client.disconnect();
              } else {
                  log.info("Subscribed.");
              }
          });
    }

}
  1. Clent A sends a message to topic, Client B successfully consumes it.
  2. Client B disconnects, Client A sends further messages.
  3. Client B reconnects before message expire time(15 s)

Now we are expecting Client B to be able to get those messages while it’s disconnected.
But it doesn’t get those messages. It’s producing following warning:

11:27:53.714 [com.hivemq.client.mqtt-1-1] WARN com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishService - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=EU/vin_1234567890/ALERT_RESPONSE, payload=28byte, qos=AT_LEAST_ONCE, retain=false, messageExpiryInterval=6}, packetIdentifier=1, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.

Could you pls check this case?
Thanks

Hello @mtorak ,

Thank you for your request. I have checked your idea using MQTT CLI client and it seems that the reason of your outcome is:

                .cleanStart(false)

In case if you would like to check on your own here are the commands:

# subscriber subscribes
mqtt sub -t TOPIC -d -J --no-cleanStart -se 60 -V 5 -I sub

# publisher publishes when the subscriber is connected
mqtt pub -m HelloConnected -t TOPIC -V 5 -e 60 -d -q 1

# I disconnect the subscriber session with Ctrl+C

# publisher publishes when the subscriber is disconnected
mqtt pub -m HelloDisconnected -t TOPIC -V 5 -e 60 -d -q 1

# subscriber reconnects while the Message Expiry of the PUBLISH has not expired
mqtt sub -t TOPIC -d -J --no-cleanStart -se 60 -V 5 -I sub

As a result my subscriber successfully receives the message with “HelloDisconnected”. When you try the same with --cleanStart then subscriber will not receive the message.

Kind regards,
Dasha from HiveMQ team

Hello Dasha,
I’ve tried your commands, you are right, subscriber consumed messages after reconnecting.
By the way, isn’t “–no-cleanStart” means cleanStart(false) already?
So i’m setting cleanStart option to false via client api. I think both usages are doing the same thing.
The only difference i see is -d and -J options. What are those stand for?

Thanks
KR

@mtorak ,

Yes, you are right , --no-cleanStart should do what .cleanStart(false) does. The -d is --debug to get debug output and -J is to format the message as JSON for my readability.

Kind regards,
Dasha from HiveMQ team

OK, just like what i thought. But i need a solution via using your java client.
As far as i can see, mqtt-cli is also using your java client to connect broker, below snippet is from mqtt-cli build.gradle.kts file:

 implementation("com.hivemq:hivemq-mqtt-client:${property("hivemq-client.version")}")

So, how can i achieve this behaviour via java client?
I can provide you a minimal java/maven project to reproduce the case, if you want.

Thanks
KR

Hey @mtorak ,

Yes, correct, MQTT CLI is based on our HiveMQ MQTT Client library and it is also open source, so you can take a look, if you like:

Regards,
Dasha from HiveMQ team

Hey Daria,
thank you for pointing out, meanwhile I’ve discovered the cause;

client.subscribeWith()

must be called before

client.connectWith()

After this change my subscriber behaved as expected.
Thank you for your support,
KR

1 Like