Hello,
Currently we are having difficulty setting a TTL for published messages.
Tried on broker versions:
- HiveMQ Enterprise Server / HiveMQ version: 4.7.5
- HiveMQ Enterprise Server / HiveMQ version: 4.6.4
And client version:
- hivemq-mqtt-client:1.3.0
Here is the scenario:
Let’s have two clients, one publisher and one subscriber. They are both working on the same topic. They both set a unique client identifier while connecting.
- Client A: Publishes a message with a TTL value to a topic. Here is the publisher code:
public class Mqtt5AsyncPublisher extends AbstractAsyncHiveMqClient {
private static final Integer MESSAGE_EXPIRE_TIME_SECONDS = 15;
private Mqtt5AsyncClient client;
/**
* build an mqtt 5 compliant client
*/
@PostConstruct
private void buildClient() {
client = MqttClient.builder()
.useMqttVersion5()
.serverHost(hivemqHost)
.serverPort(hivemqPort)
.identifier(clientIdMqtt5)
.automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT)
.buildAsync();
log.info("Built mqtt v5 client..");
client.connectWith()
.cleanStart(false)
.noSessionExpiry()
.send()
.whenComplete((mqtt5ConnAck, throwable) -> {
if (throwable != null) {
log.error("Unable to connect to broker!", throwable);
} else {
log.info("Connected to hivemq broker with mqtt v5 client..");
}
});
}
/**
* @param topic target hivemq topic
* @param vin
* @param payload kafka to hivemq message payload
* @param headers kafka headers which will be passed as UserProperties to the hivemq message
*/
public void publishToHivemqBroker(String topic, String vin, byte[] payload, Headers headers) {
// set kafka headers as hivemq UserProperties
Mqtt5UserPropertiesBuilder userPropertiesBuilder = Mqtt5UserProperties.builder();
// forEach is a terminal operation, so each header is actually added to the builder
Arrays.stream(headers.toArray())
.map(header -> Mqtt5UserProperty.of(String.valueOf(header.key()), new String(header.value())))
.forEach(userPropertiesBuilder::add);
client.publishWith()
.topic(topic)
.messageExpiryInterval(MESSAGE_EXPIRE_TIME_SECONDS)
.payload(payload)
.userProperties(userPropertiesBuilder.build())
.qos(MqttQos.AT_LEAST_ONCE)
.send()
.whenComplete((mqtt5Publish, throwable) -> {
if (throwable != null) {
log.warn("Publish failed!! Details: " + throwable.getMessage());
} else {
log.info("Published successfully to hivemq topic: " + topic);
}
});
}
}
- Client B: Here is the full subscriber code:
public class InterruptedSubscriber {
private static final String hivemqHost = "localhost";
private static final Integer hivemqPort = 1883;
private static final Integer SESSION_EXPIRE_TIME_SECONDS = 60 * 30;
private static final String INTERRUPTED_CLIENT_ID = "interrupted_client2";
private static final String TOPIC = "EU/vin_1234567890/ALERT_RESPONSE";
private Mqtt5AsyncClient client;
public static void main(String[] args) {
InterruptedSubscriber subscriber = new InterruptedSubscriber();
subscriber.buildClient();
}
public void buildClient() {
client = MqttClient.builder()
.useMqttVersion5()
.serverHost(hivemqHost)
.serverPort(hivemqPort)
.identifier(INTERRUPTED_CLIENT_ID)
.automaticReconnect(MqttClientAutoReconnectImpl.DEFAULT)
.buildAsync();
log.info("Built client: " + client);
client.connectWith()
.cleanStart(false)
.sessionExpiryInterval(SESSION_EXPIRE_TIME_SECONDS)
.send()
.whenComplete((mqtt5ConnAck, throwable) -> {
if (throwable != null) {
log.error("Unable to connect to broker.", throwable);
} else {
log.info("Connected.");
subscribe();
}
});
System.out.println("Here");
}
private void subscribe() {
client.subscribeWith()
.topicFilter(TOPIC)
.qos(MqttQos.AT_LEAST_ONCE)
.callback(mqtt5Publish -> {
log.info("Received: \"" + new String(mqtt5Publish.getPayloadAsBytes()) + "\" from topic: " + mqtt5Publish.getTopic());
})
.send()
.whenComplete((mqtt5SubAck, throwable) -> {
if (throwable != null) {
log.error("Failed to subscribe.", throwable);
client.disconnect();
} else {
log.info("Subscribed.");
}
});
}
}
- Client A sends a message to the topic, and Client B successfully consumes it.
- Client B disconnects, and Client A sends further messages.
- Client B reconnects before the message expiry time (15 s) has elapsed.
We expect Client B to receive the messages that were published while it was disconnected.
But it doesn’t receive them. Instead, the client logs the following warning:
11:27:53.714 [com.hivemq.client.mqtt-1-1] WARN com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishService - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=EU/vin_1234567890/ALERT_RESPONSE, payload=28byte, qos=AT_LEAST_ONCE, retain=false, messageExpiryInterval=6}, packetIdentifier=1, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
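One detail in the warning: it shows messageExpiryInterval=6 rather than the 15 we set. As far as we understand MQTT 5, the broker forwards a queued message with the expiry interval reduced by the time the message has been waiting, so the message appears to have reached Client B about 9 seconds after it was published (15 - 6, inferred from the log, not measured). Below is a minimal sketch of that arithmetic. The class and method names are hypothetical and not part of the HiveMQ client API.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration only; not part of the HiveMQ client API. */
public class RemainingExpiry {

    /**
     * Returns the expiry interval a broker would put on a forwarded message,
     * assuming MQTT 5 semantics (published value minus time spent queued),
     * or empty if the message expired while it was waiting.
     */
    static OptionalLong remainingSeconds(long publishedExpirySeconds, long secondsQueued) {
        long remaining = publishedExpirySeconds - secondsQueued;
        return remaining > 0 ? OptionalLong.of(remaining) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Published with 15 s, assumed to be queued for 9 s: matches the 6 in the warning
        System.out.println(remainingSeconds(15, 9));
        // Queued longer than the expiry interval: the message would be dropped
        System.out.println(remainingSeconds(15, 20));
    }
}
```

If this reading is right, the message had not expired and was delivered to the client, but was dropped on the client side with the warning above.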
Could you please check this case?
Thanks