I am using rumqttd as a broker, and I keep getting errors when I try to add TLS 1.3 to my client code, which is:
use tokio::{task, time};
use rumqttc::{self, AsyncClient, MqttOptions, QoS};
use std::error::Error;
use std::time::Duration;
#[tokio::main(worker_threads = 1)]
async fn main() → Result<(), Box> {
pretty_env_logger::init();
// color_backtrace::install();
let mut broker = "127.0.0.1";
if env!("HOME") == "/root" {
broker = "mqtt-broker";
}
let mut mqttoptions = MqttOptions::new("test-1", broker, 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 19);
task::spawn(async move {
requests(client).await;
time::sleep(Duration::from_secs(3)).await;
});
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
return Ok(());
}
}
}
}
/// Subscribes to `hello/world`, then publishes 19 messages to the same topic, one per second.
///
/// Message number `i` (1 through 19) carries a payload of `i` elements, each with value 1.
/// After the last publish the task sleeps for a further 120 seconds before returning.
///
/// # Panics
///
/// Panics if the subscribe request or any publish request returns an error.
async fn requests(client: AsyncClient) {
    client
        .subscribe("hello/world", QoS::AtMostOnce)
        .await
        .unwrap();

    for i in 1..=19 {
        client
            .publish("hello/world", QoS::AtMostOnce, false, vec![1; i])
            .await
            .unwrap();
        time::sleep(Duration::from_secs(1)).await;
    }

    time::sleep(Duration::from_secs(120)).await;
}
Can someone please help me modify this code and provide the matching Cargo.toml?
This is how far I have got, but this version produces 10 compiler errors:
use std::fs;
use std::io::BufReader;
use std::net::TcpStream;
use std::path::Path;
use futures_util::future::{FutureExt, TryFutureExt};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader};
use tokio::net::TcpStream as AsyncTcpStream;
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::internal::pemfile::{certs, pkcs8_private_keys};
use tokio_rustls::rustls::{ClientConfig, NoClientAuth, ServerCertVerified, TLSError};
use tokio_rustls::TlsConnector;
use mqtt3::RequestError;
use mqtt3::{MqttRead, MqttWrite, QoS};
#[tokio::main]
async fn main() → Result<(), Box> {
// Load the certificates and private key
let certfile = &mut BufReader::new(fs::File::open(“certs/cert.pem”)?);
let keyfile = &mut BufReader::new(fs::File::open(“certs/key.pem”)?);
let cert_chain = certs(certfile).unwrap();
let mut keys = pkcs8_private_keys(keyfile).unwrap();
let key = keys.remove(0);
// Create the TLS configuration
let mut client_config = ClientConfig::new();
client_config
.set_single_client_cert(cert_chain, key)
.map_err(|e| RequestError::SslError(e.to_string()))?;
client_config
.set_protocols(&[b"mqtt".to_vec()]);
client_config
.dangerous()
.set_certificate_verifier(Arc::new(NoClientAuth::new()));
// Create the TLS connector
let tls_connector = TlsConnector::from(Arc::new(client_config));
// Connect to the broker
let mut mqttoptions = mqtt3::MqttOptions::new("test-1", "localhost", 8883);
mqttoptions.set_keep_alive(5);
mqttoptions.set_clean_session(true);
// Create a new AsyncClient with the TLS connector
let stream = AsyncTcpStream::connect("localhost:8883").await?;
let stream = tls_connector.connect("localhost", stream).await?;
let mut client = mqtt3::AsyncClient::new(stream, 10);
let mut eventloop = client.eventloop();
// Connect to the broker
let (ack, _) = eventloop.connect(mqttoptions).await?;
println!("Connected: {:?}", ack);
// Subscribe to a topic
eventloop
.subscribe("test", QoS::AtMostOnce)
.await
.unwrap();
// Publish a message to the topic
eventloop
.publish("test", QoS::AtMostOnce, false, "Hello, world!".to_string())
.await
.unwrap();
// Read messages from the topic
let mut stream = eventloop.stream();
while let Some(msg) = stream.try_next().await? {
println!("Received: {:?}", msg);
}
Ok(())
}