SSL certificate with micropython esp32

Hi, how to connect ESP32 using MICROPYTHON to HiveMQ broker port 8883. i tried many ways but always fails i think due to SSL CERTIFICATION. can someone help me ? thank you so much.

Hello @sam01

Welcome to the HiveMQ Community! Could you please share the ESP32 MicroPython code you used in your tests?

Kind regards,
Diego from HiveMQ Team

hi, thank you for your quick reply , here is my micropython code . i dont know what to put in ssl_params , i tryed many value , i tried to download the isrgrootx1.pem certificate file with : with : ssl_params={“certfile”: “/path/to/ca_cert.pem”}
but with no success. thank you again for your help .

import network
import time
import machine
from umqtt.simple import MQTTClient

Wi-Fi credentials

WIFI_SSID = “Rassi Net3”
WIFI_PASSWORD = “********”

MQTT configuration

MQTT_BROKER = “9************************80.s1.eu.hivemq.cloud"
MQTT_PORT = 8883
MQTT_USER = “sam02”
MQTT_PASSWORD = "

MQTT_TOPIC = “test/topic”

Function to connect to Wi-Fi

def connect_to_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)

print("Connecting to WiFi...", end="")
while not wlan.isconnected():
    print(".", end="")
    time.sleep(1)
print("\nWiFi connected! IP:", wlan.ifconfig()[0])

Callback function for received messages

def mqtt_message_callback(topic, msg):
print(f"Message received on topic {topic.decode(‘utf-8’)}: {msg.decode(‘utf-8’)}")

Main program

def main():
connect_to_wifi()

print("Connecting to MQTT broker...")
try:
    client = MQTTClient(
        client_id="esp32_client",
        server=MQTT_BROKER,
        port=MQTT_PORT,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
        ssl=True,
       ssl_params={"cert_reqs": 0} # ssl_params={"certfile": "/path/to/ca_cert.pem"}
    )
    client.set_callback(mqtt_message_callback)
    client.connect()
    print("MQTT connected!")
except Exception as e:
    print(f"MQTT connection failed: {type(e).__name__}, {e}")
    return

client.subscribe(MQTT_TOPIC)
print(f"Subscribed to topic: {MQTT_TOPIC}")

try:
    while True:
        client.wait_msg()  # Wait for messages
except KeyboardInterrupt:
    print("Disconnecting...")
    client.disconnect()
    print("Disconnected.")

if name == “main”:
main()

@sam01

I’ve embedded the umqtt.simple2 MQTT client class directly into the code to avoid additional MicroPython package installations, and I’ve made some modifications to get this sample working with HiveMQ Cloud Serverless. Feel free to review the code below and adapt it to suit your needs.

import network
import time
import os
import usocket as socket
import uselect
from utime import ticks_add, ticks_ms, ticks_diff

# ========== Wi-Fi Configuration ==========
WIFI_SSID = "XXXXXXXXXX" # replace with your WiFi SSID
WIFI_PASS = "XXXXXXXXXX" # replace with your WiFi Password

# ========== HiveMQ Cloud Settings ==========
MQTT_BROKER    = "XXXXXXXXXX" # replace with your HiveMQ Cluster URL
MQTT_PORT      = 8883
MQTT_USER      = "XXXXXXXXXX" # replace with your Username
MQTT_PASSWORD  = "XXXXXXXXXX" # replace with your Password
CLIENT_ID      = "esp32-client"
TOPIC   = "test/topic"
MESSAGE = "Hello from ESP32 MicroPython!"

# ========== umqtt.simple2 MQTT client class ==========

class MQTTException(Exception):
    pass

def pid_gen(pid=0):
    while True:
        pid = pid + 1 if pid < 65535 else 1
        yield pid

