Publishing to multiple topics with PublishInboundInterceptor

Here’s a simple scenario that I’d like to implement via an interceptor:

Publish comes in on topic “A” with some payload

I’d like to split up the payload, then publish (with the retain flag) to topics “X”, “Y”, and “Z”

Thanks in advance for the help!

Rick

Note that I’ll also just pass the original publish packet along unmodified.

Hi @RickBullotta,

Here is a simple example. I hope it helps you get started on your solution. If you still have questions, feel free to ask.

import com.hivemq.extension.sdk.api.ExtensionMain;
import com.hivemq.extension.sdk.api.packets.publish.PublishPacket;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartOutput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
import com.hivemq.extension.sdk.api.services.Services;
import com.hivemq.extension.sdk.api.services.builder.Builders;
import com.hivemq.extension.sdk.api.services.intializer.ClientInitializer;
import com.hivemq.extension.sdk.api.services.publish.Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;

public class TestExtensionMainPublishInboundInterceptor implements ExtensionMain {

    private static final Logger logger = LoggerFactory.getLogger(TestExtensionMainPublishInboundInterceptor.class);

    @Override
    public void extensionStart(final ExtensionStartInput input, final ExtensionStartOutput output) {

        final ClientInitializer clientInitializer = (initializerInput, clientContext) -> {
            clientContext.addPublishInboundInterceptor((publishInboundInput, publishInboundOutput) -> {

                final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
                if (!"A".equals(publishPacket.getTopic())) {
                    return; // we ignore when incorrect topic
                }

                if (publishPacket.getPayload().isEmpty()) {
                    return; // we skip it because no payload to create new payloads
                }

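                // split the payload however you need
                final ByteBuffer buffer = publishPacket.getPayload().get();

                // create the publish messages how you need them; a payload must be set before build(),
                // so each message gets the full original payload here as a placeholder for its part
                // (use retain(true) instead if the new messages should always be retained)
                final Publish publishTopicX = Builders.publish().topic("X").payload(buffer.asReadOnlyBuffer()).retain(publishPacket.getRetain()).build();
                final Publish publishTopicY = Builders.publish().topic("Y").payload(buffer.asReadOnlyBuffer()).retain(publishPacket.getRetain()).build();
                final Publish publishTopicZ = Builders.publish().topic("Z").payload(buffer.asReadOnlyBuffer()).retain(publishPacket.getRetain()).build();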

                // now publish the messages asynchronously
                Services.extensionExecutorService().submit(() -> {
                    Services.publishService().publish(publishTopicX).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            logger.warn("publish to topic X failed", throwable);
                        }
                    });
                    Services.publishService().publish(publishTopicY).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            logger.warn("publish to topic Y failed", throwable);
                        }
                    });
                    Services.publishService().publish(publishTopicZ).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            logger.warn("publish to topic Z failed", throwable);
                        }
                    });
                });
                
                // the old message is still normally sent out
            });
        };

        Services.initializerRegistry().setClientInitializer(clientInitializer);
    }

    @Override
    public void extensionStop(final ExtensionStopInput input, final ExtensionStopOutput output) {
    }

}
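
The "split how you need the payload" step depends entirely on your payload format, so the following is only a minimal sketch under an assumption: the payload is a sequence of parts separated by a single delimiter byte (here `;`). The class `PayloadSplitter` and its `split` method are hypothetical names made up for illustration; they are not part of the HiveMQ SDK and use only `java.nio`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the HiveMQ SDK.
public class PayloadSplitter {

    /**
     * Splits the payload at every occurrence of the delimiter byte.
     * The caller's buffer (position, limit, content) is left untouched.
     */
    public static List<ByteBuffer> split(final ByteBuffer payload, final byte delimiter) {
        // Work on a read-only view so the original buffer is never modified.
        final ByteBuffer view = payload.asReadOnlyBuffer();
        final List<ByteBuffer> parts = new ArrayList<>();
        int start = view.position();
        for (int i = view.position(); i < view.limit(); i++) {
            if (view.get(i) == delimiter) {
                parts.add(slice(view, start, i));
                start = i + 1;
            }
        }
        // The remainder after the last delimiter (may be empty).
        parts.add(slice(view, start, view.limit()));
        return parts;
    }

    // Returns an independent read-only buffer covering the bytes in [from, to).
    private static ByteBuffer slice(final ByteBuffer view, final int from, final int to) {
        final ByteBuffer copy = view.duplicate();
        copy.limit(to);
        copy.position(from);
        return copy.slice();
    }

    public static void main(final String[] args) {
        // Assumed example payload: three parts separated by ';'
        final ByteBuffer payload = ByteBuffer.wrap("1;2;3".getBytes(StandardCharsets.UTF_8));
        for (final ByteBuffer part : split(payload, (byte) ';')) {
            System.out.println(StandardCharsets.UTF_8.decode(part));
        }
    }
}
```

Inside the interceptor, each returned part could then be handed to the publish builder as the payload for "X", "Y", and "Z" respectively. If your payload is structured differently (JSON, Protobuf, fixed-width fields), only the body of `split` would need to change.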

Greetings,
Michael

Perfect! Thank you Michael - getting a reference to the publishService() was the missing piece of the puzzle.

I wanted to let you know that I’ve completed my HiveMQ extension (it expands Sparkplug B metrics into “normal” MQTT topics and persists both data and metadata). It’s working perfectly.

Thanks again for the help!
