Here’s a simple scenario that I’d like to implement via an interceptor:
Publish comes in on topic “A” with some payload
I’d like to split up the payload, then publish (with the retain flag) to topics “X”, “Y”, and “Z”
Thanks in advance for the help!
Rick
Note that I’ll also just pass the original publish packet along unmodified.
Hi @RickBullotta,
here is a simple example. I hope it helps you get started on your solution. If you still have questions, feel free to ask.
import com.hivemq.extension.sdk.api.ExtensionMain;
import com.hivemq.extension.sdk.api.packets.publish.PublishPacket;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartOutput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
import com.hivemq.extension.sdk.api.services.Services;
import com.hivemq.extension.sdk.api.services.builder.Builders;
import com.hivemq.extension.sdk.api.services.intializer.ClientInitializer;
import com.hivemq.extension.sdk.api.services.publish.Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
public class TestExtensionMainPublishInboundInterceptor implements ExtensionMain {

    private static final Logger logger = LoggerFactory.getLogger(TestExtensionMainPublishInboundInterceptor.class);

    @Override
    public void extensionStart(final ExtensionStartInput input, final ExtensionStartOutput output) {
        final ClientInitializer clientInitializer = (initializerInput, clientContext) -> {
            clientContext.addPublishInboundInterceptor((publishInboundInput, publishInboundOutput) -> {
                final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
                if (!"A".equals(publishPacket.getTopic())) {
                    return; // ignore publishes to other topics
                }
                if (publishPacket.getPayload().isEmpty()) {
                    return; // skip it, there is no payload to split into new payloads
                }

                // split the payload however you need
                final ByteBuffer buffer = publishPacket.getPayload().get();
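                // (Hypothetical sketch only: assuming the payload were, for example, a UTF-8 string
                //  like "x;y;z", it could be decoded and split roughly like this; adjust to your
                //  actual payload format. Requires java.nio.charset.StandardCharsets.)
                // final byte[] bytes = new byte[buffer.remaining()];
                // buffer.duplicate().get(bytes);
                // final String[] parts = new String(bytes, StandardCharsets.UTF_8).split(";");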
                // create the publish messages how you need them
                final Publish publishTopicX = Builders.publish().topic("X").retain(publishPacket.getRetain()).build();
                final Publish publishTopicY = Builders.publish().topic("Y").retain(publishPacket.getRetain()).build();
                final Publish publishTopicZ = Builders.publish().topic("Z").retain(publishPacket.getRetain()).build();
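                // (Note: in a real implementation each builder would presumably also carry its slice
                //  of the split payload, e.g. via .payload(ByteBuffer.wrap(partBytes)), and possibly
                //  an explicit QoS via .qos(...); partBytes is a hypothetical byte[] from the split above.)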
                // now publish the messages asynchronously
                Services.extensionExecutorService().submit(() -> {
                    Services.publishService().publish(publishTopicX).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            logger.info("ouch topic x", throwable.getCause());
                        }
                    });
                    Services.publishService().publish(publishTopicY).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            logger.info("ouch topic y", throwable.getCause());
                        }
                    });
                    Services.publishService().publish(publishTopicZ).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            logger.info("ouch topic z", throwable.getCause());
                        }
                    });
                });
                // the original publish is still sent out unmodified
            });
        };

        Services.initializerRegistry().setClientInitializer(clientInitializer);
    }

    @Override
    public void extensionStop(final ExtensionStopInput input, final ExtensionStopOutput output) {
    }
}
Greetings,
Michael
Perfect! Thank you Michael - getting a reference to the publishService() was the missing piece of the puzzle.
I wanted to let you know that I’ve completed my HiveMQ extension (it expands Sparkplug B metrics into “normal” MQTT topics and persists both data and metadata). It’s working perfectly.
Thanks again for the help!