Skip to content

Commit

Permalink
metrics & log properties accessible in SQL (#263)
Browse files Browse the repository at this point in the history
* move Property to telemetry crate

* upgrade datafusion

* update dependencies

* make strings in properties ref-counted

* add properties to log views

* make property_get case insensitive

* properties in metrics views
  • Loading branch information
madesroches-ubi authored Nov 28, 2024
1 parent 92d1649 commit b2f4c35
Show file tree
Hide file tree
Showing 27 changed files with 763 additions and 253 deletions.
767 changes: 554 additions & 213 deletions rust/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ ciborium = "0.2.2"
clap = { version = "4", features = ["derive"] }
colored = { version = "2" }
ctrlc = "3.2.0"
datafusion = "42.1"
datafusion = "43"
futures = "0.3"
http= "1.1"
lazy_static = "1.4"
Expand Down
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/blocks_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
const VIEW_SET_NAME: &str = "blocks";
const VIEW_INSTANCE_ID: &str = "global";

/// Factory for views of the `blocks` view set (see `VIEW_SET_NAME` above);
/// implements `ViewMaker` below. `Debug` is derived because the `ViewMaker`
/// trait now requires it (this commit adds `Debug` as a supertrait).
#[derive(Debug)]
pub struct BlocksViewMaker {}

impl ViewMaker for BlocksViewMaker {
Expand Down
3 changes: 2 additions & 1 deletion rust/analytics/src/lakehouse/log_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use uuid::Uuid;

const VIEW_SET_NAME: &str = "log_entries";

/// Factory for views of the `log_entries` view set (see `VIEW_SET_NAME` above);
/// implements `ViewMaker` below. `Debug` is derived because the `ViewMaker`
/// trait now requires it (this commit adds `Debug` as a supertrait).
#[derive(Debug)]
pub struct LogViewMaker {}

impl ViewMaker for LogViewMaker {
Expand Down Expand Up @@ -97,7 +98,7 @@ impl View for LogView {
}

fn get_file_schema_hash(&self) -> Vec<u8> {
vec![0]
vec![1]
}

fn get_file_schema(&self) -> Arc<Schema> {
Expand Down
3 changes: 2 additions & 1 deletion rust/analytics/src/lakehouse/metrics_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use uuid::Uuid;

const VIEW_SET_NAME: &str = "measures";

/// Factory for views of the `measures` view set (see `VIEW_SET_NAME` above);
/// implements `ViewMaker` below. `Debug` is derived because the `ViewMaker`
/// trait now requires it (this commit adds `Debug` as a supertrait).
#[derive(Debug)]
pub struct MetricsViewMaker {}

impl ViewMaker for MetricsViewMaker {
Expand Down Expand Up @@ -98,7 +99,7 @@ impl View for MetricsView {
}

fn get_file_schema_hash(&self) -> Vec<u8> {
vec![0]
vec![1]
}

fn get_file_schema(&self) -> Arc<Schema> {
Expand Down
12 changes: 5 additions & 7 deletions rust/analytics/src/lakehouse/partition_source_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use datafusion::arrow::array::{
Array, ArrayRef, AsArray, BinaryArray, GenericListArray, Int32Array, Int64Array, RecordBatch,
StringArray, StructArray, TimestampNanosecondArray,
};
use micromegas_ingestion::{
data_lake_connection::DataLakeConnection,
sql_property::{self, Property},
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_telemetry::property::Property;
use micromegas_telemetry::{stream_info::StreamInfo, types::block::BlockMetadata};
use micromegas_tracing::prelude::*;
use std::sync::Arc;
Expand Down Expand Up @@ -61,7 +59,7 @@ fn read_property_list(value: ArrayRef) -> Result<Vec<Property>> {
for i in 0..properties.len() {
let key = properties.column(key_index).as_string::<i32>().value(i);
let value = properties.column(value_index).as_string::<i32>().value(i);
properties_vec.push(Property::new(key.into(), value.into()));
properties_vec.push(Property::new(Arc::new(key.into()), Arc::new(value.into())));
}
Ok(properties_vec)
}
Expand Down Expand Up @@ -174,7 +172,7 @@ pub async fn fetch_partition_source_data(
objects_metadata: ciborium::from_reader(objects_metadata)
.with_context(|| "decoding objects_metadata")?,
tags: stream_tags,
properties: sql_property::into_hashmap(stream_properties),
properties: micromegas_telemetry::property::into_hashmap(stream_properties),
};
let process_properties = read_property_list(process_properties_column.value(ir))?;
let parent_value = process_parent_column.value(ir);
Expand All @@ -195,7 +193,7 @@ pub async fn fetch_partition_source_data(
start_time: DateTime::from_timestamp_nanos(process_start_time_column.value(ir)),
start_ticks: process_start_ticks_column.value(ir),
parent_process_id,
properties: sql_property::into_hashmap(process_properties),
properties: micromegas_telemetry::property::into_hashmap(process_properties),
};
block_ids_hash += block.nb_objects as i64;
partition_src_blocks.push(Arc::new(PartitionSourceBlock {
Expand Down
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/processes_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
const VIEW_SET_NAME: &str = "processes";
const VIEW_INSTANCE_ID: &str = "global";

/// Factory for views of the `processes` view set (see `VIEW_SET_NAME` above);
/// implements `ViewMaker` below. `Debug` is derived because the `ViewMaker`
/// trait now requires it (this commit adds `Debug` as a supertrait).
#[derive(Debug)]
pub struct ProcessesViewMaker {}

impl ViewMaker for ProcessesViewMaker {
Expand Down
2 changes: 1 addition & 1 deletion rust/analytics/src/lakehouse/property_get_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn find_property_in_list(properties: ArrayRef, name: &str) -> anyhow::Result<Opt
.with_context(|| "getting value field")?;
for i in 0..properties.len() {
let key = properties.column(key_index).as_string::<i32>().value(i);
if key == name {
if key.eq_ignore_ascii_case(name) {
let value = properties.column(value_index).as_string::<i32>().value(i);
return Ok(Some(value.into()));
}
Expand Down
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/streams_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
const VIEW_SET_NAME: &str = "streams";
const VIEW_INSTANCE_ID: &str = "global";

/// Factory for views of the `streams` view set (see `VIEW_SET_NAME` above);
/// implements `ViewMaker` below. `Debug` is derived because the `ViewMaker`
/// trait now requires it (this commit adds `Debug` as a supertrait).
#[derive(Debug)]
pub struct StreamsViewMaker {}

impl ViewMaker for StreamsViewMaker {
Expand Down
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use object_store::ObjectStore;
use std::{any::Any, sync::Arc};

#[derive(Debug)]
pub struct MaterializedView {
lake: Arc<DataLakeConnection>,
object_store: Arc<dyn ObjectStore>,
Expand Down
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/table_scan_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use datafusion::{
};
use std::sync::Arc;

/// Plan-rewrite helper carrying the query's time range.
/// NOTE(review): presumably used to push `query_range` down into TableScan
/// nodes of a DataFusion logical plan — confirm against the rewriter impl,
/// which is not visible in this hunk. `Debug` is derived to satisfy trait
/// bounds introduced by the datafusion 43 upgrade in this commit.
#[derive(Debug)]
pub struct TableScanRewrite {
// time interval the query is restricted to (type from this crate's time module)
query_range: TimeRange,
}
Expand Down
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/thread_spans_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use uuid::Uuid;

const VIEW_SET_NAME: &str = "thread_spans";

/// Factory for views of the `thread_spans` view set (see `VIEW_SET_NAME` above);
/// implements `ViewMaker` below. `Debug` is derived because the `ViewMaker`
/// trait now requires it (this commit adds `Debug` as a supertrait).
#[derive(Debug)]
pub struct ThreadSpansViewMaker {}

impl ViewMaker for ThreadSpansViewMaker {
Expand Down
4 changes: 3 additions & 1 deletion rust/analytics/src/lakehouse/view_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,14 @@ use super::{
thread_spans_view::ThreadSpansViewMaker, view::View,
};
use anyhow::Result;
use std::fmt::Debug;
use std::{collections::HashMap, sync::Arc};

pub trait ViewMaker: Send + Sync {
pub trait ViewMaker: Send + Sync + Debug {
fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>>;
}

#[derive(Debug)]
pub struct ViewFactory {
view_sets: HashMap<String, Arc<dyn ViewMaker>>,
global_views: Vec<Arc<dyn View>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;
/// df_spans = client.query(sql, begin_spans, end_spans)
/// ```
///
#[derive(Debug)]
pub struct ViewInstanceTableFunction {
lake: Arc<DataLakeConnection>,
object_store: Arc<dyn ObjectStore>,
Expand Down
2 changes: 2 additions & 0 deletions rust/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub mod metadata;
pub mod metrics_table;
/// Access to the raw binary telemetry payload
pub mod payload;
/// Reference-counted set of properties in transit format
pub mod property_set;
/// Streams response for long requests
pub mod response_writer;
/// Location in instrumented source code
Expand Down
46 changes: 46 additions & 0 deletions rust/analytics/src/log_entries_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use anyhow::{Context, Result};
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::array::ArrayBuilder;
use datafusion::arrow::array::ListBuilder;
use datafusion::arrow::array::PrimitiveBuilder;
use datafusion::arrow::array::StringBuilder;
use datafusion::arrow::array::StringDictionaryBuilder;
use datafusion::arrow::array::StructBuilder;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Field;
use datafusion::arrow::datatypes::Fields;
use datafusion::arrow::datatypes::Int16Type;
use datafusion::arrow::datatypes::Int32Type;
use datafusion::arrow::datatypes::Schema;
Expand Down Expand Up @@ -52,6 +55,18 @@ pub fn log_table_schema() -> Schema {
),
Field::new("level", DataType::Int32, false),
Field::new("msg", DataType::Utf8, false),
Field::new(
"properties",
DataType::List(Arc::new(Field::new(
"Property",
DataType::Struct(Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, false),
])),
false,
))),
false,
),
])
}

Expand All @@ -64,10 +79,24 @@ pub struct LogEntriesRecordBuilder {
pub targets: StringDictionaryBuilder<Int16Type>,
pub levels: PrimitiveBuilder<Int32Type>,
pub msgs: StringBuilder,
pub properties: ListBuilder<StructBuilder>,
}

impl LogEntriesRecordBuilder {
pub fn with_capacity(capacity: usize) -> Self {
let prop_struct_fields = vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, false),
];
let prop_field = Arc::new(Field::new(
"Property",
DataType::Struct(Fields::from(prop_struct_fields.clone())),
false,
));
let props_builder =
ListBuilder::new(StructBuilder::from_fields(prop_struct_fields, capacity))
.with_field(prop_field);

Self {
process_ids: StringDictionaryBuilder::new(),
exes: StringDictionaryBuilder::new(),
Expand All @@ -77,6 +106,7 @@ impl LogEntriesRecordBuilder {
targets: StringDictionaryBuilder::new(),
levels: PrimitiveBuilder::with_capacity(capacity),
msgs: StringBuilder::new(),
properties: props_builder,
}
}

Expand Down Expand Up @@ -110,6 +140,21 @@ impl LogEntriesRecordBuilder {
self.targets.append_value(&*row.target);
self.levels.append_value(row.level);
self.msgs.append_value(&*row.msg);

let property_builder = self.properties.values();
row.properties.for_each_property(|prop| {
let key_builder = property_builder
.field_builder::<StringBuilder>(0)
.with_context(|| "getting key field builder")?;
key_builder.append_value(prop.key_str());
let value_builder = property_builder
.field_builder::<StringBuilder>(1)
.with_context(|| "getting value field builder")?;
value_builder.append_value(prop.value_str());
property_builder.append(true);
Ok(())
})?;
self.properties.append(true);
Ok(())
}

Expand All @@ -125,6 +170,7 @@ impl LogEntriesRecordBuilder {
Arc::new(self.targets.finish()),
Arc::new(self.levels.finish()),
Arc::new(self.msgs.finish()),
Arc::new(self.properties.finish()),
],
)
.with_context(|| "building record batch")
Expand Down
10 changes: 9 additions & 1 deletion rust/analytics/src/log_entry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
payload::{fetch_block_payload, parse_block},
property_set::PropertySet,
time::ConvertTicks,
};
use anyhow::{Context, Result};
Expand All @@ -10,12 +11,14 @@ use micromegas_tracing::prelude::*;
use micromegas_transit::value::{Object, Value};
use std::sync::Arc;

/// A single log record decoded from a telemetry block payload
/// (built by `log_entry_from_value` below).
#[derive(Debug)]
pub struct LogEntry {
// process that emitted the entry
pub process: Arc<ProcessInfo>,
// timestamp in nanoseconds — produced via ConvertTicks::ticks_to_nanoseconds
pub time: i64,
// verbosity level, stored as i32 (cast from the event's level field)
pub level: i32,
// log target (module/category), ref-counted to avoid copies
pub target: Arc<String>,
// formatted message text
pub msg: Arc<String>,
// structured properties attached to the entry; empty for untagged
// event types (see PropertySet::empty() uses below)
pub properties: PropertySet,
}

#[span_fn]
Expand Down Expand Up @@ -48,6 +51,7 @@ pub fn log_entry_from_value(
level: level as i32,
target,
msg,
properties: PropertySet::empty(),
}))
}
"LogStringEvent" | "LogStringEventV2" => {
Expand All @@ -72,6 +76,7 @@ pub fn log_entry_from_value(
level: level as i32,
target,
msg,
properties: PropertySet::empty(),
}))
}
"LogStaticStrInteropEvent" | "LogStringInteropEventV2" | "LogStringInteropEventV3" => {
Expand All @@ -93,6 +98,7 @@ pub fn log_entry_from_value(
level: level as i32,
target,
msg,
properties: PropertySet::empty(),
}))
}
"TaggedLogInteropEvent" => {
Expand All @@ -108,7 +114,7 @@ pub fn log_entry_from_value(
let msg = obj
.get::<Arc<String>>("msg")
.with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
let _properties = obj.get::<Arc<Object>>("properties").with_context(|| {
let properties = obj.get::<Arc<Object>>("properties").with_context(|| {
format!("reading properties from {}", obj.type_name.as_str())
})?;
let time = convert_ticks.ticks_to_nanoseconds(ticks);
Expand All @@ -118,6 +124,7 @@ pub fn log_entry_from_value(
level: level as i32,
target,
msg,
properties: properties.into(),
}))
}
"TaggedLogString" => {
Expand Down Expand Up @@ -157,6 +164,7 @@ pub fn log_entry_from_value(
level: level as i32,
target,
msg,
properties: properties.into(),
}))
}

Expand Down
Loading

0 comments on commit b2f4c35

Please sign in to comment.