Hi Sheetal,
I do not know how to reproduce it, but it has happened to me twice, each time after approximately one month of running the application.
Here is the part of my code that works with MQTT. First is the HiveMQ client:
package my.package;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import my.package.model.MqttConnectionSource;
import my.package.model.MqttSubscriberCallback;
import my.package.model.PublisherCallback;
import my.package.model.Qos;
import my.package.model.RequestId;
/**
 * MQTT 5 client built on the HiveMQ asynchronous client.
 *
 * <p>The connection is configured and started from the constructor. Connection
 * losses are handled by the HiveMQ automatic-reconnect feature, configured with
 * {@link #RECONNECT_MIN_DELAY} and {@link #RECONNECT_MAX_DELAY}.
 */
class HiveMQClient implements my.package.MqttClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(HiveMQClient.class);

    /** Initial delay between automatic reconnect attempts, in seconds. */
    protected static final int RECONNECT_MIN_DELAY = 1;

    /**
     * Maximum delay between automatic reconnect attempts, in seconds (4 hours).
     * NOTE(review): with a cap this high the client may stay offline for hours
     * after a long outage before the next attempt - confirm this is intended.
     */
    protected static final int RECONNECT_MAX_DELAY = 14400;

    /** Underlying HiveMQ client; stays {@code null} if it could not be built. */
    private Mqtt5AsyncClient client;
    private final String host;
    private final int port;
    private final String clientId;

    /**
     * Reads host, port and client id from {@code source} and starts connecting.
     *
     * @param source connection parameters of the MQTT server
     */
    HiveMQClient(MqttConnectionSource source) {
        host = source.getHost();
        port = source.getPort();
        clientId = source.getClientId();
        LOGGER.info("Initialization MQTT connection to {} and port {}", host, port);
        connect();
    }

    /**
     * Builds the HiveMQ client and sends the asynchronous CONNECT.
     *
     * <p>A synchronous failure here is logged and not retried: calling this
     * method again from the catch block would repeat the same failing call with
     * the same arguments and recurse until the stack overflows. If the client
     * could not be built, {@link #client} stays {@code null} and
     * {@link #checkClient()} reports that on the next publish or subscribe.
     */
    private void connect() {
        try {
            client = MqttClient.builder()
                    .useMqttVersion5()
                    // Fall back to a random identifier when none was configured.
                    .identifier(clientId != null ? clientId : UUID.randomUUID().toString())
                    .serverHost(host)
                    .serverPort(port)
                    .automaticReconnect()
                        .initialDelay(RECONNECT_MIN_DELAY, TimeUnit.SECONDS)
                        .maxDelay(RECONNECT_MAX_DELAY, TimeUnit.SECONDS)
                        .applyAutomaticReconnect()
                    // Log the cause as well, so connection losses can be diagnosed.
                    .addDisconnectedListener(context -> LOGGER.warn("Disconnected", context.getCause()))
                    .addConnectedListener(context -> LOGGER.info("Connected"))
                    .buildAsync();
            client.connectWith().send().whenComplete((connAck, error) -> {
                if (error != null) {
                    LOGGER.error("Error during connection to server: " + error.getMessage(), error);
                } else {
                    LOGGER.info("Connected to {} and port {}", host, port);
                }
            });
        } catch (Exception e) {
            LOGGER.error("Could not create MQTT connection to " + host + " and port " + port, e);
        }
    }

    /**
     * Publishes {@code message} to {@code topic} and reports the outcome.
     *
     * @param requestId identifier handed back to {@code callback}
     * @param topic     topic to publish to
     * @param message   payload bytes
     * @param qos       quality of service of the publish
     * @param callback  notified about success or failure; may be {@code null}
     * @throws IllegalArgumentException if the MQTT client was not created
     */
    @Override
    public void publish(RequestId requestId, String topic, byte[] message, Qos qos, PublisherCallback callback) {
        checkClient();
        // Log the payload size: concatenating a byte[] only prints its identity (e.g. "[B@1a2b3c").
        final int payloadSize = message == null ? 0 : message.length;
        client.publishWith()
                .topic(topic)
                .payload(message)
                .qos(MqttQos.fromCode(qos.getQos()))
                .send()
                .whenComplete((result, error) -> {
                    if (error != null) {
                        if (callback != null) {
                            callback.onError(requestId, error);
                        }
                        LOGGER.error("Error sending message of " + payloadSize + " bytes with id " + requestId
                                + " on topic " + topic + ". Reason: " + error.getMessage(), error);
                    } else {
                        if (callback != null) {
                            callback.onPublished(requestId);
                        }
                        LOGGER.info("Message of {} bytes with id {} to topic {} sent", payloadSize, requestId, topic);
                    }
                });
    }

    /**
     * Verifies that the HiveMQ client exists.
     *
     * @throws IllegalArgumentException if the client was not created
     */
    private void checkClient() {
        if (client == null) {
            throw new IllegalArgumentException("MQTT connection is not created.");
        }
    }

    /**
     * Subscribes to {@code topic} and forwards incoming messages and the
     * subscription result to {@code callback}.
     *
     * @param requestId identifier handed back to {@code callback}
     * @param topic     topic filter to subscribe to
     * @param qos       quality of service of the subscription
     * @param callback  receives messages and the subscription outcome; required
     * @throws IllegalArgumentException if the MQTT client was not created or
     *                                  {@code callback} is {@code null}
     */
    @Override
    public void subscribe(RequestId requestId, String topic, Qos qos, MqttSubscriberCallback callback) {
        checkClient();
        if (callback == null) {
            // Fail at the call site; otherwise the NPE would only surface inside the async handlers.
            throw new IllegalArgumentException("Subscriber callback cannot be null");
        }
        client.subscribeWith()
                .topicFilter(topic)
                .qos(MqttQos.fromCode(qos.getQos()))
                .callback(callback::onMessage)
                .send()
                .whenComplete((subAck, error) -> {
                    if (error == null) {
                        callback.onSubscribed(requestId);
                        LOGGER.info("Topic {} with id {} was successfully subscribed", topic, requestId);
                    } else {
                        callback.onError(requestId, error);
                        LOGGER.error("Error subscribing topic " + topic + " with request id " + requestId
                                + ". Reason: " + error.getMessage(), error);
                    }
                });
    }
}
Mqtt service:
package my.package;
import java.lang.reflect.InvocationTargetException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import my.package.MqttClient;
import my.package.model.MqttSubscriberCallback;
import my.package.model.ObjectMessagingService;
import my.package.model.PublisherCallback;
import my.package.model.Qos;
import my.package.model.RequestId;
import my.package.model.SubscriberCallback;
import my.package.utils.ProtobufUtils;
/**
 * {@link ObjectMessagingService} that sends and receives protobuf messages
 * through an {@link MqttClient}.
 */
