diff --git a/Cargo.toml b/Cargo.toml index 74a437f..ee1070d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,9 @@ repository = "https://github.com/brandur/redis-cell" crate-type = ["dylib"] [dependencies] -bitflags = "1.0" -libc = "0.2.0" -time = "0.1" +bitflags = "2.6" +libc = "0.2" +time = { version = "0.3", features = ["formatting"] } [build-dependencies] cc = "1.0.28" diff --git a/src/cell/mod.rs b/src/cell/mod.rs index 41f84cc..96dc95f 100644 --- a/src/cell/mod.rs +++ b/src/cell/mod.rs @@ -30,7 +30,7 @@ impl Rate { /// we wanted to have 10 actions every 2 seconds, the period produced would /// be 200 ms. pub fn per_period(n: i64, period: time::Duration) -> Rate { - let ns: i64 = period.num_nanoseconds().unwrap(); + let ns: i128 = period.whole_nanoseconds(); // Don't rely on floating point math to get here. if n == 0 || ns == 0 { @@ -76,7 +76,7 @@ impl RateLimiter { pub fn new(store: T, quota: &RateQuota) -> Self { RateLimiter { delay_variation_tolerance: time::Duration::nanoseconds( - quota.max_rate.period.num_nanoseconds().unwrap() * (quota.max_burst + 1), + quota.max_rate.period.whole_nanoseconds() as i64 * (quota.max_burst + 1), ), emission_interval: quota.max_rate.period, limit: quota.max_burst + 1, @@ -111,7 +111,7 @@ impl RateLimiter { } let increment = time::Duration::nanoseconds( - self.emission_interval.num_nanoseconds().unwrap() * quantity, + self.emission_interval.whole_nanoseconds() as i64 * quantity, ); self.log_start(key, quantity, increment); @@ -139,14 +139,15 @@ impl RateLimiter { let (tat_val, now) = self.store.get_with_time(key)?; let tat = match tat_val { - -1 => now, - _ => from_nanoseconds(tat_val), + None => now, + Some(v) => from_nanoseconds(v), }; log_debug!( self.store, "tat = {} (from store = {})", - tat.rfc3339(), - tat_val + tat.format(&time::format_description::well_known::Rfc3339) + .unwrap(), + tat_val.unwrap_or(0) ); let new_tat = if now > tat { @@ -154,7 +155,13 @@ impl RateLimiter { } else { tat + increment }; - log_debug!(self.store, "new_tat = {}", new_tat.rfc3339()); + log_debug!( + self.store, + "new_tat = {}", + new_tat + .format(&time::format_description::well_known::Rfc3339) + .unwrap() + ); // Block the request if the next permitted time is in the future. let allow_at = new_tat - self.delay_variation_tolerance; @@ -162,14 +169,14 @@ impl RateLimiter { log_debug!( self.store, "diff = {}ms (now - allow_at)", - diff.num_milliseconds() + diff.whole_milliseconds() ); - if diff < time::Duration::zero() { + if diff < time::Duration::ZERO { log_debug!( self.store, "BLOCKED retry_after = {}ms", - -diff.num_milliseconds() + -diff.whole_milliseconds() ); if increment <= self.delay_variation_tolerance { @@ -190,12 +197,16 @@ impl RateLimiter { // // Both of these cases are designed to work around the fact that // another limiter could be running in parallel. - let updated = if tat_val == -1 { + let updated = if tat_val.is_none() { self.store .set_if_not_exists_with_ttl(key, new_tat_ns, ttl)? } else { - self.store - .compare_and_swap_with_ttl(key, tat_val, new_tat_ns, ttl)? + self.store.compare_and_swap_with_ttl( + key, + tat_val.unwrap(), + new_tat_ns, + ttl, + )? }; if updated { @@ -215,8 +226,8 @@ impl RateLimiter { let next = self.delay_variation_tolerance - ttl; if next > -self.emission_interval { - rlc.remaining = (next.num_microseconds().unwrap() as f64 - / self.emission_interval.num_microseconds().unwrap() as f64) + rlc.remaining = (next.whole_microseconds() as f64 + / self.emission_interval.whole_microseconds() as f64) as i64; } rlc.reset_after = ttl; @@ -235,12 +246,12 @@ impl RateLimiter { log_debug!( self.store, "retry_after = {}ms", - rlc.retry_after.num_milliseconds() + rlc.retry_after.whole_microseconds() ); log_debug!( self.store, "reset_after = {}ms (ttl)", - rlc.reset_after.num_milliseconds() + rlc.reset_after.whole_microseconds() ); } @@ -252,17 +263,17 @@ impl RateLimiter { log_debug!( self.store, "delay_variation_tolerance = {}ms", - self.delay_variation_tolerance.num_milliseconds() + self.delay_variation_tolerance.whole_microseconds() ); log_debug!( self.store, "emission_interval = {}ms", - self.emission_interval.num_milliseconds() + self.emission_interval.whole_microseconds() ); log_debug!( self.store, "tat_increment = {}ms (emission_interval * quantity)", - increment.num_milliseconds() + increment.whole_microseconds() ); } } @@ -273,17 +284,15 @@ pub struct RateQuota { pub max_rate: Rate, } -fn from_nanoseconds(x: i64) -> time::Tm { - let ns = 10_i64.pow(9); - time::at(time::Timespec { - sec: x / ns, - nsec: (x % ns) as i32, - }) +fn from_nanoseconds(x: u64) -> time::OffsetDateTime { + time::OffsetDateTime::UNIX_EPOCH + + time::Duration::new((x / 1_000_000_000) as i64, (x % 1_000_000_000) as i32) } -fn nanoseconds(x: time::Tm) -> i64 { - let ts = x.to_timespec(); - ts.sec * 10_i64.pow(9) + i64::from(ts.nsec) +fn nanoseconds(x: time::OffsetDateTime) -> u64 { + let since_epoch = x - time::OffsetDateTime::UNIX_EPOCH; + since_epoch.whole_seconds() as u64 * 1_000_000_000 + + since_epoch.subsec_nanoseconds() as u64 } #[cfg(test)] @@ -362,6 +371,12 @@ mod tests { ); } + #[test] + fn it_round_trips_nanoseconds() { + let now = time::OffsetDateTime::now_utc(); + assert_eq!(now, from_nanoseconds(nanoseconds(now))) + } + // Skip rustfmt so we don't mangle our big test case array below which is // already hard enough to read. #[rustfmt::skip] @@ -372,7 +387,7 @@ mod tests { max_burst: limit - 1, max_rate: Rate::per_second(1), }; - let start = time::now_utc(); + let start = time::OffsetDateTime::now_utc(); let mut memory_store = store::MemoryStore::new_verbose(); let mut test_store = TestStore::new(&mut memory_store); let mut limiter = RateLimiter::new(&mut test_store, "a); @@ -383,7 +398,7 @@ mod tests { // // You can never make a request larger than the maximum. - RateLimitCase::new(0, start, 6, 5, time::Duration::zero(), + RateLimitCase::new(0, start, 6, 5, time::Duration::ZERO, time::Duration::seconds(-1), true), // Rate limit normal requests appropriately. @@ -484,7 +499,7 @@ mod tests { #[derive(Debug, Eq, PartialEq)] struct RateLimitCase { num: i64, - now: time::Tm, + now: time::OffsetDateTime, volume: i64, remaining: i64, reset_after: time::Duration, @@ -495,7 +510,7 @@ mod tests { impl RateLimitCase { fn new( num: i64, - now: time::Tm, + now: time::OffsetDateTime, volume: i64, remaining: i64, reset_after: time::Duration, @@ -518,7 +533,7 @@ mod tests { /// us to tweak certain behavior, like for example setting the effective /// system clock. struct TestStore<'a> { - clock: time::Tm, + clock: time::OffsetDateTime, fail_updates: bool, store: &'a mut store::MemoryStore, } @@ -526,7 +541,7 @@ mod tests { impl<'a> TestStore<'a> { fn new(store: &'a mut store::MemoryStore) -> TestStore { TestStore { - clock: time::empty_tm(), + clock: time::OffsetDateTime::now_utc(), fail_updates: false, store, } @@ -537,8 +552,8 @@ mod tests { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: u64, + new: u64, ttl: time::Duration, ) -> Result { if self.fail_updates { @@ -548,7 +563,10 @@ mod tests { } } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(Option, time::OffsetDateTime), CellError> { let tup = self.store.get_with_time(key)?; Ok((tup.0, self.clock)) } @@ -560,7 +578,7 @@ mod tests { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: u64, ttl: time::Duration, ) -> Result { if self.fail_updates { diff --git a/src/cell/store.rs b/src/cell/store.rs index 8d521ea..5f94490 100644 --- a/src/cell/store.rs +++ b/src/cell/store.rs @@ -19,8 +19,8 @@ pub trait Store { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: u64, + new: u64, ttl: time::Duration, ) -> Result; @@ -28,7 +28,10 @@ pub trait Store { /// store (this is done so that rate limiters running on a variety of /// different nodes can operate with a consistent clock instead of using /// their own). If the key was unset, -1 is returned. - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError>; + fn get_with_time( + &self, + key: &str, + ) -> Result<(Option, time::OffsetDateTime), CellError>; /// Logs a debug message to the data store. fn log_debug(&self, message: &str); @@ -38,7 +41,7 @@ pub trait Store { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: u64, ttl: time::Duration, ) -> Result; } @@ -50,14 +53,17 @@ impl<'a, T: Store> Store for &'a mut T { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: u64, + new: u64, ttl: time::Duration, ) -> Result { (**self).compare_and_swap_with_ttl(key, old, new, ttl) } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(Option, time::OffsetDateTime), CellError> { (**self).get_with_time(key) } @@ -68,7 +74,7 @@ impl<'a, T: Store> Store for &'a mut T { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: u64, ttl: time::Duration, ) -> Result { (**self).set_if_not_exists_with_ttl(key, value, ttl) @@ -82,7 +88,7 @@ impl<'a, T: Store> Store for &'a mut T { /// mutex added if it's ever used for anything serious. #[derive(Default)] pub struct MemoryStore { - map: HashMap, + map: HashMap, verbose: bool, } @@ -103,8 +109,8 @@ impl Store for MemoryStore { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: u64, + new: u64, _: time::Duration, ) -> Result { match self.map.get(key) { @@ -116,11 +122,11 @@ impl Store for MemoryStore { Ok(true) } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { - match self.map.get(key) { - Some(n) => Ok((*n, time::now_utc())), - None => Ok((-1, time::now_utc())), - } + fn get_with_time( + &self, + key: &str, + ) -> Result<(Option, time::OffsetDateTime), CellError> { + Ok((self.map.get(key).copied(), time::OffsetDateTime::now_utc())) } fn log_debug(&self, message: &str) { @@ -132,7 +138,7 @@ impl Store for MemoryStore { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: u64, _: time::Duration, ) -> Result { match self.map.get(key) { @@ -145,10 +151,11 @@ impl Store for MemoryStore { } } -/// `InternalRedisStore` is a store implementation that uses Redis module APIs -/// in that it's designed to run from within a Redis runtime. This allows us to -/// cut some corners around atomicity because we can safety assume that all -/// operations will be atomic. +/// `InternalRedisStore` is a store implementation for Redis. +/// +/// It uses Redis' modules APIs in that it's designed to run from within a Redis +/// runtime. This allows us to cut some corners around atomicity because we can +/// safety assume that all operations will be atomic. pub struct InternalRedisStore<'a> { r: &'a redis::Redis, } @@ -163,8 +170,8 @@ impl<'a> Store for InternalRedisStore<'a> { fn compare_and_swap_with_ttl( &mut self, key: &str, - old: i64, - new: i64, + old: u64, + new: u64, ttl: time::Duration, ) -> Result { let key = self.r.open_key_writable(key); @@ -174,7 +181,7 @@ impl<'a> Store for InternalRedisStore<'a> { // in the case of a very fast rate the key's already been // expired even since the beginning of this operation. // Check whether the value is empty to handle that possibility. - if !s.is_empty() && s.parse::()? == old { + if !s.is_empty() && s.parse::()? == old { // Still the old value: perform the swap. key.write(new.to_string().as_str())?; key.set_expire(ttl)?; @@ -191,17 +198,17 @@ impl<'a> Store for InternalRedisStore<'a> { } } - fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> { + fn get_with_time( + &self, + key: &str, + ) -> Result<(Option, time::OffsetDateTime), CellError> { // TODO: currently leveraging that CommandError and CellError are the // same thing, but we should probably reconcile this. let key = self.r.open_key(key); - match key.read()? { - Some(s) => { - let n = s.parse::()?; - Ok((n, time::now_utc())) - } - None => Ok((-1, time::now_utc())), - } + Ok(( + key.read()?.map(|s| s.parse::().unwrap()), + time::OffsetDateTime::now_utc(), + )) } fn log_debug(&self, message: &str) { @@ -211,7 +218,7 @@ impl<'a> Store for InternalRedisStore<'a> { fn set_if_not_exists_with_ttl( &mut self, key: &str, - value: i64, + value: u64, ttl: time::Duration, ) -> Result { let key = self.r.open_key_writable(key); @@ -237,20 +244,17 @@ mod tests { let mut store = MemoryStore::default(); // First attempt obviously works. - let res1 = - store.compare_and_swap_with_ttl("foo", 123, 124, time::Duration::zero()); + let res1 = store.compare_and_swap_with_ttl("foo", 123, 124, time::Duration::ZERO); assert_eq!(true, res1.unwrap()); // Second attempt succeeds: we use the value we just set combined with // a new value. - let res2 = - store.compare_and_swap_with_ttl("foo", 124, 125, time::Duration::zero()); + let res2 = store.compare_and_swap_with_ttl("foo", 124, 125, time::Duration::ZERO); assert_eq!(true, res2.unwrap()); // Third attempt fails: we try to overwrite using a value that is // incorrect. - let res2 = - store.compare_and_swap_with_ttl("foo", 123, 126, time::Duration::zero()); + let res2 = store.compare_and_swap_with_ttl("foo", 123, 126, time::Duration::ZERO); assert_eq!(false, res2.unwrap()); } @@ -259,25 +263,25 @@ mod tests { let mut store = MemoryStore::default(); let res1 = store.get_with_time("foo"); - assert_eq!(-1, res1.unwrap().0); + assert!(res1.unwrap().0.is_none()); // Now try setting a value. let _ = store - .set_if_not_exists_with_ttl("foo", 123, time::Duration::zero()) + .set_if_not_exists_with_ttl("foo", 123, time::Duration::ZERO) .unwrap(); let res2 = store.get_with_time("foo"); - assert_eq!(123, res2.unwrap().0); + assert_eq!(Some(123), res2.unwrap().0); } #[test] fn it_performs_set_if_not_exists_with_ttl() { let mut store = MemoryStore::default(); - let res1 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::zero()); + let res1 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::ZERO); assert_eq!(true, res1.unwrap()); - let res2 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::zero()); + let res2 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::ZERO); assert_eq!(false, res2.unwrap()); } } diff --git a/src/lib.rs b/src/lib.rs index fd5f0f1..0ffe070 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,12 +66,12 @@ impl Command for ThrottleCommand { // If either time had a partial component, but it up to the next full // second because otherwise a fast-paced caller could try again too // early. - let mut retry_after = rate_limit_result.retry_after.num_seconds(); - if rate_limit_result.retry_after.num_milliseconds() > 0 { + let mut retry_after = rate_limit_result.retry_after.as_seconds_f64() as i64; + if rate_limit_result.retry_after.subsec_milliseconds() > 0 { retry_after += 1 } - let mut reset_after = rate_limit_result.reset_after.num_seconds(); - if rate_limit_result.reset_after.num_milliseconds() > 0 { + let mut reset_after = rate_limit_result.reset_after.as_seconds_f64() as i64; + if rate_limit_result.reset_after.subsec_milliseconds() > 0 { reset_after += 1 } diff --git a/src/redis/mod.rs b/src/redis/mod.rs index ae04e74..5398a17 100644 --- a/src/redis/mod.rs +++ b/src/redis/mod.rs @@ -344,7 +344,7 @@ impl RedisKeyWritable { } pub fn set_expire(&self, expire: time::Duration) -> Result<(), CellError> { - match raw::set_expire(self.key_inner, expire.num_milliseconds()) { + match raw::set_expire(self.key_inner, expire.whole_milliseconds() as i64) { raw::Status::Ok => Ok(()), // Error may occur if the key wasn't open for writing or is an