use std::{
fmt::{self, Display},
str::FromStr,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use thiserror::Error;
use uuid::Uuid;
pub const DEFAULT_MAX_CLOCK_DRIFT: Duration = Duration::from_secs(60);
#[derive(Clone, Debug, PartialEq)]
pub struct HybridLogicalClock {
pub timestamp: SystemTime,
pub counter: u64,
pub node_id: String,
}
impl Default for HybridLogicalClock {
fn default() -> Self {
Self::new()
}
}
impl HybridLogicalClock {
#[must_use]
pub fn new() -> Self {
Self {
timestamp: now_ms_precision(),
counter: 0,
node_id: Uuid::new_v4().to_string(),
}
}
pub fn update(
&mut self,
other: &HybridLogicalClock,
max_clock_drift: Duration,
) -> Result<(), HLCError> {
let now = now_ms_precision();
if self.node_id == other.node_id {
return Ok(());
}
if now > self.timestamp && now > other.timestamp {
self.timestamp = now;
self.counter = 0;
}
else if other.timestamp == self.timestamp {
if self.counter >= other.counter {
self.validate(now, max_clock_drift)?;
self.counter += 1;
} else {
other.validate(now, max_clock_drift)?;
self.counter = other.counter + 1;
}
}
else if self.timestamp > other.timestamp {
self.validate(now, max_clock_drift)?;
self.counter += 1;
}
else if other.timestamp > self.timestamp {
other.validate(now, max_clock_drift)?;
self.timestamp = other.timestamp;
self.counter = other.counter + 1;
}
Ok(())
}
pub fn update_now(&mut self, max_clock_drift: Duration) -> Result<(), HLCError> {
let now = now_ms_precision();
if now > self.timestamp {
self.timestamp = now;
self.counter = 0;
} else {
self.validate(now, max_clock_drift)?;
self.counter += 1;
}
Ok(())
}
fn validate(&self, now: SystemTime, max_clock_drift: Duration) -> Result<(), HLCError> {
if self.counter == u64::MAX {
return Err(HLCErrorKind::OverflowWarning)?;
}
if let Ok(diff) = self.timestamp.duration_since(now) {
if diff > max_clock_drift {
return Err(HLCErrorKind::ClockDrift)?;
}
} Ok(())
}
}
impl Display for HybridLogicalClock {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ms_since_epoch = self
.timestamp
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
write!(
f,
"{:0>15}:{:0>5}:{}",
ms_since_epoch, self.counter, self.node_id
)
}
}
impl FromStr for HybridLogicalClock {
type Err = ParseHLCError;
fn from_str(s: &str) -> Result<Self, ParseHLCError> {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() != 3 {
return Err(ParseHLCError {
message: "Incorrect format".to_string(),
input: s.to_string(),
});
}
let ms_since_epoch = match parts[0].parse::<u64>() {
Ok(ms) => ms,
Err(e) => {
return Err(ParseHLCError {
message: format!(
"Malformed HLC. Could not parse first segment as an integer: {e}"
),
input: s.to_string(),
});
}
};
let Some(timestamp) = UNIX_EPOCH.checked_add(Duration::from_millis(ms_since_epoch)) else {
return Err(ParseHLCError {
message: "Malformed HLC. Timestamp is out of range.".to_string(),
input: s.to_string(),
});
};
let counter = match parts[1].parse::<u64>() {
Ok(val) => val,
Err(e) => {
return Err(ParseHLCError {
message: format!(
"Malformed HLC. Could not parse second segment as an integer: {e}"
),
input: s.to_string(),
});
}
};
Ok(Self {
timestamp,
counter,
node_id: parts[2].to_string(),
})
}
}
fn now_ms_precision() -> SystemTime {
#[cfg(not(test))]
let now = SystemTime::now();
#[cfg(test)]
let now = {
let mut offset_now = SystemTime::now();
let offset = TIME_OFFSET.with(std::cell::Cell::get);
let positive = TIME_OFFSET_POS.with(std::cell::Cell::get);
if positive {
offset_now = offset_now.checked_add(offset).unwrap();
} else {
offset_now = offset_now.checked_sub(offset).unwrap();
}
offset_now
};
if let Ok(dur_since_epoch) = now.duration_since(UNIX_EPOCH) {
let sec_since_epoch = dur_since_epoch.as_secs();
let ms_since_epoch = dur_since_epoch.subsec_millis();
if let Some(now) =
UNIX_EPOCH.checked_add(Duration::new(sec_since_epoch, ms_since_epoch * 1_000_000))
{
return now;
}
}
log::warn!(
"Error rounding SystemTime::now() to the nearest millisecond. Returning unrounded time."
);
now
}
#[derive(Debug, Error)]
#[error("{0}")]
pub struct HLCError(#[from] HLCErrorKind);
impl HLCError {
#[must_use]
pub fn kind(&self) -> &HLCErrorKind {
&self.0
}
}
#[derive(Debug, Error)]
pub enum HLCErrorKind {
#[error("counter cannot be incremented")]
OverflowWarning,
#[error("exceeds max clock drift")]
ClockDrift,
}
#[derive(Debug, Error)]
#[error("{message}")]
pub struct ParseHLCError {
message: String,
pub(crate) input: String,
}
#[cfg(test)]
use std::cell::Cell;
#[cfg(test)]
thread_local! {
static TIME_OFFSET: Cell<Duration> = const { Cell::new(Duration::from_secs(0)) };
static TIME_OFFSET_POS: Cell<bool> = const { Cell::new(false) };
}
#[cfg(test)]
fn set_time_offset(offset: Duration, positive: bool) {
TIME_OFFSET.with(|time_offset| time_offset.set(offset));
TIME_OFFSET_POS.with(|time_offset_pos| time_offset_pos.set(positive));
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, UNIX_EPOCH};
use test_case::test_case;
use uuid::Uuid;
#[test]
fn test_validate_default() {
let hlc = HybridLogicalClock::new();
assert!(
hlc.validate(now_ms_precision(), DEFAULT_MAX_CLOCK_DRIFT)
.is_ok()
);
}
#[test_case(120, true, true; "hlc in past more than max drift - success")]
#[test_case(30, true, true; "hlc in past less than max drift - success")]
#[test_case(120, false, false; "hlc in future more than max drift - failure")]
#[test_case(30, false, true; "hlc in future less than max drift - success")]
fn test_validate_drift(offset_sec: u64, positive: bool, should_succeed: bool) {
let hlc = HybridLogicalClock::new();
set_time_offset(Duration::from_secs(offset_sec), positive);
match hlc.validate(now_ms_precision(), DEFAULT_MAX_CLOCK_DRIFT) {
Ok(()) => assert!(should_succeed),
Err(e) => {
assert!(!should_succeed);
matches!(e.kind(), HLCErrorKind::ClockDrift);
}
}
}
#[test_case(Duration::from_secs(31_536_000), Duration::from_secs(5); "large drift 1 year")]
#[test_case(Duration::from_millis(5), Duration::from_millis(2); "tiny drift 5 ms")]
#[test_case(Duration::from_secs(30), Duration::from_secs(5); "normal sized drift less than default 30 sec")]
#[test_case(Duration::from_secs(300), Duration::from_secs(5); "normal sized drift more than default 5 min")]
fn test_max_drift(max_drift: Duration, test_offset: Duration) {
let mut hlc = HybridLogicalClock::new();
assert!(hlc.validate(now_ms_precision(), max_drift).is_ok());
let offset_before = max_drift.checked_sub(test_offset).unwrap();
hlc.timestamp = hlc.timestamp.checked_add(offset_before).unwrap();
assert!(hlc.validate(now_ms_precision(), max_drift).is_ok());
let offset_after = test_offset.checked_add(test_offset).unwrap();
hlc.timestamp = hlc.timestamp.checked_add(offset_after).unwrap();
match hlc.validate(now_ms_precision(), max_drift) {
Ok(()) => panic!("Expected error"),
Err(e) => {
matches!(e.kind(), HLCErrorKind::ClockDrift);
}
}
}
#[test]
fn test_validate_counter() {
let mut hlc = HybridLogicalClock::new();
hlc.counter = u64::MAX;
match hlc.validate(now_ms_precision(), DEFAULT_MAX_CLOCK_DRIFT) {
Ok(()) => panic!("Expected error"),
Err(e) => {
matches!(e.kind(), HLCErrorKind::OverflowWarning);
}
}
hlc.counter = u64::MAX - 1;
assert!(
hlc.validate(now_ms_precision(), DEFAULT_MAX_CLOCK_DRIFT)
.is_ok()
);
}
#[test]
fn test_update_now_default() {
let mut hlc = HybridLogicalClock::new();
assert!(hlc.update_now(DEFAULT_MAX_CLOCK_DRIFT).is_ok());
assert!(hlc.counter == 0 || hlc.counter == 1);
}
#[test_case(120, true, true, 0; "hlc in past more than max drift - success")]
#[test_case(30, true, true, 0; "hlc in past less than max drift - success")]
#[test_case(120, false, false, 1000; "hlc in future more than max drift - failure")] #[test_case(30, false, true, 1; "hlc in future less than max drift - success")]
fn test_update_now_drift(
offset_sec: u64,
positive: bool,
should_succeed: bool,
expected_counter: u64,
) {
let mut hlc = HybridLogicalClock::new();
set_time_offset(Duration::from_secs(offset_sec), positive);
match hlc.update_now(DEFAULT_MAX_CLOCK_DRIFT) {
Ok(()) => {
assert!(should_succeed);
assert_eq!(hlc.counter, expected_counter);
}
Err(e) => {
assert!(!should_succeed);
matches!(e.kind(), HLCErrorKind::ClockDrift);
}
}
}
#[test_case(true; "update against self no time manipulation")]
#[test_case(false; "self in past to verify time isn't updated")]
fn test_update_against_self(offset: bool) {
if offset {
set_time_offset(Duration::from_secs(30), false);
}
let mut self_hlc = HybridLogicalClock::new();
let self_ts_copy = self_hlc.timestamp;
let self_clone = self_hlc.clone();
if offset {
set_time_offset(Duration::from_secs(0), true);
}
assert!(
self_hlc
.update(&self_clone, DEFAULT_MAX_CLOCK_DRIFT)
.is_ok()
);
assert_eq!(self_hlc.timestamp, self_ts_copy);
assert_eq!(self_hlc.counter, 0);
}
#[test_case(-120, -120, true, &[0]; "self_hlc and other_hlc equal and in past more than max drift - success")]
#[test_case(-105, -120, true, &[0]; "self_hlc and other_hlc in past more than max drift, but self_hlc in future of other_hlc - success")]
#[test_case(-120, -105, true, &[0]; "self_hlc and other_hlc in past more than max drift, but self_hlc in past of other_hlc - success")]
#[test_case(-120, -30, true, &[0]; "self_hlc in past more than max drift, other_hlc in past less than max drift - success")]
#[test_case(-120, 0, true, &[0,4]; "self_hlc in past more than max drift, other_hlc now - success")]
#[test_case(-120, 30, true, &[4]; "self_hlc in past more than max drift, other_hlc in future less than max drift - success")]
#[test_case(-120, 120, false, &[]; "self_hlc in past more than max drift, other_hlc in future more than max drift - failure")]
#[test_case(-30, -120, true, &[0]; "self_hlc in past less than max drift, other_hlc in past more than max drift - success")]
#[test_case(-30, -30, true, &[0]; "self_hlc and other_hlc equal and in past less than max drift - success")]
#[test_case(-15, -30, true, &[0]; "self_hlc and other_hlc in past less than max drift, but self_hlc in future of other_hlc - success")]
#[test_case(-30, -15, true, &[0]; "self_hlc and other_hlc in past less than max drift, but self_hlc in past of other_hlc - success")]
#[test_case(-30, 0, true, &[0,4]; "self_hlc in past less than max drift, other_hlc now - success")]
#[test_case(-30, 30, true, &[4]; "self_hlc in past less than max drift, other_hlc in future less than max drift - success")]
#[test_case(-30, 120, false, &[]; "self_hlc in past less than max drift, other_hlc in future more than max drift - failure")]
#[test_case(0, -120, true, &[0,1]; "self_hlc now, other_hlc in past more than max drift - success")]
#[test_case(0, -30, true, &[0,1]; "self_hlc now, other_hlc in past less than max drift - success")]
#[test_case(0, 0, true, &[0,4]; "self_hlc now, other_hlc now - success")]
#[test_case(0, 30, true, &[4]; "self_hlc now, other_hlc in future less than max drift - success")]
#[test_case(0, 120, false, &[]; "self_hlc now, other_hlc in future more than max drift - failure")]
#[test_case(30, -120, true, &[1]; "self_hlc in future less than max drift, other_hlc in past more than max drift - success")]
#[test_case(30, -30, true, &[1]; "self_hlc in future less than max drift, other_hlc in past less than max drift - success")]
#[test_case(30, 0, true, &[1]; "self_hlc in future less than max drift, other_hlc now - success")]
#[test_case(30, 30, true, &[4]; "self_hlc and other_hlc equal and in future less than max drift - success")]
#[test_case(45, 30, true, &[1]; "self_hlc and other_hlc in future less than max drift, but self_hlc in future of other_hlc - success")]
#[test_case(30, 45, true, &[4]; "self_hlc and other_hlc in future less than max drift, but self_hlc in past of other_hlc - success")]
#[test_case(30, 120, false, &[]; "self_hlc in future less than max drift, other_hlc in future more than max drift - failure")]
#[test_case(120, -120, false, &[]; "self_hlc in future more than max drift, other_hlc in past more than max drift - failure")]
#[test_case(120, -30, false, &[]; "self_hlc in future more than max drift, other_hlc in past less than max drift - failure")]
#[test_case(120, 0, false, &[]; "self_hlc in future more than max drift, other_hlc now - failure")]
#[test_case(120, 30, false, &[]; "self_hlc in future more than max drift, other_hlc in future less than max drift - failure")]
#[test_case(120, 120, false, &[]; "self_hlc and other_hlc equal and in future more than max drift - failure")]
#[test_case(135, 120, false, &[]; "self_hlc and other_hlc in future more than max drift, but self_hlc in future of other_hlc - failure")]
#[test_case(120, 135, false, &[]; "self_hlc and other_hlc in future more than max drift, but self_hlc in past of other_hlc - failure")]
fn test_update_other_drift(
self_offset_sec: i64,
other_offset_sec: i64,
should_succeed: bool,
valid_counters: &[u64],
) {
set_time_offset(
Duration::from_secs(self_offset_sec.unsigned_abs()),
self_offset_sec > 0,
);
let mut self_hlc = HybridLogicalClock::new();
let self_ts_copy = self_hlc.timestamp;
let mut other_hlc = if self_offset_sec == other_offset_sec {
let mut other = HybridLogicalClock::new();
other.timestamp = self_hlc.timestamp;
other
} else {
set_time_offset(
Duration::from_secs(other_offset_sec.unsigned_abs()),
other_offset_sec > 0,
);
HybridLogicalClock::new()
};
let other_ts_copy = other_hlc.timestamp;
other_hlc.counter = 3;
set_time_offset(Duration::from_secs(0), true);
match self_hlc.update(&other_hlc, DEFAULT_MAX_CLOCK_DRIFT) {
Ok(()) => {
assert!(should_succeed);
assert!(valid_counters.contains(&self_hlc.counter));
match self_hlc.counter {
0 => {
assert_ne!(self_hlc.timestamp, self_ts_copy);
assert_ne!(self_hlc.timestamp, other_ts_copy);
}
1 => assert_eq!(self_hlc.timestamp, self_ts_copy),
4 => assert_eq!(self_hlc.timestamp, other_ts_copy),
_ => panic!("Unexpected counter value"),
}
}
Err(e) => {
assert!(!should_succeed);
matches!(e.kind(), HLCErrorKind::ClockDrift);
assert_eq!(self_hlc.counter, 0);
assert_eq!(self_hlc.timestamp, self_ts_copy);
}
}
}
#[test_case(30, 15, false, true, true; "other max, but past of self - success")]
#[test_case(15, 30, false, true, false; "other max, future of self - failure")]
#[test_case(15, 30, true, false, true; "self max, but past of other - success")]
#[test_case(30, 15, true, false, false; "self max, future of other - failure")]
#[test_case(-30, -30, true, true, true; "both max, but in past - success")]
#[test_case(30, 30, false, true, false; "other max, same time as self - failure")]
#[test_case(30, 30, true, false, false; "self max, same time as other - failure")]
fn test_update_other_counter(
self_offset_sec: i64,
other_offset_sec: i64,
max_self: bool,
max_other: bool,
should_succeed: bool,
) {
set_time_offset(
Duration::from_secs(self_offset_sec.unsigned_abs()),
self_offset_sec > 0,
);
let mut self_hlc = HybridLogicalClock::new();
if max_self {
self_hlc.counter = u64::MAX;
}
let self_ts_copy = self_hlc.timestamp;
let self_counter_copy = self_hlc.counter;
let mut other_hlc = if self_offset_sec == other_offset_sec {
let mut other = HybridLogicalClock::new();
other.timestamp = self_hlc.timestamp;
other
} else {
set_time_offset(
Duration::from_secs(other_offset_sec.unsigned_abs()),
other_offset_sec > 0,
);
HybridLogicalClock::new()
};
if max_other {
other_hlc.counter = u64::MAX;
}
set_time_offset(Duration::from_secs(0), true);
match self_hlc.update(&other_hlc, DEFAULT_MAX_CLOCK_DRIFT) {
Ok(()) => {
assert!(should_succeed);
}
Err(e) => {
assert!(!should_succeed);
matches!(e.kind(), HLCErrorKind::OverflowWarning);
assert_eq!(self_hlc.counter, self_counter_copy);
assert_eq!(self_hlc.timestamp, self_ts_copy);
}
}
}
#[test]
fn test_new_defaults() {
let hlc = HybridLogicalClock::new();
assert_eq!(hlc.counter, 0);
assert_eq!(
hlc.timestamp.duration_since(UNIX_EPOCH).unwrap().as_nanos() % 1_000_000,
0
);
}
#[test]
fn test_display() {
let hlc = HybridLogicalClock {
timestamp: UNIX_EPOCH,
counter: 0,
node_id: Uuid::nil().to_string(),
};
assert_eq!(
hlc.to_string(),
"000000000000000:00000:00000000-0000-0000-0000-000000000000"
);
}
#[test]
fn test_to_from_str() {
let hlc = HybridLogicalClock::new();
let hlc_str = hlc.to_string();
let parsed_hlc = hlc_str.parse::<HybridLogicalClock>().unwrap();
assert_eq!(parsed_hlc, hlc);
}
}