feat(eap): Start decoupling EAP entities at the entity layer #6701
Changes from all commits: a715aa7, 6ee97f3, 706ac91, 21fb74f, 626d79f, b632070, 8626469, 45e6547, 005de34, 5eb4974, 6461c67, 4310368
First changed file: an existing EAP entity definition. The storage-level subscriptable mappers are removed:

@@ -48,20 +48,6 @@ storages:
             from_col_name: exclusive_time_ms
             to_col_name: exclusive_time_micro
 
-      subscriptables:
-        - mapper: SubscriptableHashBucketMapper
-          args:
-            from_column_table: null
-            from_column_name: attr_str
-            to_col_table: null
-            to_col_name: attr_str
-        - mapper: SubscriptableHashBucketMapper
-          args:
-            from_column_table: null
-            from_column_name: attr_num
-            to_col_table: null
-            to_col_name: attr_num
-
 storage_selector:
   selector: DefaultQueryStorageSelector
@@ -88,11 +74,16 @@ query_processors:
       curried_aggregation_names:
         - quantile
         - quantileTDigestWeighted
-  - processor: HashBucketFunctionTransformer
-    args:
-      hash_bucket_names:
-        - attr_str
-        - attr_num
+  - processor: EAPMapSharder
+    args:
+      src_bucket_name: attr_str
+      dest_bucket_name: attr_str
+      data_type: String
+  - processor: EAPMapSharder
+    args:
+      src_bucket_name: attr_num
+      dest_bucket_name: attr_num
+      data_type: Float64
 
 validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
 validators:

[Review comment on HashBucketFunctionTransformer] There was a really annoying order of operations: query processors needed to know which bucket things would end up in, but that was done at the storage level. Merging all of the processors that need the actual bucket-level information into a single one at the end makes the pipeline a lot more understandable and leaves less chance for bugs.
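The point of the merge is that only this final processor needs to know how attribute keys map onto physical bucket columns. That bucket choice is a pure function of the key; a minimal sketch of such a rule, assuming FNV-1a hashing and a 20-bucket layout (both are illustrative assumptions, not confirmed Snuba constants):

ATTRIBUTE_BUCKETS = 20  # assumed bucket count, for illustration only

def fnv_1a(data: bytes) -> int:
    # 32-bit FNV-1a: XOR each byte into the hash, then multiply by the prime.
    h = 0x811C9DC5
    for byte in data:
        h = ((h ^ byte) * 0x01000193) & 0xFFFFFFFF
    return h

def bucketed_column(src_bucket_name: str, key: str) -> str:
    # attr_str['foo'] is served by one of attr_str_0 .. attr_str_19.
    return f"{src_bucket_name}_{fnv_1a(key.encode('utf-8')) % ATTRIBUTE_BUCKETS}"

print(bucketed_column("attr_str", "foo"))  # prints one shard name, e.g. attr_str_11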
Second changed file: a new entity definition, eap_spans_rpc (new file, 140 lines).

version: v1
kind: entity
name: eap_spans_rpc

schema:
  [
    { name: service, type: String },
    { name: span_id, type: UInt, args: { size: 64 } },
    # [review thread] "We should have some sort of a" / "How about uint128?
    # String would have a big performance hit" / "Let's add that later, would
    # require another net-new processor"
    { name: parent_span_id, type: UInt, args: { size: 64 } },
    { name: segment_id, type: UInt, args: { size: 64 } },
    { name: segment_name, type: String },
    { name: is_segment, type: UInt, args: { size: 8 } },
    { name: start_timestamp, type: DateTime64, args: { precision: 6 } },
    { name: end_timestamp, type: DateTime64, args: { precision: 6 } },
    { name: duration_ms, type: Float, args: { size: 64 } },
    { name: exclusive_time_ms, type: Float, args: { size: 64 } },
    { name: name, type: String },

    # these are the required columns for an 'RPC entity' that can be used by EAP RPCs
    { name: trace_id, type: UUID },
    { name: organization_id, type: UInt, args: { size: 64 } },
    { name: project_id, type: UInt, args: { size: 64 } },
    { name: time, type: DateTime }, # used by TimeSeriesProcessor
    { name: timestamp, type: DateTime }, # mapped to _sort_timestamp

    { name: retention_days, type: UInt, args: { size: 16 } },
    { name: sampling_factor, type: Float, args: { size: 64 } },
    { name: sampling_weight, type: UInt, args: { size: 64 } },
    { name: sign, type: Int, args: { size: 8 } },

    { name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
    { name: attr_f64, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
    { name: attr_i64, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } },
  ]
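For intuition, each span's attributes are split by value type across the three virtual maps declared above. A hypothetical example (the attribute names are invented for illustration):

# Hypothetical span attributes, routed by value type into the entity's
# three virtual maps.
span_attributes = {
    "http.request_method": "GET",   # string  -> attr_str
    "http.status_code": 200,        # integer -> attr_i64
    "sentry.sampling_rate": 0.01,   # float   -> attr_f64
}

attr_str = {k: v for k, v in span_attributes.items() if isinstance(v, str)}
attr_f64 = {k: v for k, v in span_attributes.items() if isinstance(v, float)}
attr_i64 = {
    k: v
    for k, v in span_attributes.items()
    if isinstance(v, int) and not isinstance(v, bool)
}

assert attr_str == {"http.request_method": "GET"}
assert attr_i64 == {"http.status_code": 200}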
storages:
  - storage: eap_spans
    is_writable: true
    translation_mappers:
      columns:
        - mapper: ColumnToColumn
          args:
            from_table_name: null
            from_col_name: timestamp
            to_table_name: null
            to_col_name: _sort_timestamp

storage_selector:
  selector: DefaultQueryStorageSelector
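The single ColumnToColumn mapper just renames the entity's timestamp column to the storage's sort column. Reduced to its essence (an illustrative stand-in, not Snuba's actual mapper API):

def map_column(entity_column: str) -> str:
    # Queries against the entity's `timestamp` read the storage's
    # `_sort_timestamp`; every other column passes through unchanged.
    return {"timestamp": "_sort_timestamp"}.get(entity_column, entity_column)

assert map_column("timestamp") == "_sort_timestamp"
assert map_column("span_id") == "span_id"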
query_processors:
  # [review comment] I rewrote a lot of this, there were starting to be
  # conflicting edge cases where, e.g.,

  # a processor that creates a virtual 'time' column to group by for generating timeseries.
  - processor: TimeSeriesProcessor
    args:
      time_group_columns:
        time: timestamp
      time_parse_columns:
        - start_timestamp
        - end_timestamp
  # maps (e.g.) attr_str[sentry.name] to the clickhouse column 'name'
  - processor: EAPClickhouseColumnRemapper
    args:
      hash_bucket_name: attr_str
      data_type: String
      keys:
        sentry.name: name
        sentry.service: service
        sentry.parent_span_id: parent_span_id
        sentry.segment_name: segment_name
        sentry.start_timestamp: start_timestamp
        sentry.end_timestamp: end_timestamp
        sentry.timestamp: timestamp
  # maps attr_str[span_id] to hex(span_id)
  - processor: EAPClickhouseColumnRemapper
    args:
      hash_bucket_name: attr_str
      data_type: hex
      keys:
        sentry.span_id: span_id
        sentry.segment_id: segment_id
  # maps attr_f64[duration_ms] to CAST(duration_ms, Float64)
  - processor: EAPClickhouseColumnRemapper
    args:
      hash_bucket_name: attr_f64
      data_type: Float64
      keys:
        sentry.exclusive_time_ms: exclusive_time_ms
        sentry.duration_ms: duration_ms
  # maps attr_i64[project_id] to CAST(project_id, Int64)
  - processor: EAPClickhouseColumnRemapper
    args:
      hash_bucket_name: attr_i64
      data_type: Int64
      keys:
        sentry.exclusive_time_micro: exclusive_time_micro
        sentry.duration_micro: duration_micro
        sentry.organization_id: organization_id
        sentry.project_id: project_id
  # maps avg(attr_i64[hello]) to avgIf(attr_i64['hello'], mapContains(attr_i64, 'hello'))
  - processor: OptionalAttributeAggregationTransformer
    args:
      attribute_column_names:
        - attr_f64
        - attr_i64
      aggregation_names:
        - count
        - avg
        - avgWeighted
        - max
        - min
        - uniq
      curried_aggregation_names:
        - quantile
        - quantileTDigestWeighted
  # maps a few things (see the trace sketched after this file):
  # - attr_i64['blah'] to CAST(arrayIndex(attr_num_5, 'blah'), 'Int64')
  # - mapContains(attr_str, blah) to mapContains(attr_str_5, blah)
  # - mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...)
  # - mapValues(attr_str) to arrayConcat(mapValues(attr_str_0), mapValues(attr_str_1), ...)
  - processor: EAPMapSharder
    args:
      src_bucket_name: attr_str
      dest_bucket_name: attr_str
      data_type: String
  - processor: EAPMapSharder
    args:
      src_bucket_name: attr_f64
      dest_bucket_name: attr_num
      data_type: Float64
  - processor: EAPMapSharder
    args:
      src_bucket_name: attr_i64
      dest_bucket_name: attr_num
      data_type: Int64
validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
  - validator: EntityRequiredColumnValidator
    args:
      required_filter_columns: [organization_id]

required_time_column: timestamp
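Taken together, the chain rewrites virtual-map accesses in stages: known keys become real columns first, aggregates over optional attributes become conditional, and whatever remains falls through to a physical bucket. A rough trace of the intended rewrites, restated from the comments in query_processors above (illustrative pairs, not generated SQL):

# Before/after pairs restating the rewrites documented in query_processors.
query_rewrites = [
    # EAPClickhouseColumnRemapper: known keys become real columns
    ("attr_str['sentry.name']", "name"),
    ("attr_str['sentry.span_id']", "hex(span_id)"),
    ("attr_f64['sentry.duration_ms']", "CAST(duration_ms, 'Float64')"),
    # OptionalAttributeAggregationTransformer: skip rows missing the key
    ("avg(attr_i64['hello'])",
     "avgIf(attr_i64['hello'], mapContains(attr_i64, 'hello'))"),
    # EAPMapSharder: leftover map accesses resolve to a bucket column
    ("attr_i64['blah']", "CAST(arrayIndex(attr_num_5, 'blah'), 'Int64')"),
]

for before, after in query_rewrites:
    print(f"{before}  ->  {after}")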
Third changed file: a new logical query processor (new file, 64 lines).
from typing import Mapping

from snuba.query.dsl import column, literal
from snuba.query.expressions import (
    Expression,
    FunctionCall,
    Literal,
    SubscriptableReference,
)
from snuba.query.logical import Query
from snuba.query.processors.logical import LogicalQueryProcessor
from snuba.query.query_settings import QuerySettings


class EAPClickhouseColumnRemapper(LogicalQueryProcessor):
    """
    In EAP entities, all attributes are hidden behind some virtual maps:
    attr_str, attr_i64, etc.

    Sometimes a map access should refer to a 'real' column. For example, you
    can use this processor to convert attr_i64[duration_ms] to
    CAST(duration_ms, 'Int64').

    If data_type is the special value 'hex', the result is converted with the
    'hex' function instead.

    If there is no matching column, the map access remains as-is:
    attr_str[derp] remains attr_str[derp]
    """

    def __init__(self, hash_bucket_name: str, keys: Mapping[str, str], data_type: str):
        super().__init__()
        self.hash_bucket_name = hash_bucket_name
        self.keys = keys
        self.data_type = data_type

    def process_query(self, query: Query, query_settings: QuerySettings) -> None:
        def transform(exp: Expression) -> Expression:
            # Only rewrite subscripts into the configured virtual map
            # (e.g. attr_str[...]) whose key is a literal string we know about.
            if not isinstance(exp, SubscriptableReference):
                return exp

            if exp.column.column_name != self.hash_bucket_name:
                return exp

            if not isinstance(exp.key, Literal) or not isinstance(exp.key.value, str):
                return exp

            if exp.key.value not in self.keys:
                return exp

            if self.data_type == "hex":
                # e.g. attr_str[sentry.span_id] -> hex(span_id)
                return FunctionCall(
                    alias=exp.alias,
                    function_name="hex",
                    parameters=(column(self.keys[exp.key.value]),),
                )
            # e.g. attr_f64[sentry.duration_ms] -> CAST(duration_ms, 'Float64')
            return FunctionCall(
                alias=exp.alias,
                function_name="CAST",
                parameters=(
                    column(self.keys[exp.key.value]),
                    literal(self.data_type),
                ),
            )

        query.transform_expressions(transform)
[Review comment] This gets moved into a query processor (the same one which handles mapKeys and mapContains).
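For concreteness, here is a hedged sketch of how the processor's rewrite plays out on a single expression, assuming Snuba's expression dataclasses take (alias, ...) positionally as used in the file above; the surrounding Query object is elided:

from snuba.query.expressions import Column, FunctionCall, Literal, SubscriptableReference

# Configured as in the eap_spans_rpc YAML above.
remapper = EAPClickhouseColumnRemapper(
    hash_bucket_name="attr_f64",
    keys={"sentry.duration_ms": "duration_ms"},
    data_type="Float64",
)

# An access into the virtual map...
before = SubscriptableReference(
    alias=None,
    column=Column(None, None, "attr_f64"),
    key=Literal(None, "sentry.duration_ms"),
)

# ...which process_query (via transform_expressions) rewrites to:
after = FunctionCall(
    alias=None,
    function_name="CAST",
    parameters=(Column(None, None, "duration_ms"), Literal(None, "Float64")),
)

# Unknown keys, e.g. attr_f64['custom.metric'], are left untouched and fall
# through to the EAPMapSharder later in the chain.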