class MQTTClient:

    def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
                 ssl=False, ssl_params=None, socket_timeout=5, message_timeout=10):
        """
        Default constructor, initializes MQTTClient object.

        :param client_id:  Unique MQTT ID attached to client.
        :type client_id: str
        :param server: MQTT host address.
        :type server str
        :param port: MQTT Port, typically 1883. If unset, the port number will default to 1883 of 8883 base on ssl.
        :type port: int
        :param user: Username if your server requires it.
        :type user: str
        :param password: Password if your server requires it.
        :type password: str
        :param keepalive: The Keep Alive is a time interval measured in seconds since the last
                          correct control packet was received.
        :type keepalive: int
        :param ssl: Require SSL for the connection.
        :type ssl: bool
        :param ssl_params: Required SSL parameters. Kwargs from function ssl.wrap_socket.
                           See documentation: https://docs.micropython.org/en/latest/library/ssl.html#ssl.ssl.wrap_socket
                           For esp8266, please refer to the capabilities of the axTLS library applied to the micropython port.
                           https://axtls.sourceforge.net
        :type ssl_params: dict
        :param socket_timeout: The time in seconds after which the socket interrupts the connection to the server when
                               no data exchange takes place. None - socket blocking, positive number - seconds to wait.
        :type socket_timeout: int
        :param message_timeout: The time in seconds after which the library recognizes that a message with QoS=1
                                or topic subscription has not been received by the server.
        :type message_timeout: int
        """
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.poller_r = None
        self.poller_w = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params if ssl_params else {}
        self.newpid = pid_gen()
        if not getattr(self, 'cb', None):
            self.cb = None
        if not getattr(self, 'cbstat', None):
            self.cbstat = lambda p, s: None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False
        self.rcv_pids = {}  # PUBACK and SUBACK pids awaiting ACK response

        self.last_ping = ticks_ms()  # Time of the last PING sent
        self.last_cpacket = ticks_ms()  # Time of last Control Packet

        self.socket_timeout = socket_timeout
        self.message_timeout = message_timeout

    def _read(self, n):
        """
        Private class method.

        :param n: Expected length of read bytes
        :type n: int
        :return:

        Notes:
        Current usocket implementation returns None on .read from
        non-blocking socket with no data. However, OSError
        EAGAIN is checked for in case this ever changes.
        """
        if n < 0:
            raise MQTTException(2)
        msg = b''
        while len(msg) < n:
            try:
                rbytes = self.sock.read(n - len(msg))
            except OSError as e:
                if e.args[0] == 11:     # EAGAIN / EWOULDBLOCK
                    rbytes = None
                else:
                    raise
            except AttributeError:
                raise MQTTException(8)
            if rbytes is None:
                self._sock_timeout(self.poller_r, self.socket_timeout)
                continue
            if rbytes == b'':
                raise MQTTException(1) # Connection closed by host (?)
            else:
                msg += rbytes
        return msg

    def _write(self, bytes_wr, length=-1):
        """
        Private class method.

        :param bytes_wr: Bytes sequence for writing
        :type bytes_wr: bytes
        :param length: Expected length of write bytes
        :type length: int
        :return:
        """
        # In non-blocking socket mode, the entire block of data may not be sent.
        try:
            self._sock_timeout(self.poller_w, self.socket_timeout)
            out = self.sock.write(bytes_wr, length)
        except AttributeError:
            raise MQTTException(8)
        if length < 0:
            if out != len(bytes_wr):
                raise MQTTException(3)
        else:
            if out != length:
                raise MQTTException(3)
        return out

    def _send_str(self, s):
        """
        Private class method.
        :param s:
        :type s: byte
        :return: None
        """
        assert len(s) < 65536
        self._write(len(s).to_bytes(2, 'big'))
        self._write(s)

    def _recv_len(self):
        """
        Private class method.
        :return:
        :rtype int
        """
        n = 0
        sh = 0
        while 1:
            b = self._read(1)[0]
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    def _varlen_encode(self, value, buf, offset=0):
        assert value < 268435456  # 2**28, i.e. max. four 7-bit bytes
        while value > 0x7f:
            buf[offset] = (value & 0x7f) | 0x80
            value >>= 7
            offset += 1
        buf[offset] = value
        return offset + 1

    def _sock_timeout(self, poller, socket_timeout):
        if self.sock:
            res = poller.poll(-1 if socket_timeout is None else int(socket_timeout * 1000))
            # https://github.com/micropython/micropython/issues/3747#issuecomment-385650294
            # Sockets on esp8266 don't return POLLHUP or POLLERR at all.
            # If a connection is broken then the socket will become readable and a read on it will return b''.
            # POLLIN(value:1) - you have something to read
            # POLLHUP(value:16) - your input is ended
            # POLLIN(1) & POLLHUP(16) = 17 - that meanss that your input is ended and that your have still
            #                                something to read from the buffer
            if res:
                for fd, flag in res:
                    if (not flag & uselect.POLLIN) and (flag & uselect.POLLHUP):
                        raise MQTTException(2 if poller == self.poller_r else 3)
                    if (flag & uselect.POLLERR):
                        raise MQTTException(1)
            else:
                raise MQTTException(30)
        else:
            raise MQTTException(28)

    def set_callback(self, f):
        """
        Set callback for received subscription messages.

        :param f: callable(topic, msg, retained, duplicate)
        """
        self.cb = f

    def set_callback_status(self, f):
        """
        Set the callback for information about whether the sent packet (QoS=1)
        or subscription was received or not by the server.

        :param f: callable(pid, status)

        Where:
            status = 0 - timeout
            status = 1 - successfully delivered
            status = 2 - Unknown PID. It is also possible that the PID is outdated,
                         i.e. it came out of the message timeout.
        """
        self.cbstat = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        """
        Sets the last will and testament of the client. This is used to perform an action by the broker
        in the event that the client "dies".
        Learn more at https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/

        :param topic: Topic of LWT. Takes the from "path/to/topic"
        :type topic: byte
        :param msg: Message to be published to LWT topic.
        :type msg: byte
        :param retain: Have the MQTT broker retain the message.
        :type retain: bool
        :param qos: Sets quality of service level. Accepts values 0 to 2. PLEASE NOTE qos=2 is not actually supported.
        :type qos: int
        :return: None
        """
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        """
        Establishes connection with the MQTT server.

        :param clean_session: Starts new session on true, resumes past session if false.
        :type clean_session: bool
        :return: Existing persistent session of the client from previous interactions.
        :rtype: bool
        """
        self.disconnect()

        ai = socket.getaddrinfo(self.server, self.port)[0]

        self.sock_raw = socket.socket(ai[0], ai[1], ai[2])
        self.sock_raw.setblocking(False)

        try:
            self.sock_raw.connect(ai[-1])
        except OSError as e:
            import uerrno
            if e.args[0] != uerrno.EINPROGRESS:
                raise

        if self.ssl:
            import ssl
            self.sock_raw.setblocking(True)
            self.sock = ssl.wrap_socket(self.sock_raw, **self.ssl_params)
            self.sock_raw.setblocking(False)
        else:
            self.sock = self.sock_raw

        self.poller_r = uselect.poll()
        self.poller_r.register(self.sock, uselect.POLLERR | uselect.POLLIN | uselect.POLLHUP)
        self.poller_w = uselect.poll()
        self.poller_w.register(self.sock, uselect.POLLOUT)

        # Byte nr - desc
        # 1 - \x10 0001 - Connect Command, 0000 - Reserved
        # 2 - Remaining Length
        # PROTOCOL NAME (3.1.2.1 Protocol Name)
        # 3,4 - protocol name length len('MQTT')
        # 5-8 = 'MQTT'
        # PROTOCOL LEVEL (3.1.2.2 Protocol Level)
        # 9 - mqtt version 0x04
        # CONNECT FLAGS
        # 10 - connection flags
        #  X... .... = User Name Flag
        #  .X.. .... = Password Flag
        #  ..X. .... = Will Retain
        #  ...X X... = QoS Level
        #  .... .X.. = Will Flag
        #  .... ..X. = Clean Session Flag
        #  .... ...0 = (Reserved) It must be 0!
        # KEEP ALIVE
        # 11,12 - keepalive
        # 13,14 - client ID length
        # 15-15+len(client_id) - byte(client_id)
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\0\x04MQTT\x04\0\0\0")

        sz = 10 + 2 + len(self.client_id)

        msg[7] = bool(clean_session) << 1
        # Clean session = True, remove current session
        if bool(clean_session):
            self.rcv_pids.clear()
        if self.user is not None:
            sz += 2 + len(self.user)
            msg[7] |= 1 << 7  # User Name Flag
            if self.pswd is not None:
                sz += 2 + len(self.pswd)
                msg[7] |= 1 << 6  # # Password Flag
        if self.keepalive:
            assert self.keepalive < 65536
            msg[8] |= self.keepalive >> 8
            msg[9] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[7] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[7] |= self.lw_retain << 5

        plen = self._varlen_encode(sz, premsg, 1)
        self._write(premsg, plen)
        self._write(msg)
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user is not None:
            self._send_str(self.user)
            if self.pswd is not None:
                self._send_str(self.pswd)
        resp = self._read(4)
        if not (resp[0] == 0x20 and resp[1] == 0x02):  # control packet type, Remaining Length == 2
            raise MQTTException(29)
        if resp[3] != 0:
            if 1 <= resp[3] <= 5:
                raise MQTTException(20 + resp[3])
            else:
                raise MQTTException(20, resp[3])
        self.last_cpacket = ticks_ms()
        return resp[2] & 1  # Is existing persistent session of the client from previous interactions.

    def disconnect(self):
        """
        Disconnects from the MQTT server.
        :return: None
        """
        if not self.sock:
            return
        try:
            self._write(b"\xe0\0")
        except (OSError, MQTTException):
            pass
        if self.poller_r:
            self.poller_r.unregister(self.sock)
        if self.poller_w:
            self.poller_w.unregister(self.sock)
        try:
            self.sock.close()
        except OSError:
            pass
        self.poller_r = None
        self.poller_w = None
        self.sock = None

    def ping(self):
        """
        Pings the MQTT server.
        :return: None
        """
        self._write(b"\xc0\0")
        self.last_ping = ticks_ms()

    def publish(self, topic, msg, retain=False, qos=0, dup=False):
        """
        Publishes a message to a specified topic.

        :param topic: Topic you wish to publish to. Takes the form "path/to/topic"
        :type topic: byte
        :param msg: Message to publish to topic.
        :type msg: byte
        :param retain: Have the MQTT broker retain the message.
        :type retain: bool
        :param qos: Sets quality of service level. Accepts values 0 to 2. PLEASE NOTE qos=2 is not actually supported.
        :type qos: int
        :param dup: Duplicate delivery of a PUBLISH Control Packet
        :type dup: bool
        :return: None
        """
        assert qos in (0, 1)
        pkt = bytearray(b"\x30\0\0\0\0")
        pkt[0] |= qos << 1 | retain | int(dup) << 3
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        plen = self._varlen_encode(sz, pkt, 1)
        self._write(pkt, plen)
        self._send_str(topic)
        if qos > 0:
            pid = next(self.newpid)
            self._write(pid.to_bytes(2, 'big'))
        self._write(msg)
        if qos > 0:
            self.rcv_pids[pid] = ticks_add(ticks_ms(), self.message_timeout * 1000)
            return pid

    def subscribe(self, topic, qos=0):
        """
        Subscribes to a given topic.

        :param topic: Topic you wish to publish to. Takes the form "path/to/topic"
        :type topic: byte
        :param qos: Sets quality of service level. Accepts values 0 to 1. This gives the maximum QoS level at which
                    the Server can send Application Messages to the Client.
        :type qos: int
        :return: None
        """
        assert qos in (0, 1)
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0\0\0\0")
        pid = next(self.newpid)
        sz = 2 + 2 + len(topic) + 1
        plen = self._varlen_encode(sz, pkt, 1)
        pkt[plen:plen + 2] = pid.to_bytes(2, 'big')
        self._write(pkt, plen + 2)
        self._send_str(topic)
        self._write(qos.to_bytes(1, "little"))  # maximum QOS value that can be given by the server to the client
        self.rcv_pids[pid] = ticks_add(ticks_ms(), self.message_timeout * 1000)
        return pid

    def _message_timeout(self):
        curr_tick = ticks_ms()
        for pid, timeout in self.rcv_pids.items():
            if ticks_diff(timeout, curr_tick) <= 0:
                self.rcv_pids.pop(pid)
                self.cbstat(pid, 0)

    def check_msg(self):
        """
        Checks whether a pending message from server is available.

        If socket_timeout=None, this is the socket lock mode. That is, it waits until the data can be read.

        Otherwise it will return None, after the time set in the socket_timeout.

        It processes such messages:
        - response to PING
        - messages from subscribed topics that are processed by functions set by the set_callback method.
        - reply from the server that he received a QoS=1 message or subscribed to a topic

        :return: None
        """
        if self.sock:
            try:
                res = self.sock.read(1)
                if res is None:
                    # wait forever if no timeout, else wait 1 msec
                    if not self.poller_r.poll(-1 if self.socket_timeout is None else 1):
                        self._message_timeout()
                        return None
                    res = self.sock.read(1)
                    if res is None:
                        self._message_timeout()
                        return None
            except OSError as e:
                # 110: ETIMEDOUT 11: EAGAIN/EWOULDBLOCK
                if e.args[0] == 110 or e.args[0] == 11:
                    self._message_timeout()
                    return None
                else:
                    raise e
        else:
            raise MQTTException(28)

        if res == b'':
            raise MQTTException(1) # Connection closed by host

        if res == b"\xd0":  # PINGRESP
            if self._read(1)[0] != 0:
                MQTTException(-1)
            self.last_cpacket = ticks_ms()
            return

        op = res[0]

        if op == 0x40:  # PUBACK
            sz = self._read(1)
            if sz != b"\x02":
                raise MQTTException(-1)
            rcv_pid = int.from_bytes(self._read(2), 'big')
            if rcv_pid in self.rcv_pids:
                self.last_cpacket = ticks_ms()
                self.rcv_pids.pop(rcv_pid)
                self.cbstat(rcv_pid, 1)
            else:
                self.cbstat(rcv_pid, 2)

        if op == 0x90:  # SUBACK Packet fixed header
            resp = self._read(4)
            # Byte - desc
            # 1 - Remaining Length 2(varible header) + len(payload)=1
            # 2,3 - PID
            # 4 - Payload
            if resp[0] != 0x03:
                raise MQTTException(40, resp)
            if resp[3] == 0x80:
                raise MQTTException(44)
            if resp[3] not in (0, 1, 2):
                raise MQTTException(40, resp)
            pid = resp[2] | (resp[1] << 8)
            if pid in self.rcv_pids:
                self.last_cpacket = ticks_ms()
                self.rcv_pids.pop(pid)
                self.cbstat(pid, 1)
            else:
                raise MQTTException(5)

        self._message_timeout()

        if op & 0xf0 != 0x30:  # 3.3 PUBLISH – Publish message
            return op
        sz = self._recv_len()
        topic_len = int.from_bytes(self._read(2), 'big')
        topic = self._read(topic_len)
        sz -= topic_len + 2
        if op & 6:  # QoS level > 0
            pid = int.from_bytes(self._read(2), 'big')
            sz -= 2
        msg = self._read(sz) if sz else b''
        retained = op & 0x01
        dup = op & 0x08
        self.cb(topic, msg, bool(retained), bool(dup))
        self.last_cpacket = ticks_ms()
        if op & 6 == 2:  # QoS==1
            self._write(b"\x40\x02")  # Send PUBACK
            self._write(pid.to_bytes(2, 'big'))
        elif op & 6 == 4:  # QoS==2
            raise NotImplementedError()
        elif op & 6 == 6:  # 3.3.1.2 QoS - Reserved – must not be used
            raise MQTTException(-1)

    def wait_msg(self):
        """
        This method waits for a message from the server.

        Compatibility with previous versions.

        It is recommended not to use this method. Set socket_time=None instead.
        """
        st_old = self.socket_timeout
        self.socket_timeout = None
        out = self.check_msg()
        self.socket_timeout = st_old
        return out

