Cannot consume dropped messages

I am not able to consume the records from the dropped-messages topic, even though I have enabled it in the config.xml file.

Can you let me know how to consume them?
Also, can we get the dropped messages from the dropped messages topic to Kafka using Kafka-HiveMQ-extension and see that data in Kafka topic?

I have attached my config file for reference; I am currently using HiveMQ version 4.8.1.
I have also attached a screenshot of MQTT.fx, which I used to subscribe and consume data.

<?xml version="1.0"?>
<hivemq>

    <listeners>
        <tcp-listener>
            <port>1883</port>
            <bind-address>0.0.0.0</bind-address>
        </tcp-listener>
    </listeners>
    

    <mqtt-addons>
        <dropped-messages-topic>
            <enabled>true</enabled>
        </dropped-messages-topic>
    </mqtt-addons>
    
 
    <anonymous-usage-statistics>
        <enabled>true</enabled>
    </anonymous-usage-statistics>

    <mqtt>

        <queued-messages>
            <max-queue-size>1</max-queue-size>
            <strategy>discard</strategy>
        </queued-messages>

        <topic-alias>
            <enabled>true</enabled>
            <max-per-client>5</max-per-client>
        </topic-alias>

        <message-expiry>
            <max-interval>4294967296</max-interval>  <!-- this value means no message expiry -->
        </message-expiry>

        <session-expiry>
            <max-interval>4294967295</max-interval> <!-- ~ 130 years -->
        </session-expiry>

        <keep-alive>
            <allow-unlimited>true</allow-unlimited>
            <max-keep-alive>65535</max-keep-alive>
        </keep-alive>

        <packets>
            <max-packet-size>268435460</max-packet-size> <!-- 256 MB -->
        </packets>

        <receive-maximum>
            <server-receive-maximum>10</server-receive-maximum>
        </receive-maximum>

        <quality-of-service>
            <max-qos>2</max-qos>
        </quality-of-service>

        <wildcard-subscriptions>
            <enabled>true</enabled>
        </wildcard-subscriptions>

        <shared-subscriptions>
            <enabled>true</enabled>
        </shared-subscriptions>

        <subscription-identifier>
            <enabled>true</enabled>
        </subscription-identifier>

        <retained-messages>
            <enabled>true</enabled>
        </retained-messages>

    </mqtt>

</hivemq>

Regards,
Nirmal

Hi @Nirmal ,

  • How do you simulate the scenario where a message is dropped (how do you “generate” dropped messages)?
  • Do you see in the HiveMQ Control Center that you have messages dropped?
  • Do you see from the HiveMQ logs that you have messages dropped?

Thanks,
Dasha

Hi @Daria_H ,
**How do you simulate the scenario where a message is dropped (how do you “generate” dropped messages)?**
I decreased the incoming message size limit in Kafka and sent a large volume of data from MQTT.fx to HiveMQ.

**Do you see in the HiveMQ Control Center that you have messages dropped?**
Yes, I can see in the Control Center that the message has been dropped, and I got a notification for it.

**Do you see from the HiveMQ logs that you have messages dropped?**
Yes, I can see a WARN log stating that the messages have been dropped.

Regards,
Nirmal Srinivasan

Team, can anyone respond soon? I have a major implementation that depends on this.

Regards,
Nirmal

@Daria_H @michael_w Can anyone please look into this and give me an update on this issue?

Hi @Nirmal,

So I tried it with your config, and my client that subscribed to $dropped/# got the dropped messages. I produced the dropped messages by having an offline client that subscribes to the topic "test":

mqtt sub -t test -q 1 -se 100000 -i sleepyClient

Note: disconnect the client afterwards so that it is offline.

Now I add a subscriber for the dropped topic:

mqtt sub -t '$dropped/#'

Then I publish three messages:

mqtt pub -t test -q 1 -m "oh no"
mqtt pub -t test -q 1 -m "oh no2"
mqtt pub -t test -q 1 -m "oh no3"

And my dropped subscriber gets “oh no2” and “oh no3”, as “oh no” is the one message still queued for the offline client.
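
To make the arithmetic explicit, here is a toy model of that scenario in Python. It is only a sketch, not HiveMQ's implementation: the class name `OfflineClientQueue` and its fields are made up for illustration, and the only behaviour it assumes is the one described above, namely that with the `discard` strategy a message arriving at a full queue is the one that gets dropped.

```python
from collections import deque


class OfflineClientQueue:
    """Toy model of a per-client message queue with a 'discard' strategy.

    Hypothetical helper for illustration only; it does not reflect how
    HiveMQ stores queued messages internally.
    """

    def __init__(self, max_queue_size):
        self.max_queue_size = max_queue_size
        self.queued = deque()   # messages waiting for the offline client
        self.dropped = []       # stands in for what a '$dropped/#' subscriber receives

    def publish(self, payload):
        # With 'discard', a message that arrives while the queue is full is dropped.
        if len(self.queued) >= self.max_queue_size:
            self.dropped.append(payload)
        else:
            self.queued.append(payload)


# Same setup as in the config: max-queue-size 1, strategy discard.
queue = OfflineClientQueue(max_queue_size=1)
for payload in ["oh no", "oh no2", "oh no3"]:
    queue.publish(payload)

print(list(queue.queued))  # ['oh no']
print(queue.dropped)       # ['oh no2', 'oh no3']
```

With a larger `max_queue_size`, more publishes are needed before anything shows up on the dropped topic, which is why a queue size of 1 makes the effect easy to see.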

I’m still unclear, though, how you produce dropped messages. I would love to see the Control Center notification and the WARN message.

Regarding:

Also, can we get the dropped messages from the dropped messages topic to Kafka using Kafka-HiveMQ-extension and see that data in Kafka topic?

The answer was already given here: Reprocessing the dropped messages using Kafka Extention

Greetings,
Michael

Hi @michael_w,

Thanks for the response; it worked for me the way you described. After trying the steps you gave me, I have a few more questions.
Are messages sent to the dropped topic only if they are dropped for the reasons below?
image

And if so, how can I configure HiveMQ so that messages dropped on the Kafka side are also sent to the dropped topic?

I got the following notification in the HiveMQ Control Center when a message was dropped on the Kafka side:
Kafka HiveMQ notification

Regards,
Nirmal

Hi @Nirmal,

TL;DR: What you want to achieve is currently not possible.

The dropped topic only works for MQTT messages and not for Kafka records.
That is the reason why you see the notification for dropped Kafka records but don’t get an MQTT message on the dropped topic. There is just no way for a consumer in an extension to let HiveMQ know that MQTT message x can be considered dropped.

Greetings,
Michael


@michael_w , Thanks for the Clarification. Can you guide me through on how we can recreate the following issues in the local cluster:
image

And also, which metrics should be considered for monitoring a HiveMQ cluster and broker, and what kinds of major errors occur in production?

Can you guide me through on how we can recreate the following issues in the local cluster:

For this, I refer you to our docs: Control Center Analytics :: HiveMQ Documentation
See the section “Detailed explanation of the reasons”.

And also, which metrics should be considered for monitoring a HiveMQ cluster

For monitoring, you can look at this one metric: com.hivemq.messages.dropped.count
It increases each time an MQTT message is dropped, for whatever reason.

There are also metrics for each specific reason; you can find them here: Monitoring :: HiveMQ Documentation
They have the prefix “com.hivemq.messages.dropped.”
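
If the metrics are already being scraped into some monitoring tool, picking out the dropped-message family is just a prefix filter. The sketch below assumes the metrics have been collected into a plain Python dict; the function name `dropped_metrics`, the sample values, and every metric name except com.hivemq.messages.dropped.count are placeholders for illustration, so check the Monitoring documentation for the real per-reason names.

```python
DROPPED_PREFIX = "com.hivemq.messages.dropped."


def dropped_metrics(snapshot):
    """Return only the metrics whose name starts with the dropped-message prefix."""
    return {
        name: value
        for name, value in snapshot.items()
        if name.startswith(DROPPED_PREFIX)
    }


# Hypothetical snapshot; values are invented for the example.
snapshot = {
    "com.hivemq.messages.dropped.count": 2,
    "com.hivemq.messages.dropped.queue-full.count": 2,  # assumed per-reason name
    "example.unrelated.metric": 3,                      # placeholder, not a HiveMQ metric
}

for name, value in sorted(dropped_metrics(snapshot).items()):
    print(name, value)
```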

what kinds of major errors occur in production

I'm not sure I understand the question, but I'll try to answer it:

From experience, the queue-full metric will show up the most. This could indicate that you have slow-consuming subscribers, or that your expiry for publishes is too big, so a lot of old messages stay in the system.

Greetings,
Michael


Thank you for the great response, @michael_w. I will reach out again if I have any other questions or issues related to HiveMQ.

Regards,
Nirmal

Hi @michael_w ,
I referred to this documentation: Control Center Analytics :: HiveMQ Documentation. I can see why these issues occur, but I am not able to reproduce them.

Can you guide me through reproducing these issues, the way you did for “Client Message Queue Full” earlier in this thread?

Regards,
Nirmal

My question is: what sort of errors occur in production, and which frequently occurring errors should be monitored?

Regards,
Nirmal

I can see why these issues occur, but I am not able to reproduce them.

Ah now I get it. I’ll only explain those that are easy to reproduce:

Client queue full:

  • Have an offline subscriber client (with QoS > 0) and publish more messages (with QoS > 0) than the configured queue size allows, basically the example I gave you above. Of course, it’s faster to reproduce with a small max queue size:
<queued-messages>
     <max-queue-size>1</max-queue-size>
     <strategy>discard</strategy>
</queued-messages>
  • Another way is to simulate a subscriber that is slow in acking messages or does not ack PUBLISH messages at all (requires QoS > 0).

Extension prevented:

  • Use, for example, the PublishInboundInterceptor (or PublishOutboundInterceptor) and call preventPublishDelivery() on the output object:
    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
        publishInboundOutput.preventPublishDelivery();
    }

Maximum packet size exceeded:

  • Well, send a PUBLISH packet that is over the configured packet limit :man_shrugging:. Use an MQTT 3 client, as most MQTT 5 clients prevent the sending of packets that exceed the server limit (as is mandatory per the MQTT 5 specification).
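
To know how large a payload has to be to cross a given limit, it helps to compute the on-the-wire size of the PUBLISH packet. The following is a rough sketch for the MQTT 3.1.1 layout only (MQTT 5 adds a properties section that is not modelled here); the function names and the 1024-byte limit are made up for the example. As a side note, the 268435460 in the config above is the protocol maximum: 1 fixed-header byte + 4 length bytes + 268435455 bytes of remaining length.

```python
MAX_REMAINING_LENGTH = 268_435_455  # largest value the 4-byte variable-length field can hold


def remaining_length_bytes(n):
    """Number of bytes (1 to 4) MQTT needs to encode a Remaining Length of n."""
    if n < 0 or n > MAX_REMAINING_LENGTH:
        raise ValueError("remaining length out of range")
    size = 1
    while n > 127:      # each encoded byte carries 7 bits of the value
        n //= 128
        size += 1
    return size


def publish_packet_size(topic, payload, qos=0):
    """Total size in bytes of an MQTT 3.1.1 PUBLISH packet."""
    topic_bytes = topic.encode("utf-8")
    remaining = (
        2 + len(topic_bytes)        # topic name with its 2-byte length prefix
        + (2 if qos > 0 else 0)     # packet identifier, only present for QoS 1 and 2
        + len(payload)
    )
    # 1 byte of fixed header, then the encoded remaining length, then the rest.
    return 1 + remaining_length_bytes(remaining) + remaining


limit = 1024  # hypothetical max-packet-size for a local test
size = publish_packet_size("test", b"x" * 1024, qos=0)
print(size, size > limit)  # 1033 True
```

So with a hypothetical limit of 1024 bytes, a 1024-byte payload on topic "test" is already too large, because the header and topic add 9 bytes.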

My question is: what sort of errors occur in production, and which frequently occurring errors should be monitored?

As this is basically another question please open a new thread for it.