class DefaultObjectMessagingService implements ObjectMessagingService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultObjectMessagingService.class);

    private final MqttClient client;

    /**
     * @param client MQTT client used for all publishes and subscriptions
     * @throws IllegalArgumentException if {@code client} is {@code null}
     */
    public DefaultObjectMessagingService(MqttClient client) {
        if (client == null) {
            throw new IllegalArgumentException("Client cannot be null");
        }
        this.client = client;
    }

    /**
     * Serializes {@code message} to bytes and publishes it to {@code topic}.
     *
     * @param requestId identifier handed back to {@code callback}
     * @param topic     topic to publish to
     * @param message   protobuf message to send
     * @param qos       quality of service of the publish
     * @param callback  notified about the outcome; may be {@code null}
     */
    @Override
    public void publish(RequestId requestId, String topic, Message message, Qos qos, PublisherCallback callback) {
        try {
            client.publish(requestId, topic, message.toByteArray(), qos, callback);
        } catch (IllegalArgumentException | SecurityException e) {
            LOGGER.error("Error publishing message with id " + requestId, e);
            // The callback is optional; without this guard a missing callback
            // would replace the logged failure with a NullPointerException.
            if (callback != null) {
                callback.onError(requestId, e);
            }
        }
    }

    /**
     * Subscribes to {@code topic}; every incoming payload is converted to
     * {@code messageClass} before it is handed to {@code callback}.
     *
     * @param requestId    identifier handed back to {@code callback}
     * @param topic        topic filter to subscribe to
     * @param messageClass protobuf type the payloads are parsed into
     * @param qos          quality of service of the subscription
     * @param callback     receives parsed messages, subscription result and errors
     */
    @Override
    public void subscribe(RequestId requestId, String topic, Class<? extends Message> messageClass, Qos qos,
            SubscriberCallback<Message> callback) {
        client.subscribe(
                requestId,
                topic,
                qos,
                new MqttSubscriberCallback() {
                    @Override
                    public void onError(RequestId requestId, Throwable cause) {
                        callback.onError(requestId, cause);
                    }

                    @Override
                    public void onSubscribed(RequestId requestId) {
                        callback.onSubscribed(requestId);
                    }

                    @Override
                    public void onMessage(Mqtt5Publish message) {
                        try {
                            callback.onMessage(
                                    message.getTopic().toString(),
                                    ProtobufUtils.convertToProtobuf(messageClass, message.getPayloadAsBytes()));
                        } catch (InvalidProtocolBufferException | IllegalAccessException | IllegalArgumentException
                                | InvocationTargetException | NoSuchMethodException | SecurityException e) {
                            // Parsing/handling failures are reported through the subscriber's error channel.
                            LOGGER.error("Error during parsing incoming message subscribed for " + topic, e);
                            callback.onError(requestId, e);
                        }
                    }
                }
        );
    }
}
The MQTT service bean is created with a client obtained from the factory. The factory contains only `return new HiveMQClient(source);`
package my.package.mqtt.configuration;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import my.package.core.mqtt.client.MqttClientFactory;
import my.package.core.service.ObjectMessagingServiceBuilder;
import my.package.model.MqttClientType;
import my.package.model.MqttConnectionSource;
import my.package.model.ObjectMessagingService;
/**
 * Spring configuration that exposes the {@link ObjectMessagingService} bean
 * backed by a HiveMQ MQTT client.
 */
