I am developing an Android application that publishes ECG packets using the Java HiveMQ MQTT Client library to a HiveMQ broker. The application works as follows:
The user enters a userId(topic), and upon clicking a Start Publish button, a scheduled task generates data packets every 900 ms and adds them to a queue. After adding a packet to the queue, the task calls the publishAllQueuedPackets
method, which publishes the queued packets with QoS 1 via the Mqtt5AsyncClient
. Finally, the queue is cleared after publishing.
However, I am facing two recurring issues:
- Repeated Publishing of Previously Sent Packets:
Some previously sent queued packets seem to be resent multiple times (20–40 times). I suspect this could be due to either mishandling of the queue (e.g., the previous publish operation takes time, causing the queued messages not to be cleared and subsequently being considered in the next publishing cycle, effectively publishing some packets twice) or an asynchronous issue with thepublishAllQueuedPackets
method. This issue is primarily observed when the client disconnects and reconnects. - Intermittent Stopping of Publishing Despite Stable Connectivity:
Occasionally, even when the client is connected to a stable internet connection, the publishing process pauses and then resumes randomly after some time.
Note: My implementation for publishing packets may not follow best practices, as I am a beginner in MQTT and Android development. I am open to any suggestions for improvement. Also, please note that in-order publishing of packets to the broker is important.
Below is my full code for reference:
package com.example.hivemqpublisher;
import android.annotation.SuppressLint;
import android.os.Bundle;
import android.text.TextUtils;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "HiveMQPublisher";
private static final double[] DATA = {0.0, -0.004, /* ... 2500 data points ... */};
private static final int PACKET_SIZE = 125;
private static final long GENERATE_INTERVAL = 900L; // 900ms
private Mqtt5AsyncClient mqttClient;
private ScheduledExecutorService scheduler;
private int packetNumber = 0;
private Queue<JSONObject> packetQueue = new ConcurrentLinkedQueue<>();
private EditText userIdEditText;
private Button startButton, stopButton;
private TextView clientStatusTextView, lastPublishedTextView, queueEndTextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
userIdEditText = findViewById(R.id.userIdEditText);
startButton = findViewById(R.id.startPublishingButton);
stopButton = findViewById(R.id.stopPublishingButton);
clientStatusTextView = findViewById(R.id.clientStatusTextView);
lastPublishedTextView = findViewById(R.id.lastPublishedPacketTextView);
queueEndTextView = findViewById(R.id.queueEndPacketTextView);
startButton.setOnClickListener(v -> startPublishing());
stopButton.setOnClickListener(v -> stopPublishing());
Log.d(TAG, "onCreate: UI initialized.");
}
@SuppressLint("SetTextI18n")
private void startPublishing() {
Log.d(TAG, "startPublishing: Attempting to start publishing.");
String userId = userIdEditText.getText().toString().trim();
if (TextUtils.isEmpty(userId)) {
clientStatusTextView.setText("User ID is required!");
Log.e(TAG, "startPublishing: User ID is empty.");
return;
}
userIdEditText.setEnabled(false);
String topic = "hivemq/ff/" + userId;
// Create MQTT Client
Log.d(TAG, "startPublishing: Creating MQTT client.");
mqttClient = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.identifier("android_ecg_publisher_" + userId)
.serverPort(8883)
.sslWithDefaultConfig()
.automaticReconnect()
.initialDelay(1, TimeUnit.SECONDS)
.maxDelay(10, TimeUnit.SECONDS)
.applyAutomaticReconnect()
.buildAsync();
// Connect to Broker
mqttClient.connectWith()
.cleanStart(true)
.keepAlive(1)
.sessionExpiryInterval(300)
.send()
.thenAccept(connAck -> {
Log.i(TAG, "Connected to MQTT Broker successfully.");
runOnUiThread(() -> clientStatusTextView.setText("Connected to MQTT Broker"));
startScheduler(topic);
})
.exceptionally(throwable -> {
Log.e(TAG, "Failed to connect to MQTT Broker: " + throwable.getMessage(), throwable);
runOnUiThread(() -> clientStatusTextView.setText("Failed to connect: " + throwable.getMessage()));
return null;
});
startButton.setVisibility(View.GONE);
stopButton.setVisibility(View.VISIBLE);
}
@SuppressLint("SetTextI18n")
private void stopPublishing() {
Log.d(TAG, "stopPublishing: Attempting to stop publishing.");
if (scheduler != null) {
scheduler.shutdownNow();
Log.d(TAG, "stopPublishing: Scheduler shut down.");
}
packetQueue.clear(); // Clear the queue to reset
Log.d(TAG, "stopPublishing: Packet queue cleared.");
if (mqttClient != null && mqttClient.getState().isConnected()) {
mqttClient.disconnect()
.thenRun(() -> Log.i(TAG, "MQTT Client disconnected"))
.exceptionally(throwable -> {
Log.e(TAG, "Failed to disconnect MQTT client: " + throwable.getMessage(), throwable);
return null;
});
}
packetNumber = 1; // Reset the packet number
runOnUiThread(() -> {
startButton.setVisibility(View.VISIBLE);
stopButton.setVisibility(View.GONE);
clientStatusTextView.setText("Publishing stopped");
lastPublishedTextView.setText("Last Published Packet No: 0");
queueEndTextView.setText("Queue End Packet No: 0");
});
}
private void startScheduler(String topic) {
Log.d(TAG, "startScheduler: Starting scheduler for packet generation.");
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleWithFixedDelay(() -> generatePacket(topic), 0, GENERATE_INTERVAL, TimeUnit.MILLISECONDS);
}
private void generatePacket(String topic) {
try {
Log.d(TAG, "generatePacket: Preparing data for packetNumber: " + packetNumber);
int startIdx = (packetNumber * PACKET_SIZE) % DATA.length;
// Prepare Data Packet
JSONArray jsonArray = new JSONArray();
for (int i = startIdx; i < startIdx + PACKET_SIZE; i++) {
jsonArray.put(DATA[i % DATA.length]);
}
JSONObject packetJson = new JSONObject();
packetJson.put("type", "livecg-data");
packetJson.put("data", jsonArray);
packetJson.put("packetNo", packetNumber);
packetJson.put("Timestamp", System.currentTimeMillis());
packetQueue.add(packetJson);
Log.d(TAG, "generatePacket: Packet added to queue: " + packetNumber);
packetNumber++;
runOnUiThread(() -> queueEndTextView.setText("Queue End Packet No: " + (packetNumber - 1)));
publishAllQueuedPackets(topic);
} catch (Exception e) {
Log.e(TAG, "generatePacket: Error preparing packet: " + e.getMessage(), e);
}
}
private void publishAllQueuedPackets(String topic) {
if (packetQueue.isEmpty()) {
return; // Nothing to publish
}
JSONArray batch = new JSONArray();
AtomicInteger lastPacketNumber = new AtomicInteger(-1); // Atomic wrapper for lambda compatibility
// Build the batch and keep track of the last packet number
for (JSONObject packet : packetQueue) {
batch.put(packet);
lastPacketNumber.set(packet.optInt("packetNo", -1)); // Update the atomic variable
}
mqttClient.publishWith()
.topic(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.payload(batch.toString().getBytes())
.send()
.thenAccept(publishResult -> {
int finalPacketNumber = lastPacketNumber.get(); // Get the last packet number
Log.i(TAG, "publishAllQueuedPackets: Published batch successfully. Last packet: " + finalPacketNumber);
// Update UI with the last published packet
runOnUiThread(() -> {
lastPublishedTextView.setText("Last Published Packet No: " + finalPacketNumber);
});
// Clear the queue after successful publish
packetQueue.clear();
})
.exceptionally(throwable -> {
Log.e(TAG, "publishAllQueuedPackets: Failed to publish batch: " + throwable.getMessage(), throwable);
// Retain the queue for retrying later
return null;
});
}
}