HiveMQ

Store massages in MsSQL

I’ve recently started using HiveMQ, but I’m wondering if there is a version of the old extension (https://github.com/hivemq/hivemq-database-example-plugin) that supports HiveMQ 4.4.
My idea is to create an extension that will store values coming from my sensors to the database so the frontend can read them from the database and display as a graph.
I will have two topics per sensor.
The topics are like this:
{deviceId}/temperature
{deviceId}/humidity

I’ve tried creating the basic extension (following the tutorial), it worked fine, but now I want to add the data storing but as I wrote I’m relatively new to HiveMQ.

Also another question.
I want to store device status (online/offline) in the database.
What Interceptors I should use? Connect/Disconnect Inbound or Connack/Disconnect Outbound?

Hi Misiu,

Welcome to the HiveMQ Community Forum and thanks for your interest in developing extensions.

Currently there is no database example extension for HiveMQ 4.4, but to achieve your goals you could use a PublishInboundInterceptor for the messages from the devices and the ClientLifecycleEventListener for the device status.

Here an example for the PublishInboundInterceptor:

final PublishInboundInterceptor myInterceptor = new PublishInboundInterceptor() {
            @Override
            public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
                final String clientId = publishInboundInput.getClientInformation().getClientId();
                final String topic = publishInboundInput.getPublishPacket().getTopic();
                if (topic.equals(clientId + "/temperature")) {
                    final Optional<ByteBuffer> payload = publishInboundInput.getPublishPacket().getPayload();
                    if(payload.isPresent()){
                        final String temperature = new String(getBytesFromBuffer(payload.get()));
                        log.info("Sensor with id '{}' sent a temperature of '{}'", clientId, temperature);
                        //PUT your DB code for temperature storing here
                    }
                    return;
                }
                if (topic.equals(clientId + "/humidity")) {
                    final Optional<ByteBuffer> payload = publishInboundInput.getPublishPacket().getPayload();
                    if(payload.isPresent()){
                        final String humidity = new String(getBytesFromBuffer(payload.get()));
                        log.info("Sensor with id '{}' sent a humidity of '{}'", clientId, humidity);
                        //PUT your DB code for humidity storing here
                    }
                }
            }

            @NotNull
            private byte[] getBytesFromBuffer(final @NotNull ByteBuffer byteBuffer){
                final ByteBuffer rewind = byteBuffer.asReadOnlyBuffer().rewind();
                final byte[] array = new byte[rewind.remaining()];
                rewind.get(array);
                return array;
            }
        };
        Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
            @Override
            public void initialize(final @NotNull InitializerInput initializerInput, final @NotNull ClientContext clientContext) {
                clientContext.addPublishInboundInterceptor(myInterceptor);
            }
        });

And here an example for a ClientLifecycleEventListener:

final ClientLifecycleEventListener deviceStatusListener = new ClientLifecycleEventListener() {
            @Override
            public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
                final String clientId = connectionStartInput.getClientInformation().getClientId();
                log.info("Sensor with id '{}' connected", clientId);
                //PUT your DB code for storing online device status here
            }

            @Override
            public void onAuthenticationSuccessful(final @NotNull AuthenticationSuccessfulInput authenticationSuccessfulInput) {

            }

            @Override
            public void onDisconnect(final @NotNull DisconnectEventInput disconnectEventInput) {
                final String clientId = disconnectEventInput.getClientInformation().getClientId();
                log.info("Sensor with id '{}' disconnected", clientId);
                //PUT your DB code for storing offline device status here
            }
        };
        Services.eventRegistry().setClientLifecycleEventListener(new ClientLifecycleEventListenerProvider() {
            @Override
            public ClientLifecycleEventListener getClientLifecycleEventListener(final @NotNull ClientLifecycleEventListenerProviderInput input) {
                return deviceStatusListener;
            }
        });

For more information about the interceptors, event registry, and the extension SDK, there is a documentation on our website:
https://www.hivemq.com/docs/hivemq/latest/extensions/interceptors.html#publish-inbound-interceptor
https://www.hivemq.com/docs/hivemq/latest/extensions/registries.html#event-registry

I hope this would help.

Kind regards
Flo

2 Likes

Thank you for a detailed reply.
I’ll try to create the interceptors and will check how the code behaves when stress-testing.
I think the database might be the bottleneck, but the tests will show :slight_smile:



I’ve searched for differences between Community and Enterprise Extensions SDK (not sure what events and interceptors I can use), but I can’t find a comparison.

I need to create a security plugin that is using a MongoDB (existing database) to allow users and devices to connect to the broker and publish/subscribe to specific topics.
The device can only publish to his own topics, like {deviceId}/temperature and the user can only subscribe to his device’s topics, for example, {deviceId}/temperature or +/temperature.

Not sure if the second is possible, but ideally when subscribing with + the extension should subscribe the user to all topics he has access to.

Hi @Misiu,

everything you can find in our documentation can be used by everyone for any edition (CE/ PE/ EE).

Translated to code, you can use everything found in com.hivemq.extension.sdk.api.services.Services:
Bildschirmfoto 2020-08-18 um 12.59.43

For the security extension you can use the securityRegistry() where you can add your auth / autz logic.

Greetings,
Michael from the HiveMQ team

1 Like

the “+/temperature” would be possible if you implement a SubscriptionAuthorizer and register it with the securityRegistry mentioned by @michael_w

one important thing is, that you prevent the subscribe itself, as MQTT will handle this subscribe as a wildcard subscribe for “any/temperature”.