@Configuration
public class MqttMessagingServiceConfiguration {

    /** MQTT server host, read from {@code server.mqtt.url}. */
    @Value( "${server.mqtt.url}" )
    private String host;

    /** MQTT server port, read from {@code server.mqtt.port}. */
    @Value( "${server.mqtt.port}" )
    private int port;

    /** Optional client id from {@code server.mqtt.clientid}; {@code null} when the property is absent. */
    @Value( "${server.mqtt.clientid:#{null}}" )
    private String clientId;

    /**
     * Creates the messaging service; a random UUID becomes the client id when
     * none (or an empty one) was configured.
     *
     * @return messaging service built around a HiveMQ client
     */
    @Bean
    public ObjectMessagingService jsonMessagingService() {
        final boolean clientIdMissing = clientId == null || clientId.isEmpty();
        if (clientIdMissing) {
            clientId = UUID.randomUUID().toString();
        }
        final MqttConnectionSource connectionSource = new MqttConnectionSource(host, port, clientId);
        final ObjectMessagingServiceBuilder serviceBuilder = new ObjectMessagingServiceBuilder(
                MqttClientFactory.create(MqttClientType.HIVEMQ, connectionSource));
        return serviceBuilder.build();
    }
}
Topics are subscribed from the class Topics, which holds a collection of all topics. RequestId contains the variable long id = ThreadLocalRandom.current().nextLong(1, new Long(Integer.MAX_VALUE));
package my.package.mqtt.configuration;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import my.package.model.Topic;
import my.package.model.Topics;
import my.package.mqtt.subscriber.callback.MessageSubscriberCallback;
import my.package.persistence.DeviceCommunicationRepository;
import my.package.model.ObjectMessagingService;
import my.package.model.RequestId;
/**
 * Subscribes to every typed topic from {@link Topics} once the bean's
 * dependencies have been injected.
 */
@Component
public class MqttSubscribtionInitializer {

    @Autowired
    private Topics topics;

    @Autowired
    private DeviceCommunicationRepository deviceCommunicationRepository;

    @Autowired
    private ObjectMessagingService messagingService;

