azure_iot_operations_services/leased_lock/
client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Client for Lease Lock operations.

use std::{sync::Arc, time::Duration};

use crate::leased_lock::{
    AcquireAndUpdateKeyOption, Error, ErrorKind, LockObservation, Response, SetCondition,
    SetOptions,
};
use crate::state_store::{self};
use azure_iot_operations_mqtt::interface::ManagedClient;
use azure_iot_operations_protocol::common::hybrid_logical_clock::HybridLogicalClock;

/// Leased Lock client struct.
pub struct Client<C>
where
    C: ManagedClient + Clone + Send + Sync + 'static,
    C::PubReceiver: Send + Sync,
{
    state_store: Arc<state_store::Client<C>>,
    lock_name: Vec<u8>,
    lock_holder_name: Vec<u8>,
}

/// Leased Lock client implementation
///
/// Notes:
/// Do not call any of the methods of this client after the `state_store` parameter is shutdown.
/// Calling any of the methods in this implementation after the `state_store` is shutdown results in undefined behavior.
impl<C> Client<C>
where
    C: ManagedClient + Clone + Send + Sync,
    C::PubReceiver: Send + Sync,
{
    /// Create a new Leased Lock Client.
    ///
    /// Notes:
    /// - `lock_holder_name` is expected to be the client ID used in the underlying MQTT connection settings.
    /// - There must be one instance of `leased_lock::Client` per lock.
    ///
    /// # Errors
    /// [`struct@Error`] of kind [`LockNameLengthZero`](ErrorKind::LockNameLengthZero) if the `lock_name` is empty
    ///
    /// [`struct@Error`] of kind [`LockHolderNameLengthZero`](ErrorKind::LockHolderNameLengthZero) if the `lock_holder_name` is empty
    pub fn new(
        state_store: Arc<state_store::Client<C>>,
        lock_name: Vec<u8>,
        lock_holder_name: Vec<u8>,
    ) -> Result<Self, Error> {
        if lock_name.is_empty() {
            return Err(Error(ErrorKind::LockNameLengthZero));
        }

        if lock_holder_name.is_empty() {
            return Err(Error(ErrorKind::LockHolderNameLengthZero));
        }

        Ok(Self {
            state_store,
            lock_name,
            lock_holder_name,
        })
    }

    /// Attempts to acquire a lock, returning if it cannot be acquired after one attempt.
    ///
    /// `lock_expiration` is how long the lock will remain held in the State Store after acquired, if not released before then.
    /// `request_timeout` is the maximum time the function will wait for receiving a response from the State Store service, it is rounded up to the nearest second.
    ///
    /// Returns Ok with a fencing token (`HybridLogicalClock`) if completed successfully, or `Error(LockAlreadyHeld)` if lock is not acquired.
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
    ///
    /// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for a `Set` request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
    ///
    /// [`struct@Error`] of kind [`LockAlreadyHeld`](ErrorKind::LockAlreadyHeld) if the `lock` is already in use by another holder
    /// # Panics
    /// Possible panic if, for some error, the fencing token (a.k.a. `version`) of the acquired lock is None.
    pub async fn try_acquire_lock(
        &self,
        lock_expiration: Duration,
        request_timeout: Duration,
    ) -> Result<HybridLogicalClock, Error> {
        let state_store_response = self
            .state_store
            .set(
                self.lock_name.clone(),
                self.lock_holder_name.clone(),
                request_timeout,
                None,
                SetOptions {
                    set_condition: SetCondition::OnlyIfEqualOrDoesNotExist,
                    expires: Some(lock_expiration),
                },
            )
            .await?;

        if state_store_response.response {
            Ok(state_store_response
                .version
                .expect("Got None for fencing token. A lock without a fencing token is of no use."))
        } else {
            Err(Error(ErrorKind::LockAlreadyHeld))
        }
    }

    /// Waits until a lock is available (if not already) and attempts to acquire it.
    ///
    /// Note: `request_timeout` is rounded up to the nearest second.
    ///
    /// Returns Ok with a fencing token (`HybridLogicalClock`) if completed successfully, or an Error if any failure occurs.
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
    ///
    /// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for the request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
    /// # Panics
    /// Possible panic if, for some error, the fencing token (a.k.a. `version`) of the acquired lock is None.
    pub async fn acquire_lock(
        &self,
        lock_expiration: Duration,
        request_timeout: Duration,
    ) -> Result<HybridLogicalClock, Error> {
        // Logic:
        // a. Start observing lock within this function.
        // b. Try acquiring the lock and return if acquired or got an error other than `LockAlreadyHeld`.
        // c. If got `LockAlreadyHeld`, wait until `Del` notification for the lock. If notification is None, re-observe (start from a. again).
        // d. Loop back starting from b. above.
        // e. Unobserve lock before exiting.

        let mut observe_response = self.observe_lock(request_timeout).await?;
        let mut acquire_result;

        loop {
            acquire_result = self
                .try_acquire_lock(lock_expiration, request_timeout)
                .await;

            match acquire_result {
                Ok(_) => {
                    break; /* Lock acquired */
                }
                Err(ref acquire_error) => match acquire_error.kind() {
                    ErrorKind::LockAlreadyHeld => { /* Must wait for lock to be released. */ }
                    _ => {
                        break;
                    }
                },
            };

            // Lock being held by another client. Wait for delete notification.
            loop {
                let Some((notification, _)) = observe_response.response.recv_notification().await
                else {
                    // If the state_store client gets disconnected (or shutdown), all the observation channels receive a None.
                    // In such case, as per design, we must re-observe the lock.
                    observe_response = self.observe_lock(request_timeout).await?;
                    break;
                };

                if notification.operation == state_store::Operation::Del {
                    break;
                };
            }
        }

        match self.unobserve_lock(request_timeout).await {
            Ok(_) => acquire_result,
            Err(unobserve_error) => Err(unobserve_error),
        }
    }

    /// Waits until a lock is acquired, sets/updates/deletes a key in the State Store (depending on `update_value_function` result) and releases the lock.
    ///
    /// `lock_expiration` should be long enough to last through underlying key operations, otherwise it's possible for updating the value to fail if the lock is no longer held.
    ///
    /// `update_value_function` is a function with signature:
    ///     fn `should_update_key(key_current_value`: `Vec<u8>`) -> `AcquireAndUpdateKeyOption`
    /// Where `key_current_value` is the current value of `key` in the State Store (right after the lock is acquired).
    /// If the return is `AcquireAndUpdateKeyOption::Update(key_new_value)` it must contain the new value of the State Store key.
    ///
    /// The same `request_timeout` is used for all the individual network calls within `acquire_lock_and_update_value`.
    ///
    /// Note: `request_timeout` is rounded up to the nearest second.
    ///
    /// Returns `true` if the key is successfully set or deleted, or `false` if it is not.
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
    ///
    /// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for the request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
    pub async fn acquire_lock_and_update_value(
        &self,
        lock_expiration: Duration,
        request_timeout: Duration,
        key: Vec<u8>,
        update_value_function: impl Fn(Option<Vec<u8>>) -> AcquireAndUpdateKeyOption,
    ) -> Result<Response<bool>, Error> {
        let fencing_token = self.acquire_lock(lock_expiration, request_timeout).await?;

        /* lock acquired, let's proceed. */
        let get_result = self.state_store.get(key.clone(), request_timeout).await?;

        match update_value_function(get_result.response) {
            AcquireAndUpdateKeyOption::Update(new_value, set_options) => {
                let set_response = self
                    .state_store
                    .set(
                        key,
                        new_value,
                        request_timeout,
                        Some(fencing_token),
                        set_options,
                    )
                    .await;

                let _ = self.release_lock(request_timeout).await;

                Ok(set_response?)
            }
            AcquireAndUpdateKeyOption::DoNotUpdate => {
                let _ = self.release_lock(request_timeout).await;
                Ok(Response {
                    response: true,
                    version: None,
                })
            }
            AcquireAndUpdateKeyOption::Delete => {
                match self
                    .state_store
                    .del(key, Some(fencing_token), request_timeout)
                    .await
                {
                    Ok(delete_response) => {
                        let _ = self.release_lock(request_timeout).await;
                        Ok(Response {
                            response: (delete_response.response > 0),
                            version: delete_response.version,
                        })
                    }
                    Err(delete_error) => {
                        let _ = self.release_lock(request_timeout).await;
                        Err(delete_error.into())
                    }
                }
            }
        }
    }

    /// Releases a lock if and only if requested by the lock holder (same client id).
    ///
    /// Note: `request_timeout` is rounded up to the nearest second.
    ///
    /// Returns `Ok()` if lock is no longer held by this `lock_holder`, or `Error` otherwise.
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
    ///
    /// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for a `V Delete` request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
    pub async fn release_lock(&self, request_timeout: Duration) -> Result<(), Error> {
        match self
            .state_store
            .vdel(
                self.lock_name.clone(),
                self.lock_holder_name.clone(),
                None,
                request_timeout,
            )
            .await
        {
            Ok(_) => Ok(()),
            Err(state_store_error) => Err(state_store_error.into()),
        }
    }

    /// Starts observation of any changes on a lock
    ///
    /// Note: `request_timeout` is rounded up to the nearest second.
    ///
    /// Returns OK([`Response<LockObservation>`]) if the lock is now being observed.
    /// The [`LockObservation`] can be used to receive lock notifications for this lock
    ///
    /// <div class="warning">
    ///
    /// If a client disconnects, `observe_lock` must be called again by the user.
    ///
    /// </div>
    ///
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if
    /// - the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if
    /// - the State Store returns an Error response
    /// - the State Store returns a response that isn't valid for an `Observe` request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if
    /// - there are any underlying errors from the command invoker
    pub async fn observe_lock(
        &self,
        request_timeout: Duration,
    ) -> Result<Response<LockObservation>, Error> {
        Ok(self
            .state_store
            .observe(self.lock_name.clone(), request_timeout)
            .await?)
    }

    /// Stops observation of any changes on a lock.
    ///
    /// Note: `request_timeout` is rounded up to the nearest second.
    ///
    /// Returns `true` if the lock is no longer being observed or `false` if the lock wasn't being observed
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if
    /// - the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if
    /// - the State Store returns an Error response
    /// - the State Store returns a response that isn't valid for an `Unobserve` request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if
    /// - there are any underlying errors from the command invoker
    pub async fn unobserve_lock(&self, request_timeout: Duration) -> Result<Response<bool>, Error> {
        Ok(self
            .state_store
            .unobserve(self.lock_name.clone(), request_timeout)
            .await?)
    }

    /// Gets the name of the holder of a lock
    ///
    /// Note: `request_timeout` is rounded up to the nearest second.
    ///
    /// Returns `Some(<holder of the lock>)` if the lock is found or `None`
    /// if the lock was not found (i.e., was not acquired by anyone, already released or expired).
    ///
    /// # Errors
    /// [`struct@Error`] of kind [`InvalidArgument`](ErrorKind::InvalidArgument) if the `request_timeout` is zero or > `u32::max`
    ///
    /// [`struct@Error`] of kind [`ServiceError`](ErrorKind::ServiceError) if the State Store returns an Error response
    ///
    /// [`struct@Error`] of kind [`UnexpectedPayload`](ErrorKind::UnexpectedPayload) if the State Store returns a response that isn't valid for a `Get` request
    ///
    /// [`struct@Error`] of kind [`AIOProtocolError`](ErrorKind::AIOProtocolError) if there are any underlying errors from the command invoker
    pub async fn get_lock_holder(
        &self,
        request_timeout: Duration,
    ) -> Result<Response<Option<Vec<u8>>>, Error> {
        Ok(self
            .state_store
            .get(self.lock_name.clone(), request_timeout)
            .await?)
    }
}