Handling messages issue after some time

Hi,

I started using HiveMQ MQTT Client aprox 2-3months ago and until now I got twice same error but not sure if I made some mistake on my side or not.

I have create and connect client by:

Mqtt5AsyncClient client = MqttClient.builder()
        .useMqttVersion5()
        .identifier(clientId != null ? clientId : UUID.randomUUID().toString())
        .serverHost(host)
        .serverPort(port)
        .automaticReconnect()
	        .initialDelay(RECONNECT_MIN_DELAY, TimeUnit.SECONDS)
	        .maxDelay(RECONNECT_MAX_DELAY, TimeUnit.SECONDS)
	        .applyAutomaticReconnect()
        .addDisconnectedListener(new MqttClientDisconnectedListener() {
			@Override
			public void onDisconnected( MqttClientDisconnectedContext context) {
				LOGGER.warn("Disconnected");
			}
		})
        .addConnectedListener(new MqttClientConnectedListener() {
			@Override
			public void onConnected(MqttClientConnectedContext context) {
				LOGGER.info("Connected");
			}
		})
        .buildAsync();

client.connectWith().send().whenComplete(new BiConsumer<Mqtt5ConnAck, Throwable>() {
	@Override
	public void accept(Mqtt5ConnAck t, Throwable u) {
		if(u != null) {
			LOGGER.error("Error during connection to server: " + u.getMessage(), u);
		} else {
			LOGGER.info("Connected to {} and port {}", host, port);
		}
	}
});

I have subscribed 5 topics:

client.subscribeWith()
		.topicFilter(topic)
		.qos(MqttQos.fromCode(qos.getQos()))
		.callback(
					new Consumer<Mqtt5Publish>() {
						
						@Override
						public void accept(Mqtt5Publish protoMessage) {
							callback.onMessage(protoMessage);
						}
					}
				)
		.send();

I got this error which was appeared on most of received message:

2021-06-09 09:06:34.323 [ERROR] --- [com.hivemq.client.mqtt-1-1] c.h.c.i.m.h.p.i.MqttIncomingQosHandler : QoS 1 PUBLISH (MqttStatefulPublish{stateless=MqttPublish{topic=my_topic, payload=43byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=1, dup=false, topicAlias=0, subscriptionIdentifiers=[3]}) must not be resent (MqttStatefulPublish{stateless=MqttPublish{topic=my_topic, payload=43byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=1, dup=true, topicAlias=0, subscriptionIdentifiers=[3]}) during the same connection
com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5DisconnectException: QoS 1 PUBLISH must not be resent during the same connection
2021-06-09 09:06:34.324 [WARN] --- [com.hivemq.client.mqtt-1-1] c.u.p.core.mqtt.client.HiveMQClient : Disconnected
2021-06-09 09:06:38.897 [INFO] --- [com.hivemq.client.mqtt-1-1] c.u.p.core.mqtt.client.HiveMQClient : Connected

First time what happend to me I only restarted app and error disappered because. (I did not have time to solve it but I stored the logs)
After 5 week error appered again. Four of my five topics on message first handled message after that I got this error and client reconnected. Messages of one topic was not even handled only error was logged. All subscribtions use same callback class (each has own instance), callback’s onMessage method contains topic parsing, store data to database, wrap message and send to Kafka.

First one or two days what data were not comming to database from that one topic I did not see any error in log.

Does anyone has idea where is the root of this error and what caused it and how to repaire it?

Thanks,
Roman

I have communication log from wireshark, I see there first is send message from broker to my app but there is not ACK message even when HiveMQ accept message and was sucessfully handled. Broker resend same message with set DUP flag as true after 30sec. HiveMQ took this second message and evaluated it as same message.

Hi Roman,

Thank you for providing the packet trace.
We will have a look and get back to you with our findings.

Best,
Florian from The HiveMQ Team.

Hello Roman,

We are unable to reproduce this behaviour, Could you please share with us your complete code to check or exact steps to reproduce this issue.

Kind regards,
Sheetal

Hi Sheetal,
I do not know how to reproduce but happend to me twice everytime aprox after 1month of running application.

There is my part of code which work with MQTT, First is hiveMQ client:

package my.package;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import my.package.model.MqttConnectionSource;
import my.package.model.MqttSubscriberCallback;
import my.package.model.PublisherCallback;
import my.package.model.Qos;
import my.package.model.RequestId;

class HiveMQClient implements my.package.MqttClient {

	private static final Logger LOGGER = LoggerFactory.getLogger(HiveMQClient.class);

	protected static final int RECONNECT_MIN_DELAY = 1;
	protected static final int RECONNECT_MAX_DELAY = 14400;
	
	private Mqtt5AsyncClient client;

	private final String host;

	private final int port;

	private final String clientId;
	
	HiveMQClient(MqttConnectionSource source) {
		host = source.getHost();
		port = source.getPort();
		clientId = source.getClientId();
		LOGGER.info("Initialization MQTT connection to {} and port {}", host, port);
		connect();
	}
	
	private void connect() {
		try {
			client = MqttClient.builder()
			        .useMqttVersion5()
			        .identifier(clientId != null ? clientId : UUID.randomUUID().toString())
			        .serverHost(host)
			        .serverPort(port)
			        .automaticReconnect()
				        .initialDelay(RECONNECT_MIN_DELAY, TimeUnit.SECONDS)
				        .maxDelay(RECONNECT_MAX_DELAY, TimeUnit.SECONDS)
				        .applyAutomaticReconnect()
			        .addDisconnectedListener(new MqttClientDisconnectedListener() {
						
						@Override
						public void onDisconnected( MqttClientDisconnectedContext context) {							
							LOGGER.warn("Disconnected");
						}
					})
			        .addConnectedListener(new MqttClientConnectedListener() {
						
						@Override
						public void onConnected(MqttClientConnectedContext context) {
							LOGGER.info("Connected");
						}
					})
			        .buildAsync();
			
			client.connectWith().send().whenComplete(new BiConsumer<Mqtt5ConnAck, Throwable>() {

				@Override
				public void accept(Mqtt5ConnAck t, Throwable u) {
					if(u != null) {
						LOGGER.error("Error during connection to server: " + u.getMessage(), u);
					} else {
						LOGGER.info("Connected to {} and port {}", host, port);
					}
				}
			});	
		} catch (Exception e) {
			connect();
		}	
	}

	@Override
	public void publish(RequestId requestId, String topic, byte[] message, Qos qos, PublisherCallback callback) {
		checkClient();
		client.publishWith()
					.topic(topic)
					.payload(message)
					.qos(MqttQos.fromCode(qos.getQos()))
					.send()
					.whenComplete(new BiConsumer<Mqtt5PublishResult, Throwable>() {

							@Override
							public void accept(Mqtt5PublishResult t, Throwable u) {
								if(u != null) {
									if(callback != null) {
										callback.onError(requestId, u);
									}
									LOGGER.error("Error sending message " + message + " with id " + requestId + " on topic " + topic + ". Reason: " + u.getMessage());
								} else {
									if(callback != null) {
										callback.onPublished(requestId);
									}
									LOGGER.info("Message {} with id {} to topic {} sent", message, requestId, topic);
								}
							}
						});		
	}

	private void checkClient() {
		if(client == null) {
			throw new IllegalArgumentException("MQTT connection is not created.");
		}
	}



	@Override
	public void subscribe(RequestId requestId, String topic, Qos qos, MqttSubscriberCallback callback) {
		checkClient();
		client.subscribeWith()
				.topicFilter(topic)
				.qos(MqttQos.fromCode(qos.getQos()))
				.callback(
							new Consumer<Mqtt5Publish>() {
								
								@Override
								public void accept(Mqtt5Publish protoMessage) {
									callback.onMessage(protoMessage);
								}
							}
						)
				.send()
				.whenComplete(new BiConsumer<Mqtt5SubAck, Throwable>() {

						@Override
						public void accept(Mqtt5SubAck t, Throwable u) {
							if(u == null) {
								callback.onSubscribed(requestId);
								LOGGER.info("Topic {} with id {} was successfully subscribed", topic, requestId);
							} else {
								callback.onError(requestId, u);
								LOGGER.error("Error subscribing topic " + topic + " with request id " + requestId + ". Reason: " + u.getMessage(), u);
							}
						}
					});				
	}
	
}

Mqtt service:

package my.package;

import java.lang.reflect.InvocationTargetException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import my.package.MqttClient;
import my.package.model.MqttSubscriberCallback;
import my.package.model.ObjectMessagingService;
import my.package.model.PublisherCallback;
import my.package.model.Qos;
import my.package.model.RequestId;
import my.package.model.SubscriberCallback;
import my.package.utils.ProtobufUtils;

class DefaultObjectMessagingService implements ObjectMessagingService {

	private final MqttClient client;
	private static final Logger LOGGER = LoggerFactory.getLogger(DefaultObjectMessagingService.class);
	
	public DefaultObjectMessagingService(MqttClient client) {
		if(client == null) {
			throw new IllegalArgumentException("Client cannot be null");
		}
		this.client = client;
	}

	@Override
	public void publish(RequestId requestId, String topic, Message message, Qos qos, PublisherCallback callback) {
		try {
			client.publish(requestId, topic, message.toByteArray(), qos, callback);
		} catch (IllegalArgumentException | SecurityException e) {
			LOGGER.error("Error publishing message with id " + requestId, e);
			callback.onError(requestId, e);
		}		
	}

	@Override
	public void subscribe(RequestId requestId, String topic, Class<? extends Message> messageClass, Qos qos, SubscriberCallback<Message> callback) {
		client.subscribe(
				requestId,
				topic, 
				qos,
				new MqttSubscriberCallback() {
					
					@Override
					public void onError(RequestId requestId, Throwable cause) {
						callback.onError(requestId, cause);
					}
					
					@Override
					public void onSubscribed(RequestId requestId) {
						callback.onSubscribed(requestId);
					}
					
					@Override
					public void onMessage(Mqtt5Publish message) {
						try {
							callback.onMessage(message.getTopic().toString(), ProtobufUtils.convertToProtobuf(messageClass, message.getPayloadAsBytes()));
						} catch (InvalidProtocolBufferException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) {
							LOGGER.error("Error during parsing incoming message subscribed for " + topic, e);
							callback.onError(requestId, e);
						}	
					}
				}
		);		
	}

}

Create MQTT service bean with client from factory. Factory contains only return new HiveMQClient(source);

package my.package.mqtt.configuration;

import java.util.UUID;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import my.package.core.mqtt.client.MqttClientFactory;
import my.package.core.service.ObjectMessagingServiceBuilder;
import my.package.model.MqttClientType;
import my.package.model.MqttConnectionSource;
import my.package.model.ObjectMessagingService;

@Configuration
public class MqttMessagingServiceConfiguration {

	@Value( "${server.mqtt.url}" )
	private String host;

	@Value( "${server.mqtt.port}" )
	private int port;

	@Value( "${server.mqtt.clientid:#{null}}" )
	private String clientId;
	
	@Bean
	public ObjectMessagingService jsonMessagingService() {
		if(clientId == null || clientId.isEmpty()) {
			clientId = UUID.randomUUID().toString();
		}
		
		return new ObjectMessagingServiceBuilder(
				MqttClientFactory.create(
						MqttClientType.HIVEMQ, 
						new MqttConnectionSource(
									host, 
									port, 
									clientId
								)
					)
				).build();
	}
}

Subscribing topics from class Topics holding conllection of all topics. RequestId contains variable long id = ThreadLocalRandom.current().nextLong(1, new Long(Integer.MAX_VALUE));

package my.package.mqtt.configuration;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import my.package.model.Topic;
import my.package.model.Topics;
import my.package.mqtt.subscriber.callback.MessageSubscriberCallback;
import my.package.persistence.DeviceCommunicationRepository;
import my.package.model.ObjectMessagingService;
import my.package.model.RequestId;

@Component
public class MqttSubscribtionInitializer {
	
	@Autowired
	private Topics topics;

	@Autowired
	private DeviceCommunicationRepository deviceCommunicationRepository;

	@Autowired
	private ObjectMessagingService messagingService;
	
	@PostConstruct
	public void init() {
			for(Topic topic : topics.getTopics()) {
				if(topic.getType() == null) {
					continue;
				}
					
				RequestId id = new RequestId();
				messagingService.subscribe(
						id, 
						topic.getTopic(), 
						topic.getType().getClazz(), 
						topic.getQos(), 
						new MessageSubscriberCallback(topic, deviceCommunicationRepository));
				
			}
	}
}

Callback body:

package my.package.mqtt.subscriber.callback;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import my.package.http.model.Metadata;
import my.package.model.CommunicationDirection;
import my.package.model.Topic;
import my.package.persistence.DeviceCommunicationRepository;
import my.package.model.RequestId;
import my.package.model.SubscriberCallback;

public class MessageSubscriberCallback implements SubscriberCallback<Message> {

	private static final Logger LOGGER = LoggerFactory.getLogger(MessageSubscriberCallback.class);
	private final Topic topic;
	private final ObjectMessageProducer producer;
	private final DeviceCommunicationRepository communicationRepository;

	public MessageSubscriberCallback(Topic topic, ObjectMessageProducer producer, DeviceCommunicationRepository communicationRepository) {
		this.topic = topic;
		this.producer = producer;
		this.communicationRepository = communicationRepository;
	}

	@Override
	public void onError(RequestId requestId, Throwable cause) {
		LOGGER.error("Error subscribing topic " + topic, cause);
	}

	@Override
	public void onSubscribed(RequestId requestId) {
		LOGGER.info("Topic {} was sucessfully subscribed", topic);
	}

	@Override
	public void onMessage(String topic, Message message) {
		
		Pattern p = Pattern.compile("event/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)");
		Matcher m = p.matcher(topic);
		Assert.isTrue(m.find(), "Could not parser topic " + topic);
		Assert.isTrue(m.groupCount() == 5, "Invalid topic " + topic);
		
		Metadata metadata = new Metadata(m.group(1), m.group(2), new Integer(m.group(3)));
		try {
			communicationRepository.store(m.group(4), metadata, m.group(5), messageToJson(message), CommunicationDirection.IN);
		} catch (InvalidProtocolBufferException e) {
			LOGGER.error("Could not transform message to json", e);
		}
	}
	
	private String messageToJson(Message message) throws InvalidProtocolBufferException {
		return JsonFormat.printer().omittingInsignificantWhitespace().print(message);
	}
	
}

Roman