Skip to content

Commit

Permalink
remove clocksource dependency (iopsystems#68)
Browse files Browse the repository at this point in the history
Replace clocksource dependency with std::time and chrono
  • Loading branch information
brayniac committed Oct 19, 2023
1 parent cd24bbb commit a26fbb8
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 29 deletions.
63 changes: 62 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ backtrace = "0.3.69"
boring = "2.1.0"
boring-sys = "2.1.0"
bytes = "1.5.0"
chrono = "0.4.31"
clap = "4.4.6"
clocksource = "0.6.0"
foreign-types-shared = "0.3.1"
futures = "0.3.28"
histogram = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions src/clients/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
if session.is_none() {
if session_requests != 0 {
let stop = Instant::now();
let lifecycle_ns = (stop - session_start).as_nanos();
let lifecycle_ns = (stop - session_start).as_nanos() as u64;
let _ = SESSION_LIFECYCLE_REQUESTS.increment(lifecycle_ns);
}
CONNECT.increment();
Expand Down Expand Up @@ -166,7 +166,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

RESPONSE_OK.increment();

let latency = stop.duration_since(start).as_nanos();
let latency = stop.duration_since(start).as_nanos() as u64;

let _ = RESPONSE_LATENCY.increment(latency);

Expand Down
2 changes: 1 addition & 1 deletion src/clients/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ async fn task(

RESPONSE_OK.increment();

let latency = stop.duration_since(start).as_nanos();
let latency = stop.duration_since(start).as_nanos() as u64;

let _ = RESPONSE_LATENCY.increment(latency);

Expand Down
4 changes: 2 additions & 2 deletions src/clients/memcache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
let remaining_time = client_config
.request_timeout()
.as_millis()
.saturating_sub(start.elapsed().as_millis().into());
.saturating_sub(start.elapsed().as_millis());
if remaining_time == 0 {
break Err(ResponseError::Timeout);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

match response {
Ok(response) => {
let latency_ns = stop.duration_since(start).as_nanos();
let latency_ns = stop.duration_since(start).as_nanos() as u64;

// check if the response is valid
if (request.validator)(response).is_err() {
Expand Down
1 change: 1 addition & 0 deletions src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::*;
use ::momento::MomentoError;
use async_channel::Receiver;
use std::io::{Error, ErrorKind, Result};
use std::time::Instant;
use tokio::io::*;
use tokio::runtime::Runtime;
use tokio::time::{timeout, Duration};
Expand Down
2 changes: 1 addition & 1 deletion src/clients/momento/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async fn task(
Ok(_) => {
RESPONSE_OK.increment();

let latency = stop.duration_since(start).as_nanos();
let latency = stop.duration_since(start).as_nanos() as u64;

let _ = RESPONSE_LATENCY.increment(latency);
}
Expand Down
4 changes: 2 additions & 2 deletions src/clients/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
}
Err(e) => match e.kind() {
ErrorKind::WouldBlock => {
let elapsed = start.elapsed().as_nanos();
let elapsed = start.elapsed().as_nanos() as u64;
remaining_time = remaining_time.saturating_sub(elapsed);
if remaining_time == 0 {
break Err(ResponseError::Timeout);
Expand Down Expand Up @@ -165,7 +165,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

RESPONSE_OK.increment();

let latency = stop.duration_since(start).as_nanos();
let latency = stop.duration_since(start).as_nanos() as u64;

let _ = RESPONSE_LATENCY.increment(latency);
}
Expand Down
3 changes: 1 addition & 2 deletions src/clients/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::*;
use crate::net::Connector;
use crate::Instant;
use ::redis::aio::Connection;
use ::redis::{AsyncCommands, RedisConnectionInfo};
use std::borrow::Borrow;
Expand Down Expand Up @@ -160,7 +159,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

let stop = Instant::now();

let latency_ns = stop.duration_since(start).as_nanos();
let latency_ns = stop.duration_since(start).as_nanos() as u64;

match result {
Ok(_) => {
Expand Down
6 changes: 1 addition & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use async_channel::{bounded, Sender};
use backtrace::Backtrace;
use clap::{Arg, Command};
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;
use metriken::{AtomicHistogram, Counter, Gauge};
use once_cell::sync::Lazy;
use ringlog::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Builder;
use tokio::sync::RwLock;
use tokio::time::sleep;
Expand All @@ -27,9 +27,6 @@ mod workload;
use config::*;
use metrics::*;

type Instant = clocksource::Instant<clocksource::Nanoseconds<u64>>;
type UnixInstant = clocksource::UnixInstant<clocksource::Nanoseconds<u64>>;

static RUNNING: AtomicBool = AtomicBool::new(true);

static METRICS_SNAPSHOT: Lazy<Arc<RwLock<MetricsSnapshot>>> =
Expand Down Expand Up @@ -112,7 +109,6 @@ fn main() {
// spawn logging thread
control_runtime.spawn(async move {
while RUNNING.load(Ordering::Relaxed) {
clocksource::refresh_clock();
sleep(Duration::from_millis(1)).await;
let _ = log.flush();
}
Expand Down
8 changes: 4 additions & 4 deletions src/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::io::{BufWriter, Write};
#[macro_export]
macro_rules! output {
() => {
let now = clocksource::DateTime::now();
println!("{}", now.to_rfc3339_opts(clocksource::SecondsFormat::Millis, false));
let now = chrono::Utc::now();
println!("{}", now.to_rfc3339_opts(chrono::SecondsFormat::Millis, false));
};
($($arg:tt)*) => {{
let now = clocksource::DateTime::now();
println!("{} {}", now.to_rfc3339_opts(clocksource::SecondsFormat::Millis, false), format_args!($($arg)*));
let now = chrono::Utc::now();
println!("{} {}", now.to_rfc3339_opts(chrono::SecondsFormat::Millis, false), format_args!($($arg)*));
}};
}

Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ async fn publisher_task(

match result {
Ok(_) => {
let latency = stop.duration_since(start).as_nanos();
let latency = stop.duration_since(start).as_nanos() as u64;
PUBSUB_PUBLISH_OK.increment();
let _ = PUBSUB_PUBLISH_LATENCY.increment(latency);
}
Expand Down
16 changes: 10 additions & 6 deletions src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::*;
use ahash::RandomState;
use async_channel::Receiver;
use std::io::{Error, ErrorKind, Result};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use tokio::runtime::Runtime;

mod kafka;
Expand Down Expand Up @@ -39,7 +40,10 @@ impl MessageValidator {

/// Sets the checksum and timestamp in the message. Returns the timestamp.
pub fn stamp(&self, message: &mut [u8]) -> u64 {
let timestamp = (UnixInstant::now() - UnixInstant::from_nanos(0)).as_nanos();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let ts = timestamp.to_be_bytes();

// write the current unix time into the message
Expand All @@ -56,7 +60,7 @@ impl MessageValidator {

/// Validate the message checksum and returns a validation result.
pub fn validate(&self, v: &mut Vec<u8>) -> std::result::Result<u64, ValidationError> {
let now_unix = UnixInstant::now();
let now_unix = SystemTime::now();

// check if the magic bytes match
if v[0..8] != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] {
Expand All @@ -77,14 +81,14 @@ impl MessageValidator {
}

// calculate and return the end to end latency
let ts = u64::from_be_bytes([v[16], v[17], v[18], v[19], v[20], v[21], v[22], v[23]]);
let latency = now_unix - UnixInstant::from_nanos(ts);
let ts = u64::from_be_bytes(v[16..24].try_into().unwrap());
let latency = now_unix.duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64 - ts;

let _ = PUBSUB_LATENCY.increment(latency.as_nanos());
let _ = PUBSUB_LATENCY.increment(latency);
PUBSUB_RECEIVE.increment();
PUBSUB_RECEIVE_OK.increment();

Ok(latency.as_nanos())
Ok(latency)
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/pubsub/momento.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ::momento::preview::topics::{SubscriptionItem, TopicClient, ValueKind};
use ::momento::CredentialProviderBuilder;
use futures::stream::StreamExt;
use std::sync::Arc;
use std::time::Instant;
use tokio::time::timeout;

/// Launch tasks with one channel per task as gRPC is mux-enabled.
Expand Down Expand Up @@ -184,6 +185,7 @@ async fn publisher_task(
key: _,
} => {
validator.stamp(&mut message);

PUBSUB_PUBLISH.increment();

match timeout(
Expand All @@ -206,7 +208,7 @@ async fn publisher_task(

match result {
Ok(_) => {
let latency = stop.duration_since(start).as_nanos();
let latency = stop.duration_since(start).as_nanos() as u64;

PUBSUB_PUBLISH_OK.increment();
let _ = PUBSUB_PUBLISH_LATENCY.increment(latency);
Expand Down

0 comments on commit a26fbb8

Please sign in to comment.