Here is an example how the prevention could be accomplished followed by adding subscriptions where the client has access to:

        ...
        final List<String> topicsIHaveAccessTo = new ArrayList<>();
        topicsIHaveAccessTo.add("myClientId/temperature");
        topicsIHaveAccessTo.add("clientIdOfAFriend/temperature");
        topicsIHaveAccessTo.add("clientIdOfAnotherFriend/temperature");
        
        final SubscriptionAuthorizer subAuthor = new SubscriptionAuthorizer() {
            @Override
            public void authorizeSubscribe(final @NotNull SubscriptionAuthorizerInput input, final @NotNull SubscriptionAuthorizerOutput output) {

                final String topic = input.getSubscription().getTopicFilter();
                final Qos qos = input.getSubscription().getQos();
                final String clientId = input.getClientInformation().getClientId();

                if(topic.startsWith("+/")){
                    //first let the original subscription fail.
                    output.failAuthorization();

                    //then add subscriptions through the store
                    for (final String topicWithAccess : topicsIHaveAccessTo) {
                        final TopicSubscription topicSubscription = Builders.topicSubscription()
                                .topicFilter(topicWithAccess)
                                .qos(qos)
                                .build();
                        Services.subscriptionStore().addSubscription(clientId, topicSubscription);
                    }
                }
            }
        };
        
        Services.securityRegistry().setAuthorizerProvider(new AuthorizerProvider() {
            @Override
            public Authorizer getAuthorizer(final @NotNull AuthorizerProviderInput authorizerProviderInput) {
                return subAuthor;
            }
        });

I hope this could help.

Kind regards,
Flo

I’ve managed to quickly create an extension that stores the data into InfluxDB.
Here is the code for reference:

package com.test;

import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.interceptor.publish.PublishInboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundInput;
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundOutput;
import com.hivemq.extension.sdk.api.packets.publish.ModifiablePublishPacket;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class MessagePersistorInterceptor implements PublishInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(HelloWorldMain.class);

    final String serverURL = "http://127.0.0.1:8086", username = "test", password = "test", databaseName = "mydb";
    final InfluxDB influxDB = InfluxDBFactory.connect(serverURL, username, password);

    public MessagePersistorInterceptor() {
        influxDB.enableBatch(BatchOptions.DEFAULTS);
        influxDB.setDatabase(databaseName);
    }

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();
        if (topic.equals(clientId + "/temperature")) {
            final Optional<ByteBuffer> payload = publishInboundInput.getPublishPacket().getPayload();
            if (payload.isPresent()) {
                final String temperature = new String(getBytesFromBuffer(payload.get()));
                log.info("Sensor with id '{}' sent a temperature of '{}'", clientId, temperature);
                //PUT your DB code for temperature storing here

                try {

                    influxDB.write(Point.measurement("temperature")
                            //.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                            .tag("client_id", clientId)
                            .addField("temperature", Float.parseFloat(temperature))
                            .build());

                    log.info("Data stored in DB");
                } catch (Exception e) {
                    log.error("Error saving data into DB", e);
                }
            }
            return;
        }
    }

    @NotNull
    private byte[] getBytesFromBuffer(final @NotNull ByteBuffer byteBuffer) {
        final ByteBuffer rewind = byteBuffer.asReadOnlyBuffer().rewind();
        final byte[] array = new byte[rewind.remaining()];
        rewind.get(array);
        return array;
    }

}

The code isn’t ideal, but it’s a good start, please notice it’s my first project in Java.
Now I need to add proper connection management and move settings away from code into a config file.

I’ll try to polish things a bit and publish the code to GitHub.
I’m not a Java expert, so good practices are more than welcome :slight_smile:

Hi Misiu,

actually it looks fine.

Here some good practices:

You should use one line per field. (easier to read)

final String serverUrl = "http://127.0.0.1:8086";
final String username = "test";
final String password = "test";
final String databaseName = "mydb";

I would suggest to create your InfluxDB in the class where you instanciate the interceptor and provide it to the Interceptors constructor:

new MessagePersistorInterceptor(influxDB);
2 Likes

better use the logger of the class to see in the log which class logged the information:

private static final Logger log = LoggerFactory.getLogger(MessagePersistorInterceptor.class);
1 Like

Flo,

one additional question about auth/autz.
I’ve used Aedes before, and there we can query the database (or external service) during authentication and then store the info we get into the session, so when the user tries to authorize we don’t have to query the database again.
Similar to this: https://auth0.com/docs/integrations/authenticate-devices-using-mqtt

Can the same thing be done in HiveMQ?
When the user tries to authenticate I’d like to query the database and if login/pass are correct then I’d like to store the security-related data info client information, so when he tries to subscribe to a specific topic I won’t have to query the database again.

Or should I use a collection that I’ll write to when the user authenticates and read from when he authorizes?
Probably a Memcache should be used, this way I could store data in the cache and set it’s expiring date.
Something like https://github.com/google/guava/wiki/CachesExplained.

Hi Misiu,

I think you could try the ConnectionAttributeStore of the client and save the info there at authentication, so you could ask the ConnectionAttributeStore again when a subscribe or publish happens.

The ConnectionAttributeStore can be accessed in the input objects of the authorizer / authenticator / interceptor.

For example:

 final SubscriptionAuthorizer subAuthor = new SubscriptionAuthorizer() {
        @Override
        public void authorizeSubscribe(final @NotNull SubscriptionAuthorizerInput input, final @NotNull SubscriptionAuthorizerOutput output) {
            final ConnectionAttributeStore attributeStore = input.getConnectionInformation().getConnectionAttributeStore();
        }
    };

But keep in mind that this info will be lost after the client disconnects, but I think that fits your usecase.

And if you want to store the info on the extension side, you could also use the guava caches. With expiration date if you want. (Could theoretically be accomplished by the ConnectionAttributeStore if you provide a timestamp and compare that every time a subscribe/publish happens)

1 Like