Skip to content

Commit

Permalink
Merge pull request #47 from ckcr4lyf/tracing/v2
Browse files Browse the repository at this point in the history
Tracing v2
  • Loading branch information
ckcr4lyf authored Jun 12, 2024
2 parents 6fb4e22 + b355fda commit 1bf2c7e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 16 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ $ docker run -e KIRYUU_HOST=http://172.17.0.1:6969 -e REDIS_HOST=redis://172.17.

(Make sure you've kiryuu running locally and redis as well!)

### Dummy cURL

Or you can just send an example cURL

```
curl "localhost:6969/announce?info_hash=AAAAAAAAAAAAAAAAAAAA&port=1337&left=0"
```

## Tracing

To build with tracing, enable the tracing feature:
Expand All @@ -60,5 +68,5 @@ $ RUSTFLAGS="-C target-cpu=native" cargo build --release --features tracing
For local testing, you can run jaeger via:

```sh
docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest
docker run -d -p127.0.0.1:6831:6831/udp -p127.0.0.1:6832:6832/udp -p127.0.0.1:16686:16686 -p127.0.0.1:14268:14268 jaegertracing/all-in-one:latest
```
26 changes: 15 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
Ok(legit) => legit, // Just set `parsed` , let handler continue
Err(e) => match e {
query::QueryError::ParseFailure => {
trace_log!("failed to parse announce");
return HttpResponse::build(StatusCode::BAD_REQUEST).body("Failed to parse announce\n");
},
query::QueryError::InvalidInfohash => {
trace_log!("invalid infohash");
return HttpResponse::build(StatusCode::BAD_REQUEST).body("Infohash is not 20 bytes\n");
}
}
Expand All @@ -113,7 +115,7 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
.cmd("ZSCORE").arg(&leechers_key).arg(&parsed.ip_port)
.cmd("GET").arg(&cache_key);

let (is_seeder_v2, is_leecher_v2, cached_reply) : (Exists, Exists, Vec<u8>) = trace_wrap_v2!(pp.query_async(&mut rc).await, "redis").unwrap();
let (is_seeder_v2, is_leecher_v2, cached_reply) : (Exists, Exists, Vec<u8>) = trace_wrap_v2!(pp.query_async(&mut rc).await, "redis", "seeder_leecher_cache").unwrap();

let mut post_announce_pipeline = redis::pipe();
post_announce_pipeline.cmd("ZADD").arg(constants::TORRENTS_KEY).arg(time_now_ms).arg(&parsed.info_hash).ignore(); // To "update" the torrent
Expand Down Expand Up @@ -167,11 +169,12 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
let final_res = match cached_reply.len() {
0 => {
// Cache miss. Lookup from redis
trace_log!("cache miss");
let mut p = redis::pipe();
let pp = p.cmd("ZRANGEBYSCORE").arg(&seeders_key).arg(max_limit).arg(time_now_ms).arg("LIMIT").arg(0).arg(50)
.cmd("ZRANGEBYSCORE").arg(&leechers_key).arg(max_limit).arg(time_now_ms).arg("LIMIT").arg(0).arg(50);

let (seeders, leechers) : (Vec<Vec<u8>>, Vec<Vec<u8>>) = trace_wrap_v2!(pp.query_async(&mut rc).await, "redis").unwrap();
let (seeders, leechers) : (Vec<Vec<u8>>, Vec<Vec<u8>>) = trace_wrap_v2!(pp.query_async(&mut rc).await, "redis", "seeders_leechers").unwrap();

// endex = end index XD. seems in rust cannot select first 50 elements, or limit to less if vector doesnt have 50
// e.g. &seeders[0..50] is panicking when seeders len is < 50. Oh well.
Expand All @@ -181,6 +184,7 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
query::announce_reply(seeders.len() as i64 + seed_count_mod, leechers.len() as i64 + leech_count_mod, &seeders[0..seeder_endex], &leechers[0..leecher_endex])
},
_ => {
trace_log!("cache hit");
post_announce_pipeline.cmd("INCR").arg(constants::CACHE_HIT_ANNOUNCE_COUNT_KEY).ignore();
cached_reply
}
Expand All @@ -203,6 +207,7 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {

// TODO: Patch cached reply with the count mods?
// Also invalidate existing cache
trace_log!("need to invalidate cache");
post_announce_pipeline.cmd("DEL").arg(&cache_key).ignore();
} else {
post_announce_pipeline.cmd("INCR").arg(constants::NOCHANGE_ANNOUNCE_COUNT_KEY).ignore();
Expand Down Expand Up @@ -241,12 +246,7 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
{
get_active_span(|span| {
let infohash = String::from_utf8_lossy(&parsed.info_hash.0).to_string();
let ip = match req.peer_addr() {
Some(val) => val.to_string(),
None => "NO_IP".to_string(),
};
span.set_attribute(Key::new("infohash").string(infohash));
span.set_attribute(Key::new("ip").string(ip));
span.add_event("finished", vec![]);
})
}
Expand All @@ -258,7 +258,7 @@ async fn announce(req: HttpRequest, data: web::Data<AppState>) -> HttpResponse {
async fn healthz(data: web::Data<AppState>) -> HttpResponse {
let mut rc = data.redis_connection.clone();

match trace_wrap_v2!(redis::cmd("PING").query_async::<_, ()>(&mut rc).await, "redis-hc") {
match trace_wrap_v2!(redis::cmd("PING").query_async::<_, ()>(&mut rc).await, "redis", "healthcheck") {
Ok(_) => HttpResponse::build(StatusCode::OK).append_header(header::ContentType::plaintext()).body("OK"),
Err(_) => HttpResponse::build(StatusCode::INTERNAL_SERVER_ERROR).append_header(header::ContentType::plaintext()).body("OOF"),
}
Expand Down Expand Up @@ -298,9 +298,9 @@ fn init_tracer(args: &Args) -> Result<sdktrace::Tracer, TraceError> {
.with_service_name("Kiryuu")
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "my-service"),
opentelemetry::KeyValue::new("service.namespace", "my-namespace"),
opentelemetry::KeyValue::new("exporter", "jaeger"),
// opentelemetry::KeyValue::new("service.name", "my-service"),
// opentelemetry::KeyValue::new("service.namespace", "my-namespace"),
// opentelemetry::KeyValue::new("exporter", "jaeger"),
]),
))
.with_auto_split_batch(true)
Expand Down Expand Up @@ -337,6 +337,10 @@ async fn main() -> std::io::Result<()> {
let tracer = global::tracer("http");
tracer.in_span(req.path().to_string(), move |cx| {
cx.span().set_attribute(Key::new("path").string(req.path().to_string()));
match req.peer_addr() {
Some(val) => cx.span().set_attribute(Key::new("ip").string(val.to_string())),
None => ()
};
cx.span().add_event("starting", vec![]);
let fut = srv.call(req).with_context(cx.clone());

Expand Down
24 changes: 20 additions & 4 deletions src/tracing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#[macro_export]
macro_rules! trace_wrap_v2 {
($expr:expr, $x:literal) => {{
($expr:expr, $loggername:literal, $commandname:literal) => {{
#[cfg(feature = "tracing")]
{
let tracer = global::tracer("IRRELEVANT");
let mut span = tracer.start($x);
span.set_attribute(Key::new("bruv").string("va"));
let tracer = global::tracer($loggername);
let mut span = tracer.start($commandname);
// span.set_attribute(Key::new("bruv").string("va"));
let x = $expr;
span.end();
x
Expand All @@ -16,3 +16,19 @@ macro_rules! trace_wrap_v2 {
}
}};
}

#[macro_export]
macro_rules! trace_log {
($logline:literal) => {{
#[cfg(feature = "tracing")]
{
get_active_span(|span| {
span.add_event($logline, vec![]);
})
}
#[cfg(not(feature = "tracing"))]
{
// noop
}
}};
}

0 comments on commit 1bf2c7e

Please sign in to comment.