# ========== 1. Connect to Wi-Fi ==========
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            time.sleep(0.5)
    print("Wi-Fi connected:", wlan.ifconfig())

# ========== 2. Connect to HiveMQ Cloud over TLS and Publish ==========
def mqtt_connect_and_publish(broker, port, user, password, client_id, topic, msg):
    """
    Creates an MQTT client, connects to the broker over TLS (port 8883),
    publishes a test message, and then disconnects.
    """
    ssl_params = {
        "server_hostname": broker  # SNI (Server Name Indication) required for HiveMQ Cloud Serverless 
    }
    client = MQTTClient(
        client_id=client_id,
        server=broker,
        port=port,
        user=user,
        password=password,
        ssl=True,
        ssl_params=ssl_params
    )

    print("Connecting to HiveMQ Cloud broker over TLS...")
    client.connect()
    print("Connected!")

    print(f"Publishing to '{topic}': {msg}")
    client.publish(topic.encode("utf-8"), msg.encode("utf-8"))

    # Optional: Disconnect after publishing
    client.disconnect()
    print("Message published, disconnected from HiveMQ Cloud broker.")

# ========== Main Script Logic ==========
def main():
    
    # 1. Connect to Wi-Fi
    connect_wifi(WIFI_SSID, WIFI_PASS)
    
    # 2. Connect to HiveMQ Cloud over TLS and Publish
    mqtt_connect_and_publish(
        MQTT_BROKER,
        MQTT_PORT,
        MQTT_USER,
        MQTT_PASSWORD,
        CLIENT_ID,
        TOPIC,
        MESSAGE
    )

# ========== Entry Point ==========
if __name__ == "__main__":
    main()

Thank you so much for your help