Issues with Publish from Android: Repeated Packets and Intermittent Publishing Stops Using HiveMQ Client

I am developing an Android application that publishes ECG packets using the Java HiveMQ MQTT Client library to a HiveMQ broker. The application works as follows:

The user enters a userId (which is used to build the topic), and upon clicking the Start Publish button, a scheduled task generates a data packet every 900 ms and adds it to a queue. After adding a packet to the queue, the task calls the publishAllQueuedPackets method, which publishes the queued packets with QoS 1 via the Mqtt5AsyncClient. Finally, the queue is cleared after publishing.

However, I am facing two recurring issues:

  1. Repeated Publishing of Previously Sent Packets:
    Some previously sent queued packets seem to be resent multiple times (20–40 times). I suspect this is caused either by mishandling of the queue (for example, a previous publish takes long enough that the queue has not yet been cleared when the next cycle starts, so the same packets are included in the next batch and published again) or by an asynchronous issue in the publishAllQueuedPackets method. This issue is primarily observed when the client disconnects and reconnects.
  2. Intermittent Stopping of Publishing Despite Stable Connectivity:
    Occasionally, even when the client is connected to a stable internet connection, the publishing process pauses and then resumes randomly after some time.

Note: My implementation for publishing packets may not follow best practices, as I am a beginner in MQTT and Android development. I am open to any suggestions for improvement. Also, please note that in-order publishing of packets to the broker is important.

Below is my full code for reference:

package com.example.hivemqpublisher;

import android.annotation.SuppressLint;
import android.os.Bundle;
import android.text.TextUtils;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

import org.json.JSONArray;
import org.json.JSONObject;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "HiveMQPublisher";
    private static final double[] DATA = {0.0, -0.004, /* ... 2500 data points ... */};


    private static final int PACKET_SIZE = 125;
    private static final long GENERATE_INTERVAL = 900L; // 900ms

    private Mqtt5AsyncClient mqttClient;
    private ScheduledExecutorService scheduler;
    private int packetNumber = 0;

    private Queue<JSONObject> packetQueue = new ConcurrentLinkedQueue<>();

    private EditText userIdEditText;
    private Button startButton, stopButton;
    private TextView clientStatusTextView, lastPublishedTextView, queueEndTextView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        userIdEditText = findViewById(R.id.userIdEditText);
        startButton = findViewById(R.id.startPublishingButton);
        stopButton = findViewById(R.id.stopPublishingButton);
        clientStatusTextView = findViewById(R.id.clientStatusTextView);
        lastPublishedTextView = findViewById(R.id.lastPublishedPacketTextView);
        queueEndTextView = findViewById(R.id.queueEndPacketTextView);

        startButton.setOnClickListener(v -> startPublishing());
        stopButton.setOnClickListener(v -> stopPublishing());

        Log.d(TAG, "onCreate: UI initialized.");
    }

    @SuppressLint("SetTextI18n")
    private void startPublishing() {
        Log.d(TAG, "startPublishing: Attempting to start publishing.");
        String userId = userIdEditText.getText().toString().trim();
        if (TextUtils.isEmpty(userId)) {
            clientStatusTextView.setText("User ID is required!");
            Log.e(TAG, "startPublishing: User ID is empty.");
            return;
        }

        userIdEditText.setEnabled(false);
        String topic = "hivemq/ff/" + userId;

        // Create MQTT Client
        Log.d(TAG, "startPublishing: Creating MQTT client.");
        mqttClient = Mqtt5Client.builder()
                .serverHost("broker.hivemq.com")
                .identifier("android_ecg_publisher_" + userId)
                .serverPort(8883)
                .sslWithDefaultConfig()
                .automaticReconnect()
                .initialDelay(1, TimeUnit.SECONDS)
                .maxDelay(10, TimeUnit.SECONDS)
                .applyAutomaticReconnect()
                .buildAsync();

        // Connect to Broker
        mqttClient.connectWith()
                .cleanStart(true)
                .keepAlive(1)
                .sessionExpiryInterval(300)
                .send()
                .thenAccept(connAck -> {
                    Log.i(TAG, "Connected to MQTT Broker successfully.");
                    runOnUiThread(() -> clientStatusTextView.setText("Connected to MQTT Broker"));
                    startScheduler(topic);
                })
                .exceptionally(throwable -> {
                    Log.e(TAG, "Failed to connect to MQTT Broker: " + throwable.getMessage(), throwable);
                    runOnUiThread(() -> clientStatusTextView.setText("Failed to connect: " + throwable.getMessage()));
                    return null;
                });

        startButton.setVisibility(View.GONE);
        stopButton.setVisibility(View.VISIBLE);
    }

    @SuppressLint("SetTextI18n")
    private void stopPublishing() {
        Log.d(TAG, "stopPublishing: Attempting to stop publishing.");
        if (scheduler != null) {
            scheduler.shutdownNow();
            Log.d(TAG, "stopPublishing: Scheduler shut down.");
        }

        packetQueue.clear(); // Clear the queue to reset
        Log.d(TAG, "stopPublishing: Packet queue cleared.");

        if (mqttClient != null && mqttClient.getState().isConnected()) {
            mqttClient.disconnect()
                    .thenRun(() -> Log.i(TAG, "MQTT Client disconnected"))
                    .exceptionally(throwable -> {
                        Log.e(TAG, "Failed to disconnect MQTT client: " + throwable.getMessage(), throwable);
                        return null;
                    });
        }

        packetNumber = 0; // Reset the packet number to its initial value
        runOnUiThread(() -> {
            startButton.setVisibility(View.VISIBLE);
            stopButton.setVisibility(View.GONE);
            clientStatusTextView.setText("Publishing stopped");
            lastPublishedTextView.setText("Last Published Packet No: 0");
            queueEndTextView.setText("Queue End Packet No: 0");
        });
    }

    private void startScheduler(String topic) {
        Log.d(TAG, "startScheduler: Starting scheduler for packet generation.");
        scheduler = Executors.newScheduledThreadPool(1);

        scheduler.scheduleWithFixedDelay(() -> generatePacket(topic), 0, GENERATE_INTERVAL, TimeUnit.MILLISECONDS);
    }

    private void generatePacket(String topic) {
        try {
            Log.d(TAG, "generatePacket: Preparing data for packetNumber: " + packetNumber);
            int startIdx = (packetNumber * PACKET_SIZE) % DATA.length;

            // Prepare Data Packet
            JSONArray jsonArray = new JSONArray();
            for (int i = startIdx; i < startIdx + PACKET_SIZE; i++) {
                jsonArray.put(DATA[i % DATA.length]);
            }

            JSONObject packetJson = new JSONObject();
            packetJson.put("type", "livecg-data");
            packetJson.put("data", jsonArray);
            packetJson.put("packetNo", packetNumber);
            packetJson.put("Timestamp", System.currentTimeMillis());

            packetQueue.add(packetJson);
            Log.d(TAG, "generatePacket: Packet added to queue: " + packetNumber);
            packetNumber++;

            runOnUiThread(() -> queueEndTextView.setText("Queue End Packet No: " + (packetNumber - 1)));
            publishAllQueuedPackets(topic);

        } catch (Exception e) {
            Log.e(TAG, "generatePacket: Error preparing packet: " + e.getMessage(), e);
        }
    }


    private void publishAllQueuedPackets(String topic) {
        if (packetQueue.isEmpty()) {
            return; // Nothing to publish
        }

        JSONArray batch = new JSONArray();
        AtomicInteger lastPacketNumber = new AtomicInteger(-1); // Atomic wrapper for lambda compatibility

        // Build the batch and keep track of the last packet number
        for (JSONObject packet : packetQueue) {
            batch.put(packet);
            lastPacketNumber.set(packet.optInt("packetNo", -1)); // Update the atomic variable
        }

        mqttClient.publishWith()
                .topic(topic)
                .qos(MqttQos.AT_LEAST_ONCE)
                .payload(batch.toString().getBytes())
                .send()
                .thenAccept(publishResult -> {
                    int finalPacketNumber = lastPacketNumber.get(); // Get the last packet number

                    Log.i(TAG, "publishAllQueuedPackets: Published batch successfully. Last packet: " + finalPacketNumber);

                    // Update UI with the last published packet
                    runOnUiThread(() -> {
                        lastPublishedTextView.setText("Last Published Packet No: " + finalPacketNumber);
                    });

                    // Clear the queue after successful publish
                    packetQueue.clear();
                })
                .exceptionally(throwable -> {
                    Log.e(TAG, "publishAllQueuedPackets: Failed to publish batch: " + throwable.getMessage(), throwable);
                    // Retain the queue for retrying later
                    return null;
                });
    }
}

Hello @qwert477,

Thank you for the outreach - we would be happy to assist!

I’ll provide a note for each of the two issues you’ve mentioned:

  1. Repeated Publishing of Previously Sent Packets:
  • This behavior is certainly curious, and it could originate in several places. To start with, QoS 1 and QoS 2 messages carry a delivery guarantee, so a message whose acknowledgement has not been received may be redelivered to make sure it arrives, for example after a reconnect. If the duplicates appear even though delivery was already confirmed, additional logic that waits for the PUBACK (QoS 1) or PUBCOMP (QoS 2) of one publish before sending the next may be relevant here. It is also worth monitoring how many publishes are sent at a single time, as exceeding the broker resources available for instantaneous publish rates may prevent publishes from succeeding. Enabling message logging or reviewing the HiveMQ logs for errors or message details may be beneficial here as well!
  2. Intermittent Stopping of Publishing Despite Stable Connectivity:
  • This may be related to the instantaneous message rate mentioned above. For example, if cluster overload protection has been engaged, inbound messages can be blocked for a time so that the cluster can work through its current load. I would also recommend reviewing or attaching the HiveMQ broker logs to verify whether this is the case, and confirming whether the publishing client receives any failure responses to its publish requests.
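
As a rough illustration of waiting for the acknowledgement before sending more, here is a minimal sketch that keeps at most one batch in flight and removes only the packets that were actually sent, instead of clearing the whole queue. It uses only the Java standard library and is not part of the HiveMQ client: OrderedBatchPublisher, enqueue, drain, pending and sender are hypothetical names, packets are plain strings rather than JSONObject for brevity, and sender is assumed to wrap the mqttClient.publishWith()...send() call and to complete once the broker has acknowledged the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical helper (not part of the HiveMQ client): publishes queued
// packets in order, with at most one batch awaiting acknowledgement.
final class OrderedBatchPublisher {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    // Assumption: stands in for mqttClient.publishWith()...send(); the future
    // completes once the broker has acknowledged the batch.
    private final Function<List<String>, CompletableFuture<Void>> sender;

    OrderedBatchPublisher(Function<List<String>, CompletableFuture<Void>> sender) {
        this.sender = sender;
    }

    void enqueue(String packet) {
        queue.add(packet);
        drain();
    }

    int pending() {
        return queue.size();
    }

    void drain() {
        // Skip if a batch is still waiting for its acknowledgement.
        if (!inFlight.compareAndSet(false, true)) {
            return;
        }
        List<String> batch = new ArrayList<>(queue); // snapshot in queue order
        if (batch.isEmpty()) {
            inFlight.set(false);
            return;
        }
        CompletableFuture<Void> ack;
        try {
            ack = sender.apply(batch);
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed publish.
            ack = new CompletableFuture<>();
            ack.completeExceptionally(e);
        }
        ack.whenComplete((ignored, error) -> {
            if (error == null) {
                // Remove only the packets that were sent; newer ones stay queued.
                for (int i = 0; i < batch.size(); i++) {
                    queue.poll();
                }
            }
            inFlight.set(false);
            // On success, send anything that arrived meanwhile. On failure the
            // batch stays queued and is retried on the next enqueue().
            if (error == null && !queue.isEmpty()) {
                drain();
            }
        });
    }
}
```

In the posted code, generatePacket could call something like enqueue(...) in place of packetQueue.add(...) followed by publishAllQueuedPackets(topic). Packets generated while a batch is waiting for its acknowledgement would then go out in the next batch rather than being re-sent or cleared unsent.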

Let us know if this clarifies things. If you’d like, please feel free to attach these logs and we will be happy to investigate further!

Best,
Aaron from the HiveMQ Team