Skip to content

Commit

Permalink
Upgrade version of dependencies + fix time breakages
Browse files Browse the repository at this point in the history
To keep things somewhat current, upgrade all dependencies to latest
versions.

The hardest by far is `time`, which has broken its entire API surface
area, _again_. We make some fairly substantial code changes to
accommodate that.

I also changed the store API to use `u64`s instead of `i64`s since
they're working with nanoseconds. I realize in retrospect that this
change might not be needed since Redis works with a signed integer type,
but it improves the interface slightly in that a missing value is
returned as a `None` instead of `-1`, which is more Rust-like.
  • Loading branch information
brandur committed Sep 5, 2024
1 parent 22aed16 commit ff5c856
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 92 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ repository = "https://github.com/brandur/redis-cell"
crate-type = ["dylib"]

[dependencies]
bitflags = "1.0"
libc = "0.2.0"
time = "0.1"
bitflags = "2.6"
libc = "0.2"
time = { version = "0.3", features = ["formatting"] }

[build-dependencies]
cc = "1.0.28"
98 changes: 58 additions & 40 deletions src/cell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Rate {
/// we wanted to have 10 actions every 2 seconds, the period produced would
/// be 200 ms.
pub fn per_period(n: i64, period: time::Duration) -> Rate {
let ns: i64 = period.num_nanoseconds().unwrap();
let ns: i128 = period.whole_nanoseconds();

// Don't rely on floating point math to get here.
if n == 0 || ns == 0 {
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<T: store::Store> RateLimiter<T> {
pub fn new(store: T, quota: &RateQuota) -> Self {
RateLimiter {
delay_variation_tolerance: time::Duration::nanoseconds(
quota.max_rate.period.num_nanoseconds().unwrap() * (quota.max_burst + 1),
quota.max_rate.period.whole_nanoseconds() as i64 * (quota.max_burst + 1),
),
emission_interval: quota.max_rate.period,
limit: quota.max_burst + 1,
Expand Down Expand Up @@ -111,7 +111,7 @@ impl<T: store::Store> RateLimiter<T> {
}

let increment = time::Duration::nanoseconds(
self.emission_interval.num_nanoseconds().unwrap() * quantity,
self.emission_interval.whole_nanoseconds() as i64 * quantity,
);
self.log_start(key, quantity, increment);

Expand Down Expand Up @@ -139,37 +139,44 @@ impl<T: store::Store> RateLimiter<T> {
let (tat_val, now) = self.store.get_with_time(key)?;

let tat = match tat_val {
-1 => now,
_ => from_nanoseconds(tat_val),
None => now,
Some(v) => from_nanoseconds(v),
};
log_debug!(
self.store,
"tat = {} (from store = {})",
tat.rfc3339(),
tat_val
tat.format(&time::format_description::well_known::Rfc3339)
.unwrap(),
tat_val.unwrap_or(0)
);

let new_tat = if now > tat {
now + increment
} else {
tat + increment
};
log_debug!(self.store, "new_tat = {}", new_tat.rfc3339());
log_debug!(
self.store,
"new_tat = {}",
new_tat
.format(&time::format_description::well_known::Rfc3339)
.unwrap()
);

// Block the request if the next permitted time is in the future.
let allow_at = new_tat - self.delay_variation_tolerance;
let diff = now - allow_at;
log_debug!(
self.store,
"diff = {}ms (now - allow_at)",
diff.num_milliseconds()
diff.whole_milliseconds()
);

if diff < time::Duration::zero() {
if diff < time::Duration::ZERO {
log_debug!(
self.store,
"BLOCKED retry_after = {}ms",
-diff.num_milliseconds()
-diff.whole_milliseconds()
);

if increment <= self.delay_variation_tolerance {
Expand All @@ -190,12 +197,16 @@ impl<T: store::Store> RateLimiter<T> {
//
// Both of these cases are designed to work around the fact that
// another limiter could be running in parallel.
let updated = if tat_val == -1 {
let updated = if tat_val.is_none() {
self.store
.set_if_not_exists_with_ttl(key, new_tat_ns, ttl)?
} else {
self.store
.compare_and_swap_with_ttl(key, tat_val, new_tat_ns, ttl)?
self.store.compare_and_swap_with_ttl(
key,
tat_val.unwrap(),
new_tat_ns,
ttl,
)?
};

if updated {
Expand All @@ -215,8 +226,8 @@ impl<T: store::Store> RateLimiter<T> {

let next = self.delay_variation_tolerance - ttl;
if next > -self.emission_interval {
rlc.remaining = (next.num_microseconds().unwrap() as f64
/ self.emission_interval.num_microseconds().unwrap() as f64)
rlc.remaining = (next.whole_microseconds() as f64
/ self.emission_interval.whole_microseconds() as f64)
as i64;
}
rlc.reset_after = ttl;
Expand All @@ -235,12 +246,12 @@ impl<T: store::Store> RateLimiter<T> {
log_debug!(
self.store,
"retry_after = {}ms",
rlc.retry_after.num_milliseconds()
rlc.retry_after.whole_microseconds()
);
log_debug!(
self.store,
"reset_after = {}ms (ttl)",
rlc.reset_after.num_milliseconds()
rlc.reset_after.whole_microseconds()
);
}

Expand All @@ -252,17 +263,17 @@ impl<T: store::Store> RateLimiter<T> {
log_debug!(
self.store,
"delay_variation_tolerance = {}ms",
self.delay_variation_tolerance.num_milliseconds()
self.delay_variation_tolerance.whole_microseconds()
);
log_debug!(
self.store,
"emission_interval = {}ms",
self.emission_interval.num_milliseconds()
self.emission_interval.whole_microseconds()
);
log_debug!(
self.store,
"tat_increment = {}ms (emission_interval * quantity)",
increment.num_milliseconds()
increment.whole_microseconds()
);
}
}
Expand All @@ -273,17 +284,15 @@ pub struct RateQuota {
pub max_rate: Rate,
}

fn from_nanoseconds(x: i64) -> time::Tm {
let ns = 10_i64.pow(9);
time::at(time::Timespec {
sec: x / ns,
nsec: (x % ns) as i32,
})
fn from_nanoseconds(x: u64) -> time::OffsetDateTime {
time::OffsetDateTime::UNIX_EPOCH
+ time::Duration::new((x / 1_000_000_000) as i64, (x % 1_000_000_000) as i32)
}

fn nanoseconds(x: time::Tm) -> i64 {
let ts = x.to_timespec();
ts.sec * 10_i64.pow(9) + i64::from(ts.nsec)
fn nanoseconds(x: time::OffsetDateTime) -> u64 {
let since_epoch = x - time::OffsetDateTime::UNIX_EPOCH;
since_epoch.whole_seconds() as u64 * 1_000_000_000
+ since_epoch.subsec_nanoseconds() as u64
}

#[cfg(test)]
Expand Down Expand Up @@ -362,6 +371,12 @@ mod tests {
);
}

#[test]
fn it_round_trips_nanoseconds() {
let now = time::OffsetDateTime::now_utc();
assert_eq!(now, from_nanoseconds(nanoseconds(now)))
}

// Skip rustfmt so we don't mangle our big test case array below which is
// already hard enough to read.
#[rustfmt::skip]
Expand All @@ -372,7 +387,7 @@ mod tests {
max_burst: limit - 1,
max_rate: Rate::per_second(1),
};
let start = time::now_utc();
let start = time::OffsetDateTime::now_utc();
let mut memory_store = store::MemoryStore::new_verbose();
let mut test_store = TestStore::new(&mut memory_store);
let mut limiter = RateLimiter::new(&mut test_store, &quota);
Expand All @@ -383,7 +398,7 @@ mod tests {
//

// You can never make a request larger than the maximum.
RateLimitCase::new(0, start, 6, 5, time::Duration::zero(),
RateLimitCase::new(0, start, 6, 5, time::Duration::ZERO,
time::Duration::seconds(-1), true),

// Rate limit normal requests appropriately.
Expand Down Expand Up @@ -484,7 +499,7 @@ mod tests {
#[derive(Debug, Eq, PartialEq)]
struct RateLimitCase {
num: i64,
now: time::Tm,
now: time::OffsetDateTime,
volume: i64,
remaining: i64,
reset_after: time::Duration,
Expand All @@ -495,7 +510,7 @@ mod tests {
impl RateLimitCase {
fn new(
num: i64,
now: time::Tm,
now: time::OffsetDateTime,
volume: i64,
remaining: i64,
reset_after: time::Duration,
Expand All @@ -518,15 +533,15 @@ mod tests {
/// us to tweak certain behavior, like for example setting the effective
/// system clock.
struct TestStore<'a> {
clock: time::Tm,
clock: time::OffsetDateTime,
fail_updates: bool,
store: &'a mut store::MemoryStore,
}

impl<'a> TestStore<'a> {
fn new(store: &'a mut store::MemoryStore) -> TestStore {

Check warning on line 542 in src/cell/mod.rs

View workflow job for this annotation

GitHub Actions / build (nightly)

elided lifetime has a name

Check warning on line 542 in src/cell/mod.rs

View workflow job for this annotation

GitHub Actions / build (nightly)

elided lifetime has a name
TestStore {
clock: time::empty_tm(),
clock: time::OffsetDateTime::now_utc(),
fail_updates: false,
store,
}
Expand All @@ -537,8 +552,8 @@ mod tests {
fn compare_and_swap_with_ttl(
&mut self,
key: &str,
old: i64,
new: i64,
old: u64,
new: u64,
ttl: time::Duration,
) -> Result<bool, CellError> {
if self.fail_updates {
Expand All @@ -548,7 +563,10 @@ mod tests {
}
}

fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> {
fn get_with_time(
&self,
key: &str,
) -> Result<(Option<u64>, time::OffsetDateTime), CellError> {
let tup = self.store.get_with_time(key)?;
Ok((tup.0, self.clock))
}
Expand All @@ -560,7 +578,7 @@ mod tests {
fn set_if_not_exists_with_ttl(
&mut self,
key: &str,
value: i64,
value: u64,
ttl: time::Duration,
) -> Result<bool, CellError> {
if self.fail_updates {
Expand Down
Loading

0 comments on commit ff5c856

Please sign in to comment.