How to properly shutdown when asyncClient.connect() fails?

Hi there, I’m looking for help with the async client:

Consider this minimal example (very similar to the AsyncSubscriber example from the official blog):

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

import static java.nio.charset.StandardCharsets.UTF_8;

public class AsyncSubscriber {

    public static void main(final String [] args) {
        final var asyncClient = Mqtt5Client.builder()
                .identifier("async-subscriber")
                .serverHost("localhost")
                .serverPort(1883)
                .buildAsync();


        asyncClient.publishes(MqttGlobalPublishFilter.ALL, publish -> {
            System.out.println("Received: " + new String(publish.getPayloadAsBytes(), UTF_8));
        });

        asyncClient.connect()
                .thenCompose(connAck -> {
                    System.out.println("Successfully connected!");
                    return asyncClient.disconnect();
                }).thenRun(() -> {
                    System.out.println("Successfully disconnected!");
                }).exceptionally(throwable -> {
                    System.out.println("Something went wrong: " + throwable.getMessage());
                    return null;
                });
    }
}

When I run this while a broker is available under localhost:1883, it prints “Successfully disconnected!\nSuccessfully disconnected!” and then exits.

When I run this while no broker is available under localhost:1883, it prints "“Something went wrong: …” with a ConnectionFailedException and then will not exit.

I noticed that it will exit on failed connection if I do not asyncClient.publishes() so maybe that starts some threads that won’t get shutdown?

Anyway: my main question is: how do I properly shutdown the client and all of its resources when connect() fails?

asyncClient.connect()
                .thenCompose(connAck -> {
                    System.out.println("Successfully connected!");
                    return asyncClient.disconnect();
                }).thenRun(() -> {
                    System.out.println("Successfully disconnected!");
                }).exceptionally(throwable -> {
                    System.out.println("Something went wrong: " + throwable.getMessage());
                    System.exit(1);
                    return null;
                });

Thanks for the reply!

However: I am looking for a way how I can shutdown the client and all of the ressources that it aquired without actually terminating the whole application.

My example code tries to shutdown the whole application, but that was just meant to provide a minimal example of what I tried so far (proving threads started by the asyncClient would be left running in the background).

I found a workaround that is acceptable for me:

If I register my publishes callback via the Rx API and then keep the returned Disposable and call dispose() on it after both a successful disconnect and a failure to connect, then everything appears to be disposed properly:

package org.example;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

import static java.nio.charset.StandardCharsets.UTF_8;

public class AsyncSubscriber {

    public static void main(final String[] args) {
        final var asyncClient = Mqtt5Client.builder()
                .identifier("async-subscriber")
                .serverHost("localhost")
                .serverPort(1883)
                .buildAsync();


        final var disposable = asyncClient
                .toRx()
                .publishes(MqttGlobalPublishFilter.ALL)
                .subscribe(publish -> {
                    System.out.println("Received: " + new String(publish.getPayloadAsBytes(), UTF_8));
                });

        asyncClient.connect()
                .thenCompose(connAck -> {
                    System.out.println("Successfully connected!");
                    return asyncClient.disconnect();
                }).thenRun(() -> {
                    System.out.println("Successfully disconnected!");
                    disposable.dispose();
                }).exceptionally(throwable -> {
                    System.out.println("Something went wrong: " + throwable.getMessage());
                    disposable.dispose();
                    return null;
                });
    }
}

(For real-world use-cases, one should probably also take care of handling onError of the publishes, but I’m fine with that.)