feat(eap): Start decoupling EAP entities at the entity layer #6701

Closed
wants to merge 12 commits
37 changes: 0 additions & 37 deletions snuba/clickhouse/translators/snuba/mappers.py
@@ -25,8 +25,6 @@
Param,
String,
)
from snuba.utils.constants import ATTRIBUTE_BUCKETS
from snuba.utils.hashes import fnv_1a


# This is a workaround for a mypy bug, found here: https://github.com/python/mypy/issues/5374
@@ -229,41 +227,6 @@ def attempt_map(
return None


@dataclass(frozen=True)
class SubscriptableHashBucketMapper(SubscriptableReferenceMapper):
Member Author:

This gets moved into a query processor (the same one which handles mapKeys and mapContains)

"""
Maps a key into the appropriate bucket by hashing the key. For example, hello[test] might go to attr_str_22['test']
"""

from_column_table: Optional[str]
from_column_name: str
to_col_table: Optional[str]
to_col_name: str

def attempt_map(
self,
expression: SubscriptableReference,
children_translator: SnubaClickhouseStrictTranslator,
) -> Optional[FunctionCallExpr]:
if (
expression.column.table_name != self.from_column_table
or expression.column.column_name != self.from_column_name
):
return None
key = expression.key.accept(children_translator)
if not isinstance(key, LiteralExpr):
return None
if not isinstance(key.value, str):
return None

bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
return arrayElement(
expression.alias,
ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
key,
)
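
For context, the bucketing this removed mapper performed, and which EAPMapSharder now takes over at query-processing time, boils down to hashing the attribute key and taking it modulo the number of physical map columns. A minimal, self-contained sketch, assuming a 32-bit FNV-1a (the shape of snuba.utils.hashes.fnv_1a) and an illustrative bucket count rather than the real snuba.utils.constants.ATTRIBUTE_BUCKETS:

```python
# Self-contained sketch; ATTRIBUTE_BUCKETS here is illustrative, not the
# value defined in snuba.utils.constants.
ATTRIBUTE_BUCKETS = 20


def fnv_1a_32(data: bytes) -> int:
    # Standard 32-bit FNV-1a: XOR each byte into the hash, then multiply
    # by the FNV prime, truncating to 32 bits.
    h = 2166136261
    for byte in data:
        h ^= byte
        h = (h * 16777619) & 0xFFFFFFFF
    return h


def bucketed_column(base: str, key: str) -> str:
    # e.g. attr_str['test'] resolves to attr_str_<bucket>['test'], where the
    # bucket is the hash of the key modulo the number of physical columns.
    bucket_idx = fnv_1a_32(key.encode("utf-8")) % ATTRIBUTE_BUCKETS
    return f"{base}_{bucket_idx}"


print(bucketed_column("attr_str", "test"))  # which shard 'test' hashes into
```

Every processor that touches the physical columns has to agree on the same hash and bucket count, which is part of the motivation for consolidating that knowledge in one place.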


@dataclass(frozen=True)
class ColumnToMapping(ColumnToExpression):
"""
@@ -48,20 +48,6 @@ storages:
from_col_name: exclusive_time_ms
to_col_name: exclusive_time_micro

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_str
to_col_table: null
to_col_name: attr_str
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_num
to_col_table: null
to_col_name: attr_num

storage_selector:
selector: DefaultQueryStorageSelector

@@ -88,11 +74,16 @@ query_processors:
curried_aggregation_names:
- quantile
- quantileTDigestWeighted
- processor: HashBucketFunctionTransformer
Member Author:

There was a really annoying order of operations, where query processors needed to know which bucket things would end up in, but that was done at the storage level.

By merging all of the processors that need to know the actual bucket-level information into a single one at the end, the pipeline is a lot more understandable and has less chance of bugs.

- processor: EAPMapSharder
args:
hash_bucket_names:
- attr_str
- attr_num
src_bucket_name: attr_str
dest_bucket_name: attr_str
data_type: String
- processor: EAPMapSharder
args:
src_bucket_name: attr_num
dest_bucket_name: attr_num
data_type: Float64

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
@@ -0,0 +1,140 @@
version: v1
kind: entity
name: eap_spans_rpc

schema:
[
{ name: service, type: String },
{ name: span_id, type: UInt, args: { size: 64 } },
Contributor:

We should have some sort of event_id, as a string, required for the RPC. Having it as a string would let us pass any sort of UUIDs or span IDs.

Member Author:

How about uint128? String would have a big performance hit

Member Author:

Let's add that later; it would require another net-new processor.

{ name: parent_span_id, type: UInt, args: { size: 64 } },
{ name: segment_id, type: UInt, args: { size: 64 } },
{ name: segment_name, type: String },
{ name: is_segment, type: UInt, args: { size: 8 } },
{ name: start_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: end_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: duration_ms, type: Float, args: { size: 64 } },
{ name: exclusive_time_ms, type: Float, args: { size: 64 } },
{ name: name, type: String },

# these are the required columns for an 'RPC entity' that can be used by EAP RPCs
{ name: trace_id, type: UUID },
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: time, type: DateTime }, # used by TimeSeriesProcessor
{ name: timestamp, type: DateTime }, # mapped to _sort_timestamp
{ name: retention_days, type: UInt, args: { size: 16 } },
{ name: sampling_factor, type: Float, args: { size: 64 } },
{ name: sampling_weight, type: UInt, args: { size: 64 } },
{ name: sign, type: Int, args: { size: 8 } },
{ name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
{ name: attr_f64, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
{ name: attr_i64, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } },
]

storages:
- storage: eap_spans
is_writable: true
translation_mappers:
columns:
- mapper: ColumnToColumn
args:
from_table_name: null
from_col_name: timestamp
to_table_name: null
to_col_name: _sort_timestamp

storage_selector:
selector: DefaultQueryStorageSelector

query_processors:
Member Author (@colin-sentry, Jan 7, 2025):

I rewrote a lot of this; there were starting to be conflicting edge cases where, e.g., sum(attr_f64[sentry.duration_ms]) should become sum(duration_ms), but the very similar sum(attr_i64[blah]) should become sumIf(CAST(attr_num_2[blah], 'Integer'), mapContains(attr_num_2, 'blah')).

# a processor that creates a virtual 'time' column to group by for generating timeseries.
- processor: TimeSeriesProcessor
args:
time_group_columns:
time: timestamp
time_parse_columns:
- start_timestamp
- end_timestamp
# maps (e.g.) attr_str[sentry.name] to the clickhouse column 'name'
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_str
data_type: String
keys:
sentry.name: name
sentry.service: service
sentry.parent_span_id: parent_span_id
sentry.segment_name: segment_name
sentry.start_timestamp: start_timestamp
sentry.end_timestamp: end_timestamp
sentry.timestamp: timestamp
# maps attr_str[span_id] to hex(span_id)
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_str
data_type: hex
keys:
sentry.span_id: span_id
sentry.segment_id: segment_id
# maps attr_f64[duration_ms] to CAST(duration_ms, Float64)
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_f64
data_type: Float64
keys:
sentry.exclusive_time_ms: exclusive_time_ms
sentry.duration_ms: duration_ms
# maps attr_i64[project_id] to CAST(project_id, Int64)
- processor: EAPClickhouseColumnRemapper
args:
hash_bucket_name: attr_i64
data_type: Int64
keys:
sentry.exclusive_time_micro: exclusive_time_micro
sentry.duration_micro: duration_micro
sentry.organization_id: organization_id
sentry.project_id: project_id
# maps avg(attr_i64[hello]) to avgIf(attr_i64['hello'], mapContains(attr_i64, 'hello'))
- processor: OptionalAttributeAggregationTransformer
args:
attribute_column_names:
- attr_f64
- attr_i64
aggregation_names:
- count
- avg
- avgWeighted
- max
- min
- uniq
curried_aggregation_names:
- quantile
- quantileTDigestWeighted
# maps a few things:
# - attr_i64['blah'] to CAST(arrayIndex(attr_num_5, 'blah'), 'Int64')
# - mapContains(attr_str, blah) to mapContains(attr_str_5, blah)
# - mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...)
# - mapValues(attr_str) to arrayConcat(mapValues(attr_str_0), mapValues(attr_str_1), ...)
- processor: EAPMapSharder
args:
src_bucket_name: attr_str
dest_bucket_name: attr_str
data_type: String
- processor: EAPMapSharder
args:
src_bucket_name: attr_f64
dest_bucket_name: attr_num
data_type: Float64
- processor: EAPMapSharder
args:
src_bucket_name: attr_i64
dest_bucket_name: attr_num
data_type: Int64

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
- validator: EntityRequiredColumnValidator
args:
required_filter_columns: [organization_id]

required_time_column: timestamp
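
Putting the chain together: for an attribute with no dedicated column, OptionalAttributeAggregationTransformer wraps the aggregate in an `...If(..., mapContains(...))` guard, and EAPMapSharder then resolves the physical shard and adds the cast, per the comments above. A hypothetical string-level sketch (the bucket index is a placeholder, and exact ClickHouse function names may differ slightly from the comments):

```python
def rewrite_optional_i64_aggregate(agg: str, key: str, bucket_idx: int) -> str:
    # Placeholder illustration: agg(attr_i64[key]) on an attribute without a
    # dedicated column becomes an aggIf over the sharded physical map column,
    # guarded by mapContains, with an explicit Int64 cast.
    sharded = f"attr_num_{bucket_idx}"
    return (
        f"{agg}If(CAST(arrayElement({sharded}, '{key}'), 'Int64'), "
        f"mapContains({sharded}, '{key}'))"
    )


print(rewrite_optional_i64_aggregate("sum", "blah", 2))
# sumIf(CAST(arrayElement(attr_num_2, 'blah'), 'Int64'), mapContains(attr_num_2, 'blah'))
```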
64 changes: 64 additions & 0 deletions snuba/query/processors/logical/eap_map_access_remapper.py
@@ -0,0 +1,64 @@
from typing import Mapping

from snuba.query.dsl import column, literal
from snuba.query.expressions import (
Expression,
FunctionCall,
Literal,
SubscriptableReference,
)
from snuba.query.logical import Query
from snuba.query.processors.logical import LogicalQueryProcessor
from snuba.query.query_settings import QuerySettings


class EAPClickhouseColumnRemapper(LogicalQueryProcessor):
"""
In EAP entities, all attributes are hidden behind some virtual maps: attr_str, attr_i64, etc

Sometimes a map access should refer to a 'real' column.
For example, you can use this processor to convert
attr_i64[duration_ms] to CAST(duration_ms, 'Int64')

If data_type is the special value 'hex', the result is converted with the 'hex' function instead.

If there is no matching column, the map access remains as-is:
attr_str[derp] remains attr_str[derp]
"""

def __init__(self, hash_bucket_name: str, keys: Mapping[str, str], data_type: str):
super().__init__()
self.hash_bucket_name = hash_bucket_name
self.keys = keys
self.data_type = data_type

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform(exp: Expression) -> Expression:
if not isinstance(exp, SubscriptableReference):
return exp

if exp.column.column_name != self.hash_bucket_name:
return exp

if not isinstance(exp.key, Literal) or not isinstance(exp.key.value, str):
return exp

if exp.key.value not in self.keys:
return exp

if self.data_type == "hex":
return FunctionCall(
alias=exp.alias,
function_name="hex",
parameters=(column(self.keys[exp.key.value]),),
)
return FunctionCall(
alias=exp.alias,
function_name="CAST",
parameters=(
column(self.keys[exp.key.value]),
literal(self.data_type),
),
)

query.transform_expressions(transform)
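
To make the behaviour concrete, here is a rough, self-contained illustration of the rewrite described in the docstring, using plain strings instead of snuba's expression objects. The key mapping mirrors one of the YAML entries above; this is an explanatory sketch, not part of the PR:

```python
from typing import Mapping


def remap(bucket: str, key: str, keys: Mapping[str, str], data_type: str) -> str:
    # attr_f64[sentry.duration_ms] -> CAST(duration_ms, 'Float64')
    # attr_str[sentry.span_id]     -> hex(span_id)      (data_type == 'hex')
    # attr_str[derp]               -> attr_str[derp]    (no matching key)
    if key not in keys:
        return f"{bucket}[{key}]"
    if data_type == "hex":
        return f"hex({keys[key]})"
    return f"CAST({keys[key]}, '{data_type}')"


print(remap("attr_f64", "sentry.duration_ms", {"sentry.duration_ms": "duration_ms"}, "Float64"))
print(remap("attr_str", "sentry.span_id", {"sentry.span_id": "span_id"}, "hex"))
print(remap("attr_str", "derp", {"sentry.span_id": "span_id"}, "hex"))
```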