Multithreaded PublishService

Hello all,

I have a question, we encountered an issue in our service, with regard to multithreading and the PublishService. Our code looked like this roughly:

PublishService publishService = Services.publishService();

for (Message message : messages) {
    executorService.submit(message -> publishMessage(publishService, message).join() );
}

It turns out that the code in the PublishService is not thread-safe. This particular problem was fixed by doing

var list = new ArrayList<CompletableFuture<?>>();
for (Message message : messages) {
    list.append(publishMessage(publishService, message));
}

CompletableFuture.allOf(list.toArray()).join();

I added a test for it, and I haven’t seen it going wrong since, it also makes more sense IMO.

However, this got me thinking about mutliple threads calling Services.publishService()#publish and publishing at the same time. Is this something that is doable? I was looking at the code, but it is unclear to me, especially since there is some non-trivial way in which the Services class returns its objects.

How would I have to do this in production, would I need to have a global lock somewhere, so that there is only 1 publish call at the same time, or can I freely create multiple PublishServices and publish at the same time?

Thanks in advance,

Bram

Is there someone that might be able to shed some light on this? I want to be sure that I’m not having some very subtle concurrency issues on my broker.

Hello @basimons ,

Thank you for the outreach!

We would be happy to assist. In coordination with our internal engineering teams, I wanted to reach out here to verify if you would be able to offer a minimal reproducer for this issue for our teams to review directly.

This will allow us to review the issue internally with the reproducer directly, and act accordingly.

Best,
Aaron from the HiveMQ Team

Thanks for your reaction, I do have a reproducible example, but currently it is embedded into our whole server so I cannot share that with you.

I can extract some of it, to make a standalone reproducible example, however I don’t have time for currently, I might be able to squeeze it inbetween some other things, but I’m unsure.

I’ll send you the example asap, but this might also be in a few weeks.

Thanks,

Bram

I got a reproducible example, apologies, it took a long time, as I went on vacation in between and when I came back I had to take care of some other stuff.

I do have a reproducible example. I’d have uploaded the zip file, but that is not allowed here. So here it is in plain text:

package org.example;

import com.hivemq.embedded.EmbeddedExtension;
import com.hivemq.embedded.EmbeddedHiveMQ;
import com.hivemq.embedded.EmbeddedHiveMQBuilder;

public class Main {

    public static void main(String[] args) {

        Extension extension = new Extension();
        final EmbeddedHiveMQBuilder embeddedHiveMQBuilder = EmbeddedHiveMQ.builder()
            .withEmbeddedExtension(EmbeddedExtension.builder()
                .withVersion("0.1")
                .withPriority(1)
                .withId("id")
                .withAuthor("test")
                .withName("Extension reproduce example")
                .withExtensionMain(extension)
                .build());


        try (final EmbeddedHiveMQ hiveMQ = embeddedHiveMQBuilder.build()) {
            hiveMQ.start().join();

            System.out.println("Starting test");
            extension.doTest();
            System.out.println("Finished test!");
        } catch (final Exception ex) {
            ex.printStackTrace();
        }
    }
}

package org.example;

import com.hivemq.extension.sdk.api.ExtensionMain;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartOutput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
import com.hivemq.extension.sdk.api.services.Services;
import com.hivemq.extension.sdk.api.services.builder.Builders;
import com.hivemq.extension.sdk.api.services.builder.RetainedPublishBuilder;
import com.hivemq.extension.sdk.api.services.publish.PublishService;
import com.hivemq.extension.sdk.api.services.publish.RetainedMessageStore;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Extension implements ExtensionMain {

    private @NotNull PublishService publishService;
    private @NotNull RetainedPublishBuilder retainedPublishBuilder;
    private @NotNull RetainedMessageStore retainedMessageStore;

    @Override
    public void extensionStart(@NotNull ExtensionStartInput extensionStartInput,
        @NotNull ExtensionStartOutput extensionStartOutput
    ) {

        publishService = Services.publishService();
        retainedPublishBuilder = Builders.retainedPublish();
        retainedMessageStore = Services.retainedMessageStore();
    }

    @Override
    public void extensionStop(@NotNull ExtensionStopInput extensionStopInput,
        @NotNull ExtensionStopOutput extensionStopOutput
    ) {

    }
    
    void doTest() {
        int numberOfPublishes = 10_000;

        ExecutorService ex = Executors.newFixedThreadPool(10);

        ArrayList<CompletableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < numberOfPublishes; i++) {

            final int ii = i;
            CompletableFuture<?> future =  CompletableFuture.runAsync(() -> {
                publishService.publish(retainedPublishBuilder
                    .payload(ByteBuffer.wrap(String.valueOf(ii).getBytes(StandardCharsets.UTF_8)))
                    .topic(String.valueOf(ii))
                    .build());
            }, ex);

            futures.add(future);
        }


        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

        HashMap<String, String> retainedMessages = new HashMap<>();
        retainedMessageStore.iterateAllRetainedMessages((c, v) -> {
            retainedMessages.put(v.getTopic(), new String(v.getPayload()
                .map(bb -> {
                    var b = new byte[bb.remaining()];
                    bb.get(b);
                    return b;
                }).orElseThrow(), StandardCharsets.UTF_8));
        }).join();


        retainedMessages.forEach((topic, value) -> {
            if (!topic.equals(value)) {
                System.out.printf("Something went wrong as the topic '%s' contains value '%s'%n", topic, value);
            }
        });
    }
}

With pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-mq-reproduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hivemq</groupId>
            <artifactId>hivemq-community-edition-embedded</artifactId>
            <version>2024.1</version>
        </dependency>
    </dependencies>

</project>

If you’d run run this, you’d see that there are some log/prints showing, which show that some values don’t end up at the right topic, due to a multithreading issue. I hope this clarrifies it enough, if not please let me know.

Or is it just the builder that is not threadsafe, and the new PublishImpl needs to be used?

EDIT: Yes I think that’s it, when I use the new PublishImpl method, it is thread safe, or at least, my test doesn’t fail anymore

Hello @basimons ,

Thank you for the follow-up here!

I will be sure to notify our team to review this reproducer, as well, and will update here with any additional details or questions.

In the meantime, if there is any additional assistance we can provide, please feel free to reach out!

Best,
Aaron from the HiveMQ Team

1 Like