Hi @Misiu,
This is not possible out of the box, as you would also get messages from d4, d5 and d6.
Correct me if I have misunderstood, but you want the client to subscribe only to +/values
and still receive messages only from the devices it owns?
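To make the problem concrete, here is a minimal sketch of how the single-level wildcard behaves. The class `TopicFilterMatch` and the sample topics are illustrative only and not part of the HiveMQ SDK; the matcher is simplified (it handles `+` and a trailing `#`, and ignores the special rules for `$`-prefixed topics):

```java
import java.util.List;

// Hypothetical helper, not part of the HiveMQ SDK: a simplified MQTT topic filter matcher.
final class TopicFilterMatch {

    // '+' matches exactly one topic level, a trailing '#' matches all remaining levels.
    static boolean matches(final String filter, final String topic) {
        final String[] f = filter.split("/", -1);
        final String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1; // '#' is only valid as the last level
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        return f.length == t.length; // no unmatched trailing topic levels
    }

    public static void main(final String[] args) {
        for (final String topic : List.of("d1/values", "d4/values", "d4/status")) {
            System.out.println(topic + " matches +/values: " + matches("+/values", topic));
        }
    }
}
```

Both `d1/values` and `d4/values` match `+/values`, which is why the broker cannot tell the user's own devices apart from anyone else's based on the filter alone.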
Here is a POC I tried out:

    import com.hivemq.extension.sdk.api.ExtensionMain;
    import com.hivemq.extension.sdk.api.annotations.NotNull;
    import com.hivemq.extension.sdk.api.auth.SimpleAuthenticator;
    import com.hivemq.extension.sdk.api.auth.parameter.TopicPermission;
    import com.hivemq.extension.sdk.api.packets.subscribe.ModifiableSubscription;
    import com.hivemq.extension.sdk.api.parameter.ExtensionStartInput;
    import com.hivemq.extension.sdk.api.parameter.ExtensionStartOutput;
    import com.hivemq.extension.sdk.api.parameter.ExtensionStopInput;
    import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
    import com.hivemq.extension.sdk.api.services.Services;
    import com.hivemq.extension.sdk.api.services.builder.Builders;
    import com.hivemq.extension.sdk.api.services.subscription.TopicSubscription;

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class SubsetOfWildcardSubscriptionExtensionMain implements ExtensionMain {

        @Override
        public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
            // Authentication: grant default topic permissions depending on the client type
            Services.securityRegistry().setAuthenticatorProvider((i) -> (SimpleAuthenticator) (simpleAuthInput, simpleAuthOutput) -> {
                final String clientId = simpleAuthInput.getClientInformation().getClientId();
                if (isUser(clientId)) {
                    final TopicPermission userSubscribePermission = Builders.topicPermission()
                            .topicFilter("+/values")
                            .activity(TopicPermission.MqttActivity.ALL) // TODO revert back to subscribe; currently ALL so subscriber can simulate device publish
                            .type(TopicPermission.PermissionType.ALLOW)
                            .build();
                    simpleAuthOutput.getDefaultPermissions().add(userSubscribePermission);
                    simpleAuthOutput.authenticateSuccessfully();
                    return;
                } else if (isDevice(clientId)) {
                    final TopicPermission devicePublishPermission = Builders.topicPermission()
                            .topicFilter("+/values")
                            .activity(TopicPermission.MqttActivity.PUBLISH)
                            .type(TopicPermission.PermissionType.ALLOW)
                            .build();
                    simpleAuthOutput.getDefaultPermissions().add(devicePublishPermission);
                    simpleAuthOutput.authenticateSuccessfully();
                    return;
                }
                simpleAuthOutput.failAuthentication();
            });

            // Subscription rewriting: only user clients get the subscribe interceptor
            Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
                if (isUser(initializerInput.getClientInformation().getClientId())) {
                    clientContext.addSubscribeInboundInterceptor((subscribeInboundInput, subscribeInboundOutput) -> {
                        final String clientId = subscribeInboundInput.getClientInformation().getClientId();
                        final Set<String> devicesForUser = fetchDevicesForUser(clientId);
                        final Set<TopicSubscription> additionalSubscriptions = new HashSet<>();
                        boolean replacedSubscription = false;
                        // NOTE: if devicesForUser is empty, the original +/values subscription is left unchanged
                        for (final String device : devicesForUser) {
                            if (!replacedSubscription) { // we need to change +/values to a device topic of the user
                                final List<ModifiableSubscription> subscriptions = subscribeInboundOutput.getSubscribePacket().getSubscriptions();
                                subscriptions.get(0).setTopicFilter(device + "/values");
                                replacedSubscription = true;
                                continue;
                            }
                            // the remaining devices will be added via the Subscription Store; for that we need TopicSubscriptions
                            additionalSubscriptions.add(Builders.topicSubscription().topicFilter(device + "/values").build());
                        }
                        // now we add the subscriptions for the remaining devices
                        Services.extensionExecutorService().submit(() -> {
                            Services.subscriptionStore().addSubscriptions(clientId, additionalSubscriptions);
                        });
                    });
                }
            });
        }

        @Override
        public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {
        }

        private boolean isUser(final String clientId) {
            return true; // TODO correct check
        }

        private boolean isDevice(final String clientId) {
            return true; // TODO correct check
        }

        private Set<String> fetchDevicesForUser(final String clientId) {
            return Set.of("device1", "device2", "device3"); // TODO correct fetching of devices from a user
        }
    }

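The split that the subscribe interceptor performs can also be looked at without any SDK types. The following is only a sketch under my own assumptions: `WildcardExpansion` and its fields are hypothetical names, the devices are sorted to get a stable result (the POC uses whatever order the set iterates in), and an empty device set is rejected instead of leaving the wildcard in place:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, SDK-free model of the split done in the subscribe interceptor.
final class WildcardExpansion {

    final String replacement;      // would be written into the client's SUBSCRIBE packet
    final List<String> additional; // would be added afterwards via the subscription store

    private WildcardExpansion(final String replacement, final List<String> additional) {
        this.replacement = replacement;
        this.additional = additional;
    }

    static WildcardExpansion expand(final Set<String> devices, final String suffix) {
        if (devices.isEmpty()) {
            // Assumption: refuse rather than silently keep the wildcard subscription.
            throw new IllegalArgumentException("user owns no devices");
        }
        final List<String> filters = new ArrayList<>();
        for (final String device : new TreeSet<>(devices)) { // sorted for a stable result
            filters.add(device + suffix);
        }
        return new WildcardExpansion(filters.get(0), List.copyOf(filters.subList(1, filters.size())));
    }

    public static void main(final String[] args) {
        final WildcardExpansion e = expand(Set.of("device1", "device2", "device3"), "/values");
        System.out.println("replacement: " + e.replacement);
        System.out.println("additional:  " + e.additional);
    }
}
```

For the three sample devices, the replacement is `device1/values` and the additional filters are `device2/values` and `device3/values`.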
Here is the output from my test:
2020-08-30 20:40:41,469 DEBUG - Client 'iozGlvjrhsN7pwP' SUBSCRIBE's to topic(s) [+/values]
2020-08-30 20:40:41,510 DEBUG - Client 'iozGlvjrhsN7pwP' SUBACK received with reason code(s) [1]
2020-08-30 20:40:41,523 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device1/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,528 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device10/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,530 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device3/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,534 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device10/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,536 DEBUG - Client 'iozGlvjrhsN7pwP' added a successful message to sent list. Topic 'device2/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,536 DEBUG - CountDownLatch will wait max '10' seconds
2020-08-30 20:40:41,601 DEBUG - Client 'iozGlvjrhsN7pwP' added a message to received list. Topic 'device1/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,602 DEBUG - Client 'iozGlvjrhsN7pwP' added a message to received list. Topic 'device3/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,602 DEBUG - Client 'iozGlvjrhsN7pwP' added a message to received list. Topic 'device2/values', qos '0', retained 'false', payload ''
2020-08-30 20:40:41,602 DEBUG - CountDownLatch has the count '0' after waiting '65' milliseconds
What this extension does in essence:
- It converts the user's `+/values` SUBSCRIBE into one subscription per device the user owns (using a SubscribeInboundInterceptor and the SubscriptionStore). The user client thinks it is subscribed to `+/values`, but it is actually subscribed to each device individually, as can be seen via the `com.hivemq.subscriptions.overall.current` metric or in the Control Center client detail:
Beware that many checks and other configuration options are missing, so don't use this in production:
- TopicSubscription has more options (e.g. the QoS level; you might want to use the one from the client's SUBSCRIBE, currently it is zero, as you can see in the image above)
- subscriptions are not added or removed when a user gains or loses a device
- currently I expect only one subscription from the user (+/values); you might want to add a check here in case a user can send more than one subscription
- in the other thread I saw that you store the devices for a user in the ConnectionAttributeStore; you can use this in the fetchDevicesForUser method
- and other things I have surely forgotten to mention
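For the point about users gaining or losing devices, one possible starting point is to compute the difference between the old and the new device set and only touch the subscriptions that changed. This is a rough sketch, not something I tested against the broker; `DeviceSubscriptionDiff` is a hypothetical name, and how you get notified about a changed device set is left open:

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: works out which per-device topic filters need to change.
final class DeviceSubscriptionDiff {

    final Set<String> toAdd = new TreeSet<>();    // filters for newly gained devices
    final Set<String> toRemove = new TreeSet<>(); // filters for devices the user lost

    DeviceSubscriptionDiff(final Set<String> oldDevices, final Set<String> newDevices) {
        for (final String device : newDevices) {
            if (!oldDevices.contains(device)) {
                toAdd.add(device + "/values");
            }
        }
        for (final String device : oldDevices) {
            if (!newDevices.contains(device)) {
                toRemove.add(device + "/values");
            }
        }
    }

    public static void main(final String[] args) {
        final DeviceSubscriptionDiff diff = new DeviceSubscriptionDiff(
                Set.of("device1", "device2", "device3"),
                Set.of("device2", "device3", "device4"));
        System.out.println("add:    " + diff.toAdd);
        System.out.println("remove: " + diff.toRemove);
    }
}
```

The resulting filters could then be turned into TopicSubscriptions and handed to the SubscriptionStore, in the same way the POC does for the additional devices.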
Kind regards,
Michael from the HiveMQ team