azure_iot_operations_services/leased_lock/client.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//! Client for Lease Lock operations.
use std::{sync::Arc, time::Duration};
use crate::leased_lock::{
AcquireAndUpdateKeyOption, Error, ErrorKind, LockObservation, Response, SetCondition,
SetOptions,
};
use crate::state_store::{self};
use azure_iot_operations_mqtt::interface::ManagedClient;
use azure_iot_operations_protocol::common::hybrid_logical_clock::HybridLogicalClock;
/// Leased Lock client struct.
pub struct Client<C>
where
C: ManagedClient + Clone + Send + Sync + 'static,
C::PubReceiver: Send + Sync,
{
state_store: Arc<state_store::Client<C>>,
lock_name: Vec<u8>,
lock_holder_name: Vec<u8>,
}
/// Leased Lock client implementation
///
/// Notes:
/// Do not call any of the methods of this client after the `state_store` parameter is shutdown.
/// Calling any of the methods in this implementation after the `state_store` is shutdown results in undefined behavior.
impl<C> Client<C>
where
C: ManagedClient + Clone + Send + Sync,
C::PubReceiver: Send + Sync,
{
/// Create a new Leased Lock Client.
///
/// Notes:
/// - `lock_holder_name` is expected to be the client ID used in the underlying MQTT connection settings.
/// - There must be one instance of `leased_lock::Client` per lock.
///
/// # Errors
/// [`struct@Error`] of kind [`LockNameLengthZero`](ErrorKind::LockNameLengthZero) if the `lock_name` is empty
///
/// [`struct@Error`] of kind [`LockHolderNameLengthZero`](ErrorKind::LockHolderNameLengthZero) if the `lock_holder_name` is empty
pub fn new(
state_store: Arc<state_store::Client<C>>,
lock_name: Vec<u8>,
lock_holder_name: Vec<u8>,
) -> Result<Self, Error> {
if lock_name.is_empty() {
return Err(Error(ErrorKind::LockNameLengthZero));
}
if lock_holder_name.is_empty() {
return Err(Error(ErrorKind::LockHolderNameLengthZero));
}
Ok(Self {
state_store,
lock_name,
lock_holder_name,
})
}
/// Attempts to acquire a lock, returning if it cannot be acquired after one attempt.
///
/// `lock_expiration` is how long the lock will remain held in the State Store after acquired, if not released before then.
/// `request_timeout` is the maximum time the function will wait for receiving a response from the State Store service, it is rounded up to the nearest second.
///
/// Returns Ok with a fencing token (`HybridLogicalClock`) if completed successfully, or `Error(LockAlreadyHeld)` if lock is not acquired.
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
///
/// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for a `Set` request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
///
/// [`struct@Error`] of kind [`LockAlreadyHeld`](ErrorKind::LockAlreadyHeld) if the `lock` is already in use by another holder
/// # Panics
/// Possible panic if, for some error, the fencing token (a.k.a. `version`) of the acquired lock is None.
pub async fn try_acquire_lock(
&self,
lock_expiration: Duration,
request_timeout: Duration,
) -> Result<HybridLogicalClock, Error> {
let state_store_response = self
.state_store
.set(
self.lock_name.clone(),
self.lock_holder_name.clone(),
request_timeout,
None,
SetOptions {
set_condition: SetCondition::OnlyIfEqualOrDoesNotExist,
expires: Some(lock_expiration),
},
)
.await?;
if state_store_response.response {
Ok(state_store_response
.version
.expect("Got None for fencing token. A lock without a fencing token is of no use."))
} else {
Err(Error(ErrorKind::LockAlreadyHeld))
}
}
/// Waits until a lock is available (if not already) and attempts to acquire it.
///
/// Note: `request_timeout` is rounded up to the nearest second.
///
/// Returns Ok with a fencing token (`HybridLogicalClock`) if completed successfully, or an Error if any failure occurs.
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
///
/// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for the request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
/// # Panics
/// Possible panic if, for some error, the fencing token (a.k.a. `version`) of the acquired lock is None.
pub async fn acquire_lock(
&self,
lock_expiration: Duration,
request_timeout: Duration,
) -> Result<HybridLogicalClock, Error> {
// Logic:
// a. Start observing lock within this function.
// b. Try acquiring the lock and return if acquired or got an error other than `LockAlreadyHeld`.
// c. If got `LockAlreadyHeld`, wait until `Del` notification for the lock. If notification is None, re-observe (start from a. again).
// d. Loop back starting from b. above.
// e. Unobserve lock before exiting.
let mut observe_response = self.observe_lock(request_timeout).await?;
let mut acquire_result;
loop {
acquire_result = self
.try_acquire_lock(lock_expiration, request_timeout)
.await;
match acquire_result {
Ok(_) => {
break; /* Lock acquired */
}
Err(ref acquire_error) => match acquire_error.kind() {
ErrorKind::LockAlreadyHeld => { /* Must wait for lock to be released. */ }
_ => {
break;
}
},
};
// Lock being held by another client. Wait for delete notification.
loop {
let Some((notification, _)) = observe_response.response.recv_notification().await
else {
// If the state_store client gets disconnected (or shutdown), all the observation channels receive a None.
// In such case, as per design, we must re-observe the lock.
observe_response = self.observe_lock(request_timeout).await?;
break;
};
if notification.operation == state_store::Operation::Del {
break;
};
}
}
match self.unobserve_lock(request_timeout).await {
Ok(_) => acquire_result,
Err(unobserve_error) => Err(unobserve_error),
}
}
/// Waits until a lock is acquired, sets/updates/deletes a key in the State Store (depending on `update_value_function` result) and releases the lock.
///
/// `lock_expiration` should be long enough to last through underlying key operations, otherwise it's possible for updating the value to fail if the lock is no longer held.
///
/// `update_value_function` is a function with signature:
/// fn `should_update_key(key_current_value`: `Vec<u8>`) -> `AcquireAndUpdateKeyOption`
/// Where `key_current_value` is the current value of `key` in the State Store (right after the lock is acquired).
/// If the return is `AcquireAndUpdateKeyOption::Update(key_new_value)` it must contain the new value of the State Store key.
///
/// The same `request_timeout` is used for all the individual network calls within `acquire_lock_and_update_value`.
///
/// Note: `request_timeout` is rounded up to the nearest second.
///
/// Returns `true` if the key is successfully set or deleted, or `false` if it is not.
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
///
/// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for the request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
pub async fn acquire_lock_and_update_value(
&self,
lock_expiration: Duration,
request_timeout: Duration,
key: Vec<u8>,
update_value_function: impl Fn(Option<Vec<u8>>) -> AcquireAndUpdateKeyOption,
) -> Result<Response<bool>, Error> {
let fencing_token = self.acquire_lock(lock_expiration, request_timeout).await?;
/* lock acquired, let's proceed. */
let get_result = self.state_store.get(key.clone(), request_timeout).await?;
match update_value_function(get_result.response) {
AcquireAndUpdateKeyOption::Update(new_value, set_options) => {
let set_response = self
.state_store
.set(
key,
new_value,
request_timeout,
Some(fencing_token),
set_options,
)
.await;
let _ = self.release_lock(request_timeout).await;
Ok(set_response?)
}
AcquireAndUpdateKeyOption::DoNotUpdate => {
let _ = self.release_lock(request_timeout).await;
Ok(Response {
response: true,
version: None,
})
}
AcquireAndUpdateKeyOption::Delete => {
match self
.state_store
.del(key, Some(fencing_token), request_timeout)
.await
{
Ok(delete_response) => {
let _ = self.release_lock(request_timeout).await;
Ok(Response {
response: (delete_response.response > 0),
version: delete_response.version,
})
}
Err(delete_error) => {
let _ = self.release_lock(request_timeout).await;
Err(delete_error.into())
}
}
}
}
}
/// Releases a lock if and only if requested by the lock holder (same client id).
///
/// Note: `request_timeout` is rounded up to the nearest second.
///
/// Returns `Ok()` if lock is no longer held by this `lock_holder`, or `Error` otherwise.
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
///
/// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for a `V Delete` request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
pub async fn release_lock(&self, request_timeout: Duration) -> Result<(), Error> {
match self
.state_store
.vdel(
self.lock_name.clone(),
self.lock_holder_name.clone(),
None,
request_timeout,
)
.await
{
Ok(_) => Ok(()),
Err(state_store_error) => Err(state_store_error.into()),
}
}
/// Starts observation of any changes on a lock
///
/// Note: `request_timeout` is rounded up to the nearest second.
///
/// Returns OK([`Response<LockObservation>`]) if the lock is now being observed.
/// The [`LockObservation`] can be used to receive lock notifications for this lock
///
/// <div class="warning">
///
/// If a client disconnects, `observe_lock` must be called again by the user.
///
/// </div>
///
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if
/// - the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if
/// - the State Store returns an Error response
/// - the State Store returns a response that isn't valid for an `Observe` request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if
/// - there are any underlying errors from the command invoker
pub async fn observe_lock(
&self,
request_timeout: Duration,
) -> Result<Response<LockObservation>, Error> {
Ok(self
.state_store
.observe(self.lock_name.clone(), request_timeout)
.await?)
}
/// Stops observation of any changes on a lock.
///
/// Note: `request_timeout` is rounded up to the nearest second.
///
/// Returns `true` if the lock is no longer being observed or `false` if the lock wasn't being observed
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if
/// - the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if
/// - the State Store returns an Error response
/// - the State Store returns a response that isn't valid for an `Unobserve` request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if
/// - there are any underlying errors from the command invoker
pub async fn unobserve_lock(&self, request_timeout: Duration) -> Result<Response<bool>, Error> {
Ok(self
.state_store
.unobserve(self.lock_name.clone(), request_timeout)
.await?)
}
/// Gets the name of the holder of a lock
///
/// Note: `request_timeout` is rounded up to the nearest second.
///
/// Returns `Some(<holder of the lock>)` if the lock is found or `None`
/// if the lock was not found (i.e., was not acquired by anyone, already released or expired).
///
/// # Errors
/// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
///
/// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
///
/// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for a `Get` request
///
/// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
pub async fn get_lock_holder(
&self,
request_timeout: Duration,
) -> Result<Response<Option<Vec<u8>>>, Error> {
Ok(self
.state_store
.get(self.lock_name.clone(), request_timeout)
.await?)
}
}