// azure_iot_operations_mqtt/session/wrapper.rs
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use crate::MqttConnectionSettings;
use crate::control_packet::{
Publish, PublishProperties, QoS, SubscribeProperties, UnsubscribeProperties,
};
use crate::error::{PublishError, SubscribeError, UnsubscribeError};
use crate::interface::{AckToken, CompletionToken, ManagedClient, MqttPubSub, PubReceiver};
use crate::rumqttc_adapter as adapter;
use crate::session::managed_client;
use crate::session::reconnect_policy::{ExponentialBackoffWithJitter, ReconnectPolicy};
use crate::session::session;
use crate::session::{SessionConfigError, SessionError, SessionExitError};
use crate::topic::TopicParseError;
/// Client that manages connections over a single MQTT session.
///
/// Use this centrally in an application to control the session and to create
/// instances of [`SessionManagedClient`], [`SessionExitHandle`] and
/// [`SessionConnectionMonitor`].
///
/// This is a newtype over the crate's generic session implementation, fixed to the
/// client and event loop type aliases provided by the `rumqttc_adapter` module.
pub struct Session(session::Session<adapter::ClientAlias, adapter::EventLoopAlias>);
/// Handle used to end an MQTT session.
///
/// The handle can be cloned; it is a newtype over the crate's internal exit handle for
/// the adapter client type.
///
/// # Please note
/// This struct's API is designed around negotiating a graceful exit with the MQTT broker.
/// However, this is not actually possible right now due to a bug in the underlying MQTT library.
#[derive(Clone)]
pub struct SessionExitHandle(session::SessionExitHandle<adapter::ClientAlias>);
/// Monitor for connection changes in the [`Session`].
///
/// This is largely for informational purposes. Instances are obtained from
/// [`Session::create_connection_monitor`] and can be cloned.
#[derive(Clone)]
pub struct SessionConnectionMonitor(session::SessionConnectionMonitor);
/// An MQTT client that has its connection state externally managed by a [`Session`].
///
/// Can be used to send messages and create receivers for incoming messages.
/// Instances are obtained from [`Session::create_managed_client`] and can be cloned.
#[derive(Clone)]
pub struct SessionManagedClient(managed_client::SessionManagedClient<adapter::ClientAlias>);
/// Receive and acknowledge incoming MQTT messages.
///
/// Created through the [`ManagedClient`] implementation of [`SessionManagedClient`];
/// the receiving API itself is provided by this type's [`PubReceiver`] implementation.
pub struct SessionPubReceiver(managed_client::SessionPubReceiver);
/// Options for configuring a new [`Session`].
///
/// A builder for this struct is generated by the `Builder` derive, configured with the
/// owned pattern (see the `#[builder(pattern = "owned")]` attribute).
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct SessionOptions {
/// MQTT Connection Settings for configuring the [`Session`].
/// No builder default is declared for this field.
pub connection_settings: MqttConnectionSettings,
/// Reconnect Policy to be used by the [`Session`].
/// Builder default: a boxed `ExponentialBackoffWithJitter::default()`.
#[builder(default = "Box::new(ExponentialBackoffWithJitter::default())")]
pub reconnect_policy: Box<dyn ReconnectPolicy>,
/// Maximum number of queued outgoing messages not yet accepted by the MQTT Session.
/// Builder default: `100`.
#[builder(default = "100")]
pub outgoing_max: usize,
/// Indicates if the Session should use features specific for use with the AIO MQTT Broker.
/// Builder default: `true`.
#[builder(default = "true")]
pub aio_broker_features: bool,
}
impl Session {
/// Create a new [`Session`] with the provided options structure.
///
/// # Errors
/// Returns a [`SessionConfigError`] if there are errors using the session options.
pub fn new(options: SessionOptions) -> Result<Self, SessionConfigError> {
let client_id = options.connection_settings.client_id.clone();
let sat_file = options.connection_settings.sat_file.clone();
// Add AIO metric to user properties when using AIO MQTT broker features
// TODO: consider user properties from being supported on SessionOptions or ConnectionSettings
let user_properties = if options.aio_broker_features {
vec![("metriccategory".into(), "aiosdk-rust".into())]
} else {
vec![]
};
let (client, event_loop) = adapter::client(
options.connection_settings,
options.outgoing_max,
true,
user_properties,
)?;
Ok(Session(session::Session::new_from_injection(
client,
event_loop,
options.reconnect_policy,
client_id,
sat_file,
)))
}
/// Return a new instance of [`SessionExitHandle`] that can be used to end this [`Session`]
pub fn create_exit_handle(&self) -> SessionExitHandle {
SessionExitHandle(self.0.create_exit_handle())
}
/// Return a new instance of [`SessionConnectionMonitor`] that can be used to monitor the connection state
pub fn create_connection_monitor(&self) -> SessionConnectionMonitor {
SessionConnectionMonitor(self.0.create_connection_monitor())
}
/// Return a new instance of [`SessionManagedClient`] that can be used to send and receive messages
pub fn create_managed_client(&self) -> SessionManagedClient {
SessionManagedClient(self.0.create_managed_client())
}
/// Begin running the [`Session`].
///
/// Blocks until either a session exit or a fatal connection error is encountered.
///
/// # Errors
/// Returns a [`SessionError`] if the session encounters a fatal error and ends.
pub async fn run(self) -> Result<(), SessionError> {
self.0.run().await
}
}
impl ManagedClient for SessionManagedClient {
    type PubReceiver = SessionPubReceiver;

    /// Client id reported by the wrapped managed client.
    fn client_id(&self) -> &str {
        let Self(inner) = self;
        inner.client_id()
    }

    /// Create a receiver from the wrapped client for the given topic filter.
    ///
    /// # Errors
    /// Propagates the [`TopicParseError`] returned by the wrapped client for `topic_filter`.
    fn create_filtered_pub_receiver(
        &self,
        topic_filter: &str,
    ) -> Result<SessionPubReceiver, TopicParseError> {
        let receiver = self.0.create_filtered_pub_receiver(topic_filter)?;
        Ok(SessionPubReceiver(receiver))
    }

    /// Create a receiver from the wrapped client without a topic filter.
    fn create_unfiltered_pub_receiver(&self) -> SessionPubReceiver {
        let Self(inner) = self;
        SessionPubReceiver(inner.create_unfiltered_pub_receiver())
    }
}
// Every operation below forwards its arguments unchanged to the wrapped managed client
// and returns that client's result as-is.
#[async_trait]
impl MqttPubSub for SessionManagedClient {
    /// Forward a publish request to the wrapped client.
    async fn publish(
        &self,
        topic: impl Into<String> + Send,
        qos: QoS,
        retain: bool,
        payload: impl Into<Bytes> + Send,
    ) -> Result<CompletionToken, PublishError> {
        let inner = &self.0;
        inner.publish(topic, qos, retain, payload).await
    }

    /// Forward a publish request carrying [`PublishProperties`] to the wrapped client.
    async fn publish_with_properties(
        &self,
        topic: impl Into<String> + Send,
        qos: QoS,
        retain: bool,
        payload: impl Into<Bytes> + Send,
        properties: PublishProperties,
    ) -> Result<CompletionToken, PublishError> {
        let inner = &self.0;
        inner
            .publish_with_properties(topic, qos, retain, payload, properties)
            .await
    }

    /// Forward a subscribe request to the wrapped client.
    async fn subscribe(
        &self,
        topic: impl Into<String> + Send,
        qos: QoS,
    ) -> Result<CompletionToken, SubscribeError> {
        let inner = &self.0;
        inner.subscribe(topic, qos).await
    }

    /// Forward a subscribe request carrying [`SubscribeProperties`] to the wrapped client.
    async fn subscribe_with_properties(
        &self,
        topic: impl Into<String> + Send,
        qos: QoS,
        properties: SubscribeProperties,
    ) -> Result<CompletionToken, SubscribeError> {
        let inner = &self.0;
        inner
            .subscribe_with_properties(topic, qos, properties)
            .await
    }

    /// Forward an unsubscribe request to the wrapped client.
    async fn unsubscribe(
        &self,
        topic: impl Into<String> + Send,
    ) -> Result<CompletionToken, UnsubscribeError> {
        let inner = &self.0;
        inner.unsubscribe(topic).await
    }

    /// Forward an unsubscribe request carrying [`UnsubscribeProperties`] to the wrapped client.
    async fn unsubscribe_with_properties(
        &self,
        topic: impl Into<String> + Send,
        properties: UnsubscribeProperties,
    ) -> Result<CompletionToken, UnsubscribeError> {
        let inner = &self.0;
        inner.unsubscribe_with_properties(topic, properties).await
    }
}
// Each method forwards directly to the wrapped receiver.
#[async_trait]
impl PubReceiver for SessionPubReceiver {
    /// Await the wrapped receiver's `recv` and return its result.
    async fn recv(&mut self) -> Option<Publish> {
        let inner = &mut self.0;
        inner.recv().await
    }

    /// Await the wrapped receiver's `recv_manual_ack` and return its result: a publish
    /// together with an optional [`AckToken`].
    async fn recv_manual_ack(&mut self) -> Option<(Publish, Option<AckToken>)> {
        let inner = &mut self.0;
        inner.recv_manual_ack().await
    }

    /// Close the wrapped receiver.
    fn close(&mut self) {
        let inner = &mut self.0;
        inner.close();
    }
}
impl SessionExitHandle {
/// Attempt to gracefully end the MQTT session running in the [`Session`] that created this handle.
/// This will cause the [`Session::run()`] method to return.
///
/// Note that a graceful exit requires the [`Session`] to be connected to the broker.
/// If the [`Session`] is not connected, this method will return an error.
/// If the [`Session`] connection has been recently lost, the [`Session`] may not yet realize this,
/// and it can take until up to the keep-alive interval for the [`Session`] to realize it is disconnected,
/// after which point this method will return an error. Under this circumstance, the attempt was still made,
/// and may eventually succeed even if this method returns the error
///
/// # Errors
/// * [`SessionExitError`] of kind [`SessionExitErrorKind::Detached`](crate::session::SessionExitErrorKind) if the Session no longer exists.
/// * [`SessionExitError`] of kind [`SessionExitErrorKind::BrokerUnavailable`](crate::session::SessionExitErrorKind) if the Session is not connected to the broker.
pub async fn try_exit(&self) -> Result<(), SessionExitError> {
self.0.try_exit().await
}
/// Attempt to gracefully end the MQTT session running in the [`Session`] that created this handle.
/// This will cause the [`Session::run()`] method to return.
///
/// Note that a graceful exit requires the [`Session`] to be connected to the broker.
/// If the [`Session`] is not connected, this method will return an error.
/// If the [`Session`] connection has been recently lost, the [`Session`] may not yet realize this,
/// and it can take until up to the keep-alive interval for the [`Session`] to realize it is disconnected,
/// after which point this method will return an error. Under this circumstance, the attempt was still made,
/// and may eventually succeed even if this method returns the error
/// If the graceful [`Session`] exit attempt does not complete within the specified timeout, this method
/// will return an error.
///
/// # Arguments
/// * `timeout` - The duration to wait for the graceful exit to complete before returning an error.
///
/// # Errors
/// * [`SessionExitError`] of kind [`SessionExitErrorKind::Detached`](crate::session::SessionExitErrorKind) if the Session no longer exists.
/// * [`SessionExitError`] of kind [`SessionExitErrorKind::BrokerUnavailable`](crate::session::SessionExitErrorKind) if the Session is not connected to the broker within the specified timeout interval.
pub async fn try_exit_timeout(&self, timeout: Duration) -> Result<(), SessionExitError> {
self.0.try_exit_timeout(timeout).await
}
/// Forcefully end the MQTT session running in the [`Session`] that created this handle.
/// This will cause the [`Session::run()`] method to return.
///
/// The [`Session`] will be granted a period of 1 second to attempt a graceful exit before
/// forcing the exit. If the exit is forced, the broker will not be aware the MQTT session
/// has ended.
///
/// Returns true if the exit was graceful, and false if the exit was forced.
pub async fn exit_force(&self) -> bool {
self.0.exit_force().await
}
}
impl SessionConnectionMonitor {
/// Returns true if the [`Session`] is currently connected.
/// Note that this may not be accurate if connection has been recently lost.
#[must_use]
pub fn is_connected(&self) -> bool {
self.0.is_connected()
}
/// Wait until the [`Session`] is connected.
/// Returns immediately if already connected.
pub async fn connected(&self) {
self.0.connected().await;
}
/// Wait until the [`Session`] is disconnected.
/// Returns immediately if already disconnected.
pub async fn disconnected(&self) {
self.0.disconnected().await;
}
}