TopicPermission for topic containing +

Let’s say I have two users: u1, and u2.
Each user owns a couple of devices. u1: d1, d2,d3, u2: d4, d5, d6.

In my custom Authentication (that implements SimpleAuthenticator) in onConnect I have this code:

final ArrayList<TopicPermission> topicPermissions = new ArrayList<>();
for (String deviceId: user.get().getDevices()) {

    final TopicPermission subscribePermission = Builders.topicPermission()
            .topicFilter(deviceId+"/values")
            .activity(TopicPermission.MqttActivity.SUBSCRIBE)
            .type(TopicPermission.PermissionType.ALLOW)
            .build();
    topicPermissions.add(subscribePermission);
}

output.getDefaultPermissions().addAll(topicPermissions);
output.getDefaultPermissions().setDefaultBehaviour(DefaultAuthorizationBehaviour.DENY);

output.authenticateSuccessfully();

this creates TopicFilters, so a user can connect only to devices he owns.

But if I want to create a single subscription to listen to all messages (+/values) I can’t do that with the above code.

Is it possible to create such a filter so the user will be able to connect to +/values but only get messages from devices the user owns?

so if all devices will send messages u1 will get only messages from d1,d2, and d3.

I want to create such a subscription to use it with WebSockets to be able to update values on the dashboard.

Hi @Misiu,

this is not possible out of the box, as you would also get messages from d4, d5 and d6.

So correct me if I understand that wrong, but you want that the client only needs to subscribe to +/values but only gets the messages from devices he owns?

POC I tried out:

public class SubsetOfWildcardSubscriptionExtensionMain implements ExtensionMain {

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
        Services.securityRegistry().setAuthenticatorProvider((i) -> (SimpleAuthenticator) (simpleAuthInput, simpleAuthOutput) -> {

            final String clientId = simpleAuthInput.getClientInformation().getClientId();

            if (isUser(clientId)) {
                final TopicPermission userSubscribePermission = Builders.topicPermission()
                        .topicFilter("+/values")
                        .activity(TopicPermission.MqttActivity.ALL) // TODO revert back to subscribe; currently ALL so subscriber can simulate device publish
                        .type(TopicPermission.PermissionType.ALLOW)
                        .build();

                simpleAuthOutput.getDefaultPermissions().add(userSubscribePermission);
                simpleAuthOutput.authenticateSuccessfully();
                return;

            } else if (isDevice(clientId)) {
                final TopicPermission devicePublishPermission = Builders.topicPermission()
                        .topicFilter("+/values")
                        .activity(TopicPermission.MqttActivity.PUBLISH)
                        .type(TopicPermission.PermissionType.ALLOW)
                        .build();

                simpleAuthOutput.getDefaultPermissions().add(devicePublishPermission);
                simpleAuthOutput.authenticateSuccessfully();
                return;
            }

            simpleAuthOutput.failAuthentication();
        });

        Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {

            if (isUser(initializerInput.getClientInformation().getClientId())) {

                clientContext.addSubscribeInboundInterceptor((subscribeInboundInput, subscribeInboundOutput) -> {
                    final String clientId = subscribeInboundInput.getClientInformation().getClientId();

                    final Set<String> devicesForUser = fetchDevicesForUser(clientId);
                    final Set<TopicSubscription> additionalSubscriptions = new HashSet<>();

                    boolean replacedSubscription = false;

                    for (final String device : devicesForUser) {
                        if (!replacedSubscription) { // we need to change +/values to a device topic of the user
                            final List<ModifiableSubscription> subscriptions = subscribeInboundOutput.getSubscribePacket().getSubscriptions();
                            subscriptions.get(0).setTopicFilter(device + "/values");
                            replacedSubscription = true;
                            continue;
                        }

                        // the remaining devices will be added via Subscription Store for that we need TopicSubscriptions
                        additionalSubscriptions.add(Builders.topicSubscription().topicFilter(device + "/values").build());
                    }

                    // now we add subscription for the remaining devices
                    Services.extensionExecutorService().submit(() -> {
                        Services.subscriptionStore().addSubscriptions(clientId, additionalSubscriptions);
                    });

                });
            }
        });
    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

    }

    private boolean isUser(final String clientId) {
        return true; //TODO correct check
    }

    private boolean isDevice(final String clientId) {
        return true; //TODO correct check
    }

    private Set<String> fetchDevicesForUser(final String clientId) {
        return Set.of("device1", "device2", "device3"); // TODO correct fetching of devices from a user
    }

}

Here the output from my test:

2020-08-30 20:40:41,469 DEBUG - Client 'iozGlvjrhsN7pwP' SUBSCRIBE's to topic(s) [+/values]
2020-08-30 20:40:41,510 DEBUG - Client 'iozGlvjrhsN7pwP' SUBACK received with reason code(s) [1]
2020-08-30 20:40:41,523 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device1/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,528 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device10/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,530 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device3/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,534 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device10/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,536 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device2/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,536 DEBUG - CountDownLatch will wait max '10' seconds
2020-08-30 20:40:41,601 DEBUG - Client 'iozGlvjrhsN7pwP' added a message to received list. Topic 'device1/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,602 DEBUG - Client 'iozGlvjrhsN7pwP' added a message to received list. Topic 'device3/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,602 DEBUG - Client 'iozGlvjrhsN7pwP' added a message to received list. Topic 'device2/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,602 DEBUG - CountDownLatch has the count '0' after waiting '65' milliseconds

What this extension does in essence:

  • convert the +/values SUBSCRIBE of the user to multiple subscriptions of all devices from the user (using SubscribeInboundInterceptor and the SubscriptionStore)
  • the user client will think he is subscribed to +/values but actually he is subscribed to each device individually; as can be seen via the com.hivemq.subscriptions.overall.current metric or Control center client detail:

Beware there are a lot of checks and other configurations missing, so don’t use this in production :wink: :

  • TopicSubscription can have more options (i.e setting qos level; you might want to use the one from the client subscribe, currently it is zero as you can see in the image above)
  • no adding/removing of subscription when user get new/loses a device
  • currently i expect only one subscription for the user (+/values) you might want to have a check here if user can subscribe more subscriptions
  • in the other thread I saw that you store devices for a user in the ConnectionAttributeStore, you can use this in the fetchDevicesForUser method
  • and stuff I surely have forgotten to add

Kind regards,
Michael from the HiveMQ team

1 Like