I’ve recently started using HiveMQ, and I’m wondering whether there is a version of the old extension (https://github.com/hivemq/hivemq-database-example-plugin) that supports HiveMQ 4.4.
My idea is to create an extension that stores the values coming from my sensors in a database, so the frontend can read them from there and display them as a graph.
I will have two topics per sensor.
The topics are like this:
{deviceId}/temperature
{deviceId}/humidity
I’ve tried creating the basic extension by following the tutorial, and it worked fine. Now I want to add the data storage, but as I wrote, I’m relatively new to HiveMQ.
I also have another question.
I want to store the device status (online/offline) in the database.
Which interceptors should I use: Connect/Disconnect Inbound or Connack/Disconnect Outbound?
Welcome to the HiveMQ Community Forum and thanks for your interest in developing extensions.
Currently there is no database example extension for HiveMQ 4.4, but to achieve your goals you could use a PublishInboundInterceptor for the messages from the devices and the ClientLifecycleEventListener for the device status.
Here is an example for the PublishInboundInterceptor:
final PublishInboundInterceptor myInterceptor = new PublishInboundInterceptor() {
    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();
        if (topic.equals(clientId + "/temperature")) {
            final Optional<ByteBuffer> payload = publishInboundInput.getPublishPacket().getPayload();
            if (payload.isPresent()) {
                // Decode with an explicit charset instead of the platform default
                final String temperature = new String(getBytesFromBuffer(payload.get()), StandardCharsets.UTF_8);
                log.info("Sensor with id '{}' sent a temperature of '{}'", clientId, temperature);
                //PUT your DB code for temperature storing here
            }
            return;
        }
        if (topic.equals(clientId + "/humidity")) {
            final Optional<ByteBuffer> payload = publishInboundInput.getPublishPacket().getPayload();
            if (payload.isPresent()) {
                final String humidity = new String(getBytesFromBuffer(payload.get()), StandardCharsets.UTF_8);
                log.info("Sensor with id '{}' sent a humidity of '{}'", clientId, humidity);
                //PUT your DB code for humidity storing here
            }
        }
    }

    // Copies the payload into a byte array without changing the original buffer's position
    @NotNull
    private byte[] getBytesFromBuffer(final @NotNull ByteBuffer byteBuffer) {
        final ByteBuffer rewind = byteBuffer.asReadOnlyBuffer().rewind();
        final byte[] array = new byte[rewind.remaining()];
        rewind.get(array);
        return array;
    }
};

// Register the interceptor for every client that connects
Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
    @Override
    public void initialize(final @NotNull InitializerInput initializerInput, final @NotNull ClientContext clientContext) {
        clientContext.addPublishInboundInterceptor(myInterceptor);
    }
});
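The two topic checks in the interceptor could also be folded into one small helper, which keeps the interceptor short if more measurements are added later. This is only a sketch in plain Java: the class name SensorTopic and its parse method are hypothetical and not part of the HiveMQ SDK, and it assumes that the device id is the MQTT client id, as in the example above.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper (not part of the HiveMQ SDK) for topics of the form {deviceId}/{measurement}. */
final class SensorTopic {
    // Measurements taken from the question; extend as needed
    private static final Set<String> MEASUREMENTS = Set.of("temperature", "humidity");

    final String deviceId;
    final String measurement;

    private SensorTopic(final String deviceId, final String measurement) {
        this.deviceId = deviceId;
        this.measurement = measurement;
    }

    /** Returns a value only if the topic is exactly "{clientId}/{known measurement}". */
    static Optional<SensorTopic> parse(final String clientId, final String topic) {
        final int slash = topic.indexOf('/');
        // Reject topics that do not have exactly two levels or have an empty first level
        if (slash <= 0 || slash != topic.lastIndexOf('/')) {
            return Optional.empty();
        }
        final String deviceId = topic.substring(0, slash);
        final String measurement = topic.substring(slash + 1);
        if (!deviceId.equals(clientId) || !MEASUREMENTS.contains(measurement)) {
            return Optional.empty();
        }
        return Optional.of(new SensorTopic(deviceId, measurement));
    }
}
```

Inside onInboundPublish you would call SensorTopic.parse(clientId, topic) once and use the measurement field to decide where the value is stored.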
And here is an example for a ClientLifecycleEventListener:
final ClientLifecycleEventListener deviceStatusListener = new ClientLifecycleEventListener() {
    @Override
    public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
        final String clientId = connectionStartInput.getClientInformation().getClientId();
        log.info("Sensor with id '{}' connected", clientId);
        //PUT your DB code for storing online device status here
    }

    @Override
    public void onAuthenticationSuccessful(final @NotNull AuthenticationSuccessfulInput authenticationSuccessfulInput) {
    }

    @Override
    public void onDisconnect(final @NotNull DisconnectEventInput disconnectEventInput) {
        final String clientId = disconnectEventInput.getClientInformation().getClientId();
        log.info("Sensor with id '{}' disconnected", clientId);
        //PUT your DB code for storing offline device status here
    }
};

Services.eventRegistry().setClientLifecycleEventListener(new ClientLifecycleEventListenerProvider() {
    @Override
    public ClientLifecycleEventListener getClientLifecycleEventListener(final @NotNull ClientLifecycleEventListenerProviderInput input) {
        return deviceStatusListener;
    }
});
Thank you for a detailed reply.
I’ll try to create the interceptors and will check how the code behaves when stress-testing.
I think the database might be the bottleneck, but the tests will show.
I’ve searched for differences between Community and Enterprise Extensions SDK (not sure what events and interceptors I can use), but I can’t find a comparison.
I need to create a security plugin that uses an existing MongoDB database to allow users and devices to connect to the broker and publish/subscribe to specific topics.
A device can only publish to its own topics, such as {deviceId}/temperature, and a user can only subscribe to the topics of their own devices, for example {deviceId}/temperature or +/temperature.
I’m not sure whether the second is possible, but ideally, when a user subscribes with +, the extension should subscribe them to all topics they have access to.
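To make these rules concrete, here is a rough sketch of the decision logic in plain Java. It deliberately does not touch the HiveMQ SDK: the class TopicAccessRules and both method names are hypothetical, the set of owned device ids is assumed to come from the MongoDB lookup, and how the resolved filters are applied on the broker side is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical rule set; not an SDK class. */
final class TopicAccessRules {

    /** A device may only publish to its own temperature and humidity topics. */
    static boolean deviceMayPublish(final String deviceId, final String topic) {
        return topic.equals(deviceId + "/temperature") || topic.equals(deviceId + "/humidity");
    }

    /**
     * Turns a requested topic filter into the filters the user is actually allowed to use.
     * "+/temperature" becomes one explicit filter per owned device; a filter that names a
     * device is kept only if the user owns it; everything else yields an empty list.
     */
    static List<String> resolveSubscription(final Set<String> ownedDeviceIds, final String topicFilter) {
        final int slash = topicFilter.indexOf('/');
        if (slash < 0) {
            return List.of(); // single-level filters such as "#" are not allowed here
        }
        final String firstLevel = topicFilter.substring(0, slash);
        final String rest = topicFilter.substring(slash); // keeps the leading '/'
        final List<String> resolved = new ArrayList<>();
        if (firstLevel.equals("+")) {
            // Sorted only to make the result deterministic
            for (final String deviceId : new TreeSet<>(ownedDeviceIds)) {
                resolved.add(deviceId + rest);
            }
        } else if (ownedDeviceIds.contains(firstLevel)) {
            resolved.add(topicFilter);
        }
        return resolved;
    }
}
```

With owned devices dev-a and dev-b, a request for +/temperature resolves to dev-a/temperature and dev-b/temperature, while a request for a device the user does not own resolves to nothing and can be denied.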
I’ve managed to quickly create an extension that stores the data into InfluxDB.
Here is the code for reference:
package com.test;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.interceptor.publish.PublishInboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundInput;
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundOutput;
import com.hivemq.extension.sdk.api.packets.publish.ModifiablePublishPacket;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class MessagePersistorInterceptor implements PublishInboundInterceptor {
    // Use this class for the logger so log lines are attributed to the interceptor
    private static final Logger log = LoggerFactory.getLogger(MessagePersistorInterceptor.class);

    final String serverURL = "http://127.0.0.1:8086", username = "test", password = "test", databaseName = "mydb";
    final InfluxDB influxDB = InfluxDBFactory.connect(serverURL, username, password);

    public MessagePersistorInterceptor() {
        // Batching lets the client buffer points instead of sending one request per message
        influxDB.enableBatch(BatchOptions.DEFAULTS);
        influxDB.setDatabase(databaseName);
    }

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();
        if (topic.equals(clientId + "/temperature")) {
            final Optional<ByteBuffer> payload = publishInboundInput.getPublishPacket().getPayload();
            if (payload.isPresent()) {
                // Decode with an explicit charset instead of the platform default
                final String temperature = new String(getBytesFromBuffer(payload.get()), StandardCharsets.UTF_8);
                log.info("Sensor with id '{}' sent a temperature of '{}'", clientId, temperature);
                //PUT your DB code for temperature storing here
                try {
                    influxDB.write(Point.measurement("temperature")
                            //.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                            .tag("client_id", clientId)
                            .addField("temperature", Float.parseFloat(temperature))
                            .build());
                    log.info("Data stored in DB");
                } catch (Exception e) {
                    // Also reached when the payload is not a valid number
                    log.error("Error saving data into DB", e);
                }
            }
            return;
        }
    }

    // Copies the payload into a byte array without changing the original buffer's position
    @NotNull
    private byte[] getBytesFromBuffer(final @NotNull ByteBuffer byteBuffer) {
        final ByteBuffer rewind = byteBuffer.asReadOnlyBuffer().rewind();
        final byte[] array = new byte[rewind.remaining()];
        rewind.get(array);
        return array;
    }
}
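Since the payload comes straight from the device, it may be worth validating it before building the point, rather than relying on the catch block. The following is a minimal sketch using only the JDK; PayloadParser and parseReading are hypothetical names, and it assumes that sensors send the reading as plain UTF-8 text such as 21.5.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.OptionalDouble;

/** Hypothetical helper that turns an MQTT payload into a numeric reading. */
final class PayloadParser {

    /** Returns the reading, or an empty result if the payload is not a finite number. */
    static OptionalDouble parseReading(final ByteBuffer payload) {
        // Work on a read-only view so the caller's buffer position is left untouched
        final ByteBuffer view = payload.asReadOnlyBuffer();
        view.rewind();
        final String text = StandardCharsets.UTF_8.decode(view).toString().trim();
        try {
            final double value = Double.parseDouble(text);
            // Reject NaN and infinities, which Double.parseDouble accepts as input
            return Double.isFinite(value) ? OptionalDouble.of(value) : OptionalDouble.empty();
        } catch (final NumberFormatException e) {
            return OptionalDouble.empty();
        }
    }
}
```

The interceptor could then skip the write and log a warning when the result is empty, so malformed messages never reach the database code.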
The code isn’t ideal, but it’s a good start; please note that it’s my first project in Java.
Now I need to add proper connection management and move settings away from code into a config file.
I’ll try to polish things a bit and publish the code to GitHub.
I’m not a Java expert, so advice on good practices is more than welcome.
One additional question about authentication/authorization.
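For moving the settings out of the code, one common option is a java.util.Properties file. The sketch below is only an illustration: the class InfluxSettings and the property keys (influxdb.url, influxdb.username, influxdb.password, influxdb.database) are made-up names, and the defaults simply mirror the values hard-coded in the interceptor above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical settings holder; the property keys are assumptions, not an existing convention. */
final class InfluxSettings {
    final String serverUrl;
    final String username;
    final String password;
    final String database;

    private InfluxSettings(final String serverUrl, final String username,
                           final String password, final String database) {
        this.serverUrl = serverUrl;
        this.username = username;
        this.password = password;
        this.database = database;
    }

    /** Reads the settings from any Reader, for example a file reader or a StringReader in tests. */
    static InfluxSettings load(final Reader reader) {
        final Properties props = new Properties();
        try {
            props.load(reader);
        } catch (final IOException e) {
            throw new UncheckedIOException("Could not read InfluxDB settings", e);
        }
        return new InfluxSettings(
                props.getProperty("influxdb.url", "http://127.0.0.1:8086"),
                required(props, "influxdb.username"),
                required(props, "influxdb.password"),
                props.getProperty("influxdb.database", "mydb"));
    }

    // Credentials have no sensible default, so fail early with a clear message
    private static String required(final Properties props, final String key) {
        final String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing setting: " + key);
        }
        return value.trim();
    }
}
```

The interceptor constructor could then take an InfluxSettings instance instead of declaring the URL and credentials as fields, which also makes it easier to point tests at a different database.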
I’ve used Aedes before, and there we can query the database (or external service) during authentication and then store the info we get into the session, so when the user tries to authorize we don’t have to query the database again.
Similar to this: https://auth0.com/docs/integrations/authenticate-devices-using-mqtt
Can the same thing be done in HiveMQ?
When the user tries to authenticate, I’d like to query the database and, if the login and password are correct, store the security-related data in the client information, so that when they try to subscribe to a specific topic I don’t have to query the database again.
Or should I use a collection that I’ll write to when the user authenticates and read from when he authorizes?
Probably something like Memcache should be used; that way I could store the data in the cache and set its expiration time.
Something like https://github.com/google/guava/wiki/CachesExplained.
I think you could try the ConnectionAttributeStore of the client: save the info there at authentication, and read it back from the ConnectionAttributeStore when a subscribe or publish happens.
The ConnectionAttributeStore can be accessed in the input objects of the authorizer / authenticator / interceptor.
For example:
final SubscriptionAuthorizer subAuthor = new SubscriptionAuthorizer() {
    @Override
    public void authorizeSubscribe(final @NotNull SubscriptionAuthorizerInput input, final @NotNull SubscriptionAuthorizerOutput output) {
        final ConnectionAttributeStore attributeStore = input.getConnectionInformation().getConnectionAttributeStore();
    }
};
Keep in mind that this info is lost after the client disconnects, but I think that fits your use case.
If you want to store the info on the extension side, you could also use Guava caches, with an expiration time if you want. (Expiration could theoretically also be achieved with the ConnectionAttributeStore, if you store a timestamp and compare it every time a subscribe or publish happens.)
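The timestamp comparison mentioned above can be sketched in plain Java as follows. This is a stand-in for illustration only, not Guava and not the ConnectionAttributeStore: ExpiringPermissions is a hypothetical class, the permissions are simplified to a single string, and the clock is injected so the expiry can be tested. Guava's CacheBuilder with expireAfterWrite provides comparable behavior off the shelf.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical extension-side cache: client id -> permissions, valid for a fixed time. */
final class ExpiringPermissions {

    private static final class Entry {
        final String permissions;
        final long storedAtMillis;

        Entry(final String permissions, final long storedAtMillis) {
            this.permissions = permissions;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // e.g. System::currentTimeMillis in production

    ExpiringPermissions(final long ttlMillis, final LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Called after a successful authentication, with the data loaded from the database. */
    void put(final String clientId, final String permissions) {
        entries.put(clientId, new Entry(permissions, clock.getAsLong()));
    }

    /** Called on subscribe/publish; an empty result means the database has to be queried again. */
    Optional<String> get(final String clientId) {
        final Entry entry = entries.get(clientId);
        if (entry == null) {
            return Optional.empty();
        }
        if (clock.getAsLong() - entry.storedAtMillis >= ttlMillis) {
            // Remove only if no newer entry has been stored in the meantime
            entries.remove(clientId, entry);
            return Optional.empty();
        }
        return Optional.of(entry.permissions);
    }
}
```

An authenticator would call put after a successful database lookup, and an authorizer would call get and fall back to the database only when the entry is missing or expired.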