azure_iot_operations_services/state_store.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//! Types for State Store operations.
use core::fmt::Debug;
use azure_iot_operations_protocol::{
common::{aio_protocol_error::AIOProtocolError, hybrid_logical_clock::HybridLogicalClock},
rpc_command,
};
use thiserror::Error;
/// State Store Client implementation
mod client;
/// Serialization and deserialization implementations for resp3 state store payloads
mod resp3;
pub use client::{Client, ClientOptions, ClientOptionsBuilder, KeyObservation};
pub use resp3::{Operation, SetCondition, SetOptions};
/// User Property Key for a [`HybridLogicalClock`] fencing token used to protect the object of the request from conflicting updates.
const FENCING_TOKEN_USER_PROPERTY: &str = "__ft";
/// Represents an error that occurred in the Azure IoT Operations State Store implementation.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(#[from] ErrorKind);
impl Error {
/// Returns the [`ErrorKind`] of the error as a reference.
#[must_use]
pub fn kind(&self) -> &ErrorKind {
&self.0
}
/// Returns the [`ErrorKind`] of the error.
#[must_use]
#[allow(dead_code)]
pub(crate) fn consuming_kind(self) -> ErrorKind {
self.0
}
}
/// Represents the kinds of errors that occur in the Azure IoT Operations State Store implementation.
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum ErrorKind {
/// An error occurred in the AIO Protocol. See [`AIOProtocolError`] for more information.
#[error(transparent)]
AIOProtocolError(#[from] AIOProtocolError),
/// An error occurred from the State Store Service. See [`ServiceError`] for more information.
#[error(transparent)]
ServiceError(#[from] ServiceError),
/// The key length must not be zero.
#[error("key length must not be zero")]
KeyLengthZero,
/// An error occurred during serialization of a request.
#[error("{0}")]
SerializationError(String),
/// An argument provided for a request was invalid.
#[error("{0}")]
InvalidArgument(String),
/// The payload of the response does not match the expected type for the request.
#[error("Unexpected response payload for the request type: {0}")]
UnexpectedPayload(String),
/// A key may only have one [`KeyObservation`] at a time.
#[error("key may only be observed once at a time")]
DuplicateObserve,
}
/// Represents the errors that occur in the Azure IoT Operations State Store Service.
#[derive(Error, Debug)]
pub enum ServiceError {
/// the request timestamp is too far in the future; ensure that the client and broker system clocks are synchronized.
#[error(
"the request timestamp is too far in the future; ensure that the client and broker system clocks are synchronized"
)]
TimestampSkew,
/// A fencing token is required for this request. This happens if a key has been marked with a fencing token, but the client doesn't specify it
#[error("a fencing token is required for this request")]
MissingFencingToken,
/// the request fencing token timestamp is too far in the future; ensure that the client and broker system clocks are synchronized.
#[error(
"the request fencing token timestamp is too far in the future; ensure that the client and broker system clocks are synchronized"
)]
FencingTokenSkew,
/// The request fencing token is a lower version than the fencing token protecting the resource.
#[error(
"the request fencing token is a lower version than the fencing token protecting the resource"
)]
FencingTokenLowerVersion,
/// The state store has a quota of how many keys it can store, which is based on the memory profile of the MQ broker that's specified.
#[error("the quota has been exceeded")]
KeyQuotaExceeded,
/// The payload sent does not conform to state store's definition.
#[error("syntax error")]
SyntaxError,
/// The client is not authorized to perform the operation.
#[error("not authorized")]
NotAuthorized,
/// The command sent is not recognized by the state store.
#[error("unknown command")]
UnknownCommand,
/// The number of arguments sent in the command is incorrect.
#[error("wrong number of arguments")]
WrongNumberOfArguments,
/// The timestamp is missing on the request.
#[error("missing timestamp")]
TimestampMissing,
/// The timestamp or fencing token is malformed.
#[error("malformed timestamp")]
TimestampMalformed,
/// The key length is zero.
#[error("the key length is zero")]
KeyLengthZero,
/// An unknown error was received from the State Store Service.
#[error("{0}")]
Unknown(String),
}
impl From<Vec<u8>> for ServiceError {
fn from(s: Vec<u8>) -> Self {
let s_bytes: &[u8] = &s;
match s_bytes {
b"the request timestamp is too far in the future; ensure that the client and broker system clocks are synchronized" => ServiceError::TimestampSkew,
b"a fencing token is required for this request" => ServiceError::MissingFencingToken,
b"the request fencing token timestamp is too far in the future; ensure that the client and broker system clocks are synchronized" => ServiceError::FencingTokenSkew,
b"the request fencing token is a lower version than the fencing token protecting the resource" => ServiceError::FencingTokenLowerVersion,
b"the quota has been exceeded" => ServiceError::KeyQuotaExceeded,
b"syntax error" => ServiceError::SyntaxError,
b"not authorized" => ServiceError::NotAuthorized,
b"unknown command" => ServiceError::UnknownCommand,
b"wrong number of arguments" => ServiceError::WrongNumberOfArguments,
b"missing timestamp" => ServiceError::TimestampMissing,
b"malformed timestamp" => ServiceError::TimestampMalformed,
b"the key length is zero" => ServiceError::KeyLengthZero,
other => ServiceError::Unknown(std::str::from_utf8(other).unwrap_or_default().to_string()),
}
}
}
/// State Store Operation Response struct.
#[derive(Debug)]
pub struct Response<T>
where
T: Debug,
{
/// The version of the key as a [`HybridLogicalClock`].
pub version: Option<HybridLogicalClock>,
/// The response for the request. Will vary per operation.
pub response: T,
}
/// Convenience function to convert a [`rpc_command::invoker::Response`] into a [`Response`]
/// Takes in a closure that converts the payload into the desired type.
fn convert_response<T, F>(
resp: rpc_command::invoker::Response<resp3::Response>,
f: F,
) -> Result<Response<T>, Error>
where
F: FnOnce(resp3::Response) -> Result<T, ()>,
T: Debug,
{
match resp.payload {
resp3::Response::Error(e) => Err(Error(ErrorKind::ServiceError(e.into()))),
payload => match f(payload.clone()) {
Ok(response) => Ok(Response {
response,
version: resp.timestamp,
}),
Err(()) => Err(Error(ErrorKind::UnexpectedPayload(format!("{payload:?}")))),
},
}
}
/// A notification about a state change on a key in the State Store Service
#[derive(Debug, Clone)]
pub struct KeyNotification {
/// The Key that this notification is for
pub key: Vec<u8>,
/// The [`Operation`] that was performed on the key
pub operation: Operation,
/// The version of the key as a [`HybridLogicalClock`].
pub version: HybridLogicalClock,
}