    /** Walks all topics and subscribes to those that declare a message type. */
    @PostConstruct
    public void init() {
        for (Topic candidate : topics.getTopics()) {
            // Topics without a message type are not subscribed.
            if (candidate.getType() != null) {
                subscribeTo(candidate);
            }
        }
    }

    /**
     * Registers one subscription with a fresh request id and its own callback.
     *
     * @param topic topic with a non-null type
     */
    private void subscribeTo(Topic topic) {
        RequestId requestId = new RequestId();
        messagingService.subscribe(
                requestId,
                topic.getTopic(),
                topic.getType().getClazz(),
                topic.getQos(),
                new MessageSubscriberCallback(topic, deviceCommunicationRepository));
    }
}
Callback body:
package my.package.mqtt.subscriber.callback;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import my.package.http.model.Metadata;
import my.package.model.CommunicationDirection;
import my.package.model.Topic;
import my.package.persistence.DeviceCommunicationRepository;
import my.package.model.RequestId;
import my.package.model.SubscriberCallback;
/**
 * Subscriber callback that stores every incoming message, rendered as JSON,
 * in the {@link DeviceCommunicationRepository}.
 */
public class MessageSubscriberCallback implements SubscriberCallback<Message> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageSubscriberCallback.class);

    /**
     * Layout of an event topic: {@code event/} followed by five {@code /}-separated
     * segments. Compiled once, because {@link Pattern} is immutable and compiling
     * it for every incoming message is wasted work.
     */
    private static final Pattern EVENT_TOPIC_PATTERN = Pattern.compile(
            "event/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)/([a-zA-Z0-9._-]*)");

    private final Topic topic;
    private final ObjectMessageProducer producer;
    private final DeviceCommunicationRepository communicationRepository;

    /**
     * Creates a callback without a producer.
     *
     * @param topic                   topic this callback is registered for
     * @param communicationRepository repository incoming messages are stored in
     */
    public MessageSubscriberCallback(Topic topic, DeviceCommunicationRepository communicationRepository) {
        // The producer is not used by any method of this class, so it can be left unset.
        this(topic, null, communicationRepository);
    }

    /**
     * @param topic                   topic this callback is registered for
     * @param producer                message producer; only stored by this class
     * @param communicationRepository repository incoming messages are stored in
     */
    public MessageSubscriberCallback(Topic topic, ObjectMessageProducer producer,
            DeviceCommunicationRepository communicationRepository) {
        this.topic = topic;
        this.producer = producer;
        this.communicationRepository = communicationRepository;
    }

    /** Logs a failure reported for this callback's topic. */
    @Override
    public void onError(RequestId requestId, Throwable cause) {
        LOGGER.error("Error subscribing topic " + topic, cause);
    }

    /** Logs that the subscription was confirmed. */
    @Override
    public void onSubscribed(RequestId requestId) {
        LOGGER.info("Topic {} was sucessfully subscribed", topic);
    }

    /**
     * Splits the topic into its segments and stores the message as JSON with
     * direction {@link CommunicationDirection#IN}.
     *
     * @param topic   concrete topic the message arrived on
     * @param message received protobuf message
     * @throws IllegalArgumentException if the topic does not contain the event
     *                                  layout, or its third segment is not an integer
     */
    @Override
    public void onMessage(String topic, Message message) {
        Matcher m = EVENT_TOPIC_PATTERN.matcher(topic);
        // A successful find() always yields all five groups, so no separate group-count check is needed.
        Assert.isTrue(m.find(), "Could not parser topic " + topic);
        Metadata metadata = new Metadata(m.group(1), m.group(2), Integer.valueOf(m.group(3)));
        try {
            communicationRepository.store(m.group(4), metadata, m.group(5), messageToJson(message),
                    CommunicationDirection.IN);
        } catch (InvalidProtocolBufferException e) {
            LOGGER.error("Could not transform message to json", e);
        }
    }

    /**
     * Renders a protobuf message as compact JSON.
     *
     * @param message message to render
     * @return JSON text without insignificant whitespace
     * @throws InvalidProtocolBufferException if the message cannot be printed as JSON
     */
    private String messageToJson(Message message) throws InvalidProtocolBufferException {
        return JsonFormat.printer().omittingInsignificantWhitespace().print(message);
    }
}
Roman