azure_iot_operations_mqtt/interface.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//! Traits and types for defining sets and subsets of MQTT client functionality.
use async_trait::async_trait;
use bytes::Bytes;
use crate::control_packet::{
AuthProperties, Publish, PublishProperties, QoS, SubscribeProperties, UnsubscribeProperties,
};
use crate::error::{
AckError, CompletionError, ConnectionError, DisconnectError, PublishError, ReauthError,
SubscribeError, UnsubscribeError,
};
pub use crate::session::receiver::AckToken; // TODO: remove this pub re-export after concretized receivers / managed clients
use crate::topic::TopicParseError;
// ---------- Concrete Types ----------
/// Awaitable token indicating completion of MQTT message delivery.
pub struct CompletionToken(
pub Box<dyn std::future::Future<Output = Result<(), CompletionError>> + Send>,
);
impl std::future::Future for CompletionToken {
type Output = Result<(), CompletionError>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
// NOTE: Need to use `unsafe` here because we need to poll the inner future, but can't get
// a mutable reference to it, as it's in a box (at least, not without unsafe code).
// It is safe for us to use the `unsafe` code for the following reasons:
//
// 1. This CompletionToken struct is the only reference to the boxed future that it holds
// internally, so mutability among multiple references is not a concern. Nowhere is this
// struct used in a way that the inner future can be accessed from multiple threads,
// and so we are safe from race conditions.
//
// 2. We do not move the memory - the future stays right where it is, all we do is poll it,
// so there is no risk of a null pointer error / segfault.
let inner = unsafe { self.map_unchecked_mut(|s| &mut *s.0) };
inner.poll(cx)
}
}
// Re-export rumqttc types to avoid user code taking the dependency.
// TODO: Re-implement these instead of just aliasing / add to rumqttc adapter
// Only once there are non-rumqttc implementations of these can we allow non-rumqttc compilations
/// Event yielded by the event loop
pub type Event = rumqttc::v5::Event;
/// Incoming data on the event loop
pub type Incoming = rumqttc::v5::Incoming;
/// Outgoing data on the event loop
pub type Outgoing = rumqttc::Outgoing;
// ---------- Lower level MQTT abstractions ----------
/// MQTT publish, subscribe and unsubscribe functionality
#[async_trait]
pub trait MqttPubSub {
/// MQTT Publish
///
/// If connection is unavailable, publish will be queued and delivered when connection is re-established.
/// Blocks if at capacity for queueing.
async fn publish(
&self,
topic: impl Into<String> + Send,
qos: QoS,
retain: bool,
payload: impl Into<Bytes> + Send,
) -> Result<CompletionToken, PublishError>;
/// MQTT Publish
///
/// If connection is unavailable, publish will be queued and delivered when connection is re-established.
/// Blocks if at capacity for queueing.
async fn publish_with_properties(
&self,
topic: impl Into<String> + Send,
qos: QoS,
retain: bool,
payload: impl Into<Bytes> + Send,
properties: PublishProperties,
) -> Result<CompletionToken, PublishError>;
/// MQTT Subscribe
///
/// If connection is unavailable, subscribe will be queued and delivered when connection is re-established.
/// Blocks if at capacity for queueing.
async fn subscribe(
&self,
topic: impl Into<String> + Send,
qos: QoS,
) -> Result<CompletionToken, SubscribeError>;
/// MQTT Subscribe
///
/// If connection is unavailable, subscribe will be queued and delivered when connection is re-established.
/// Blocks if at capacity for queueing.
async fn subscribe_with_properties(
&self,
topic: impl Into<String> + Send,
qos: QoS,
properties: SubscribeProperties,
) -> Result<CompletionToken, SubscribeError>;
/// MQTT Unsubscribe
///
/// If connection is unavailable, unsubscribe will be queued and delivered when connection is re-established.
/// Blocks if at capacity for queueing.
async fn unsubscribe(
&self,
topic: impl Into<String> + Send,
) -> Result<CompletionToken, UnsubscribeError>;
/// MQTT Unsubscribe
///
/// If connection is unavailable, unsubscribe will be queued and delivered when connection is re-established.
/// Blocks if at capacity for queueing.
async fn unsubscribe_with_properties(
&self,
topic: impl Into<String> + Send,
properties: UnsubscribeProperties,
) -> Result<CompletionToken, UnsubscribeError>;
}
/// Provides functionality for acknowledging a received Publish message (QoS 1)
#[async_trait]
pub trait MqttAck {
/// Acknowledge a received Publish.
async fn ack(&self, publish: &Publish) -> Result<CompletionToken, AckError>;
}
// TODO: consider scoping this to also include a `connect`. Not currently needed, but would be more flexible,
// and make a lot more sense
/// MQTT disconnect functionality
#[async_trait]
pub trait MqttDisconnect {
/// Disconnect from the MQTT broker.
async fn disconnect(&self) -> Result<(), DisconnectError>;
}
/// Internally-facing APIs for the underlying client.
/// Use of this trait is not currently recommended except for mocking.
#[async_trait]
pub trait MqttClient: MqttPubSub + MqttAck + MqttDisconnect {
/// Reauthenticate with the MQTT broker
async fn reauth(&self, auth_props: AuthProperties) -> Result<(), ReauthError>;
}
/// MQTT Event Loop manipulation
#[async_trait]
pub trait MqttEventLoop {
/// Poll the event loop for the next [`Event`]
async fn poll(&mut self) -> Result<Event, ConnectionError>;
/// Modify the clean start flag for subsequent MQTT connection attempts
fn set_clean_start(&mut self, clean_start: bool);
/// Set the authentication method
fn set_authentication_method(&mut self, authentication_method: Option<String>);
/// Set the authentication data
fn set_authentication_data(&mut self, authentication_data: Option<Bytes>);
}
// ---------- Higher level MQTT abstractions ----------
/// An MQTT client that has it's connection state externally managed.
/// Can be used to send messages and create receivers for incoming messages.
pub trait ManagedClient: MqttPubSub {
/// The type of receiver used by this client
type PubReceiver: PubReceiver;
/// Get the client id for the MQTT connection
fn client_id(&self) -> &str;
/// Creates a new [`PubReceiver`] that receives messages on a specific topic
///
/// # Errors
/// Returns a [`TopicParseError`] if the pub receiver cannot be registered.
fn create_filtered_pub_receiver(
&self,
topic_filter: &str,
) -> Result<Self::PubReceiver, TopicParseError>;
/// Creates a new [`PubReceiver`] that receives all messages not sent to other
/// filtered receivers.
fn create_unfiltered_pub_receiver(&self) -> Self::PubReceiver;
}
#[async_trait]
/// Receiver for incoming MQTT messages.
pub trait PubReceiver {
/// Receives the next incoming publish.
///
/// Return None if there will be no more incoming publishes.
async fn recv(&mut self) -> Option<Publish>;
/// Receives the next incoming publish, and a token that can be used to manually acknowledge
/// the publish (Quality of Service 1 or 2), or `None` (Quality of Service 0).
///
/// Return None if there will be no more incoming publishes.
async fn recv_manual_ack(&mut self) -> Option<(Publish, Option<AckToken>)>;
/// Close the receiver, preventing further incoming publishes.
///
/// To guarantee no publish loss, `recv()`/`recv_manual_ack()` must be called until `None` is returned.
fn close(&mut self);
}