azure_iot_operations_mqtt/session/
reconnect_policy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Reconnect policies for a [`Session`](crate::session::Session).

use std::time::Duration;

use rand::Rng;

use crate::error::ConnectionError;

/// Trait defining interface for reconnect policies.
pub trait ReconnectPolicy {
    /// Get the next reconnect delay.
    /// Returns None if no reconnect should be attempted.
    fn next_reconnect_delay(&self, prev_attempts: u32, error: &ConnectionError)
    -> Option<Duration>;
}

/// A reconnect policy that will exponentially backoff the the delay between reconnect attempts.
///
/// Reconnects will range from 128ms to the specified max wait time, before applying jitter.
//  Jitter can subtract up to 10% of the delay
#[derive(Clone)]
pub struct ExponentialBackoffWithJitter {
    /// The longest possible time to wait between reconnect attempts.
    pub max_wait: Duration,
    /// The max number of reconnect attempts before giving up.
    pub max_reconnect_attempts: Option<u32>,
}

impl ExponentialBackoffWithJitter {
    const MIN_EXPONENT: u32 = 7;
    const BASE_DELAY_MS: u64 = 2;

    /// Determine if a reconnect should be attempted.
    fn should_reconnect(&self, prev_attempts: u32, _error: &ConnectionError) -> bool {
        if let Some(max_attempts) = self.max_reconnect_attempts {
            prev_attempts < max_attempts
        } else {
            true
        }
    }

    /// Calculate the delay for the next reconnect attempt.
    fn calculate_delay(&self, prev_attempts: u32) -> Duration {
        // Exponent cannot be less than 7
        // This is to prevent the delay from being too short.
        let exponent = prev_attempts.saturating_add(Self::MIN_EXPONENT);
        let interval =
            Duration::from_millis(Self::BASE_DELAY_MS.saturating_pow(exponent)).min(self.max_wait);

        // Add jitter to prevent multiple clients from reconnecting at the same time
        // NOTE: This number may biased. If this is an issue, look at different ways to generate jitter.
        let jitter_multiplier = rand::thread_rng().gen_range(0.90..=1.0);
        interval.mul_f64(jitter_multiplier)
    }
}

impl Default for ExponentialBackoffWithJitter {
    /// Indefinite reconnect, with a max wait time of 60 seconds.
    fn default() -> Self {
        Self {
            max_wait: Duration::from_secs(60),
            max_reconnect_attempts: None,
        }
    }
}

impl ReconnectPolicy for ExponentialBackoffWithJitter {
    fn next_reconnect_delay(
        &self,
        attempt_count: u32,
        error: &ConnectionError,
    ) -> Option<Duration> {
        if self.should_reconnect(attempt_count, error) {
            let reconnect_delay = self.calculate_delay(attempt_count);
            Some(reconnect_delay)
        } else {
            None
        }
    }
}