Integrate V4 QB #5914

Merged · 27 commits · Sep 13, 2024

Commits
be0148d
feat: logsV4 initial refactoring
nityanandagohain Sep 9, 2024
065111c
feat: filter_query builder with tests added
nityanandagohain Sep 9, 2024
934155b
feat: all functions of v4 refactored
nityanandagohain Sep 10, 2024
dc66eb1
fix: tests fixed
nityanandagohain Sep 10, 2024
2b75ed9
feat: logs list API, logic update for better perf
nityanandagohain Sep 10, 2024
2c96364
feat: integrate v4 qb
nityanandagohain Sep 10, 2024
14c0c62
fix: pass flag
nityanandagohain Sep 10, 2024
da26ea5
Merge remote-tracking branch 'origin/develop' into feat/logs-v4-qb-re…
nityanandagohain Sep 10, 2024
6d60c97
Merge branch 'feat/logs-v4-qb-refactor' into feat/v4-logs-list
nityanandagohain Sep 10, 2024
2635d42
Merge branch 'feat/v4-logs-list' into feat/integrate-v4-qb
nityanandagohain Sep 10, 2024
ff0ab8b
fix: update select for table panel
nityanandagohain Sep 10, 2024
db56204
fix: tests updated with better examples of limit and group by
nityanandagohain Sep 10, 2024
34c2c7b
fix: resource filter support in live tail
nityanandagohain Sep 10, 2024
1bf3478
Merge branch 'feat/logs-v4-qb-refactor' into feat/v4-logs-list
nityanandagohain Sep 10, 2024
2322deb
Merge branch 'feat/v4-logs-list' into feat/integrate-v4-qb
nityanandagohain Sep 10, 2024
fc08bd9
fix: v4 livetail api
nityanandagohain Sep 10, 2024
4574f76
fix: changes for casting pointer value
nityanandagohain Sep 11, 2024
8fd3ba5
fix: reader options updated
nityanandagohain Sep 11, 2024
1460f80
Merge remote-tracking branch 'origin/develop' into feat/v4-logs-list
nityanandagohain Sep 12, 2024
47bda32
feat: cleanup and use flag
nityanandagohain Sep 12, 2024
41513bc
Merge remote-tracking branch 'origin/feat/v4-logs-list' into feat/int…
nityanandagohain Sep 12, 2024
803d480
feat: restrict new list api to single query
nityanandagohain Sep 12, 2024
9beb529
fix: move getTsRanges to utils
nityanandagohain Sep 12, 2024
7474d52
Merge remote-tracking branch 'origin/feat/v4-logs-list' into feat/int…
nityanandagohain Sep 12, 2024
f7d3818
Merge remote-tracking branch 'origin/develop' into feat/integrate-v4-qb
nityanandagohain Sep 12, 2024
077c772
fix: address pr comments
nityanandagohain Sep 12, 2024
953d019
Merge remote-tracking branch 'origin/develop' into feat/integrate-v4-qb
nityanandagohain Sep 13, 2024
3 changes: 1 addition & 2 deletions ee/query-service/app/api/api.go
@@ -38,8 +38,7 @@ type APIHandlerOptions struct {
Cache cache.Cache
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration

FluxInterval time.Duration
UseLogsNewSchema bool
}

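The `UseLogsNewSchema` option added here is the single switch that decides which generation of logs tables downstream consumers query. A minimal sketch of that pattern, under the assumption that everything hangs off this one boolean — `readerOptions` and `pickLogsTable` are hypothetical names, not the actual SigNoz plumbing:

```go
package main

import "fmt"

// readerOptions mirrors the table-name defaults added in options.go below.
type readerOptions struct {
	logsTable   string // v1: distributed_logs
	logsTableV2 string // v2: distributed_logs_v2
}

// pickLogsTable is a hypothetical helper showing the flag-gated selection
// the reader constructor performs further down in this diff.
func pickLogsTable(opts readerOptions, useLogsNewSchema bool) string {
	if useLogsNewSchema {
		return opts.logsTableV2
	}
	return opts.logsTable
}

func main() {
	opts := readerOptions{logsTable: "distributed_logs", logsTableV2: "distributed_logs_v2"}
	fmt.Println(pickLogsTable(opts, true))  // distributed_logs_v2
	fmt.Println(pickLogsTable(opts, false)) // distributed_logs
}
```

Keeping the choice behind one boolean means the old schema stays the default and the v2 path can be rolled out or reverted without touching call sites.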
15 changes: 15 additions & 0 deletions pkg/query-service/app/clickhouseReader/options.go
@@ -40,6 +40,11 @@ const (
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON

defaultLogsLocalTableV2 string = "logs_v2"
defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
)

// NamespaceConfig is Clickhouse's internal configuration data
@@ -72,6 +77,11 @@ type namespaceConfig struct {
WriteBatchSize int
Encoding Encoding
Connector Connector

LogsLocalTableV2 string
LogsTableV2 string
LogsResourceLocalTableV2 string
LogsResourceTableV2 string
}

// Connector defines how to connect to the database
@@ -159,6 +169,11 @@ func NewOptions(
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,

LogsTableV2: defaultLogsTableV2,
LogsLocalTableV2: defaultLogsLocalTableV2,
LogsResourceTableV2: defaultLogsResourceTableV2,
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}
168 changes: 157 additions & 11 deletions pkg/query-service/app/clickhouseReader/reader.go
@@ -89,6 +89,7 @@ const (
maxProgressiveSteps = 4
charset = "abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
NANOSECOND = 1000000000
)

var (
@@ -125,6 +126,11 @@ type ClickHouseReader struct {
fanoutStorage *storage.Storage
queryProgressTracker queryprogress.QueryProgressTracker

logsTableV2 string
logsLocalTableV2 string
logsResourceTableV2 string
logsResourceLocalTableV2 string

promConfigFile string
promConfig *config.Config
alertManager am.Manager
@@ -134,6 +140,7 @@
cluster string

useLogsNewSchema bool
logsTableName string
}

// NewTraceReader returns a TraceReader for the database
@@ -197,6 +204,11 @@ func NewReaderFromClickhouseConnection(
},
}

logsTableName := options.primary.LogsTable
if useLogsNewSchema {
logsTableName = options.primary.LogsTableV2
}

return &ClickHouseReader{
db: wrap,
localDB: localDB,
@@ -223,7 +235,14 @@
featureFlags: featureFlag,
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
useLogsNewSchema: useLogsNewSchema,

useLogsNewSchema: useLogsNewSchema,

logsTableV2: options.primary.LogsTableV2,
logsLocalTableV2: options.primary.LogsLocalTableV2,
logsResourceTableV2: options.primary.LogsResourceTableV2,
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
logsTableName: logsTableName,
}
}

@@ -3518,7 +3537,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
resources = removeUnderscoreDuplicateFields(resources)

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
@@ -3549,17 +3568,87 @@ func isSelectedField(tableStatement string, field model.LogField) bool {
return strings.Contains(tableStatement, name)
}

func (r *ClickHouseReader) UpdateLogFieldV2(ctx context.Context, field *model.UpdateField) *model.ApiError {
if !field.Selected {
return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support."))
}

colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name)

dataType := strings.ToLower(field.DataType)
if dataType == "int64" || dataType == "float64" {
dataType = "number"
}
attrColName := fmt.Sprintf("%s_%s", field.Type, dataType)
for _, table := range []string{r.logsLocalTableV2, r.logsTableV2} {
q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
query := fmt.Sprintf(q,
r.logsDB, table,
r.cluster,
colname, field.DataType,
attrColName,
field.Name,
)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}

query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
r.logsDB, table,
r.cluster,
colname,
attrColName,
field.Name,
)
err = r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}

// create the index
if strings.ToLower(field.DataType) == "bool" {
// there is no point in creating an index for bool attributes, as the cardinality is just 2
return nil
}

if field.IndexType == "" {
field.IndexType = constants.DefaultLogSkipIndexType
}
if field.IndexGranularity == 0 {
field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
}
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s GRANULARITY %d",
r.logsDB, r.logsLocalTableV2,
r.cluster,
colname,
colname,
field.IndexType,
field.IndexGranularity,
)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
return nil
}
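To make the generated DDL concrete, here is a runnable sketch that prints the statements this function would emit for a hypothetical `attribute` field of data type `Int64`. The materialized column name, database, cluster, and index settings are stand-ins — the real values come from `utils.GetClickhouseColumnNameV2` and `constants.DefaultLogSkipIndexType`/`DefaultLogSkipIndexGranularity`:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	logsDB, cluster := "signoz_logs", "cluster" // illustrative values
	fieldType, dataType, fieldName := "attribute", "Int64", "http.status_code"
	colname := "attribute_number_http.status_code" // hypothetical; see GetClickhouseColumnNameV2

	// int64 and float64 attributes both live in the shared *_number map
	dt := strings.ToLower(dataType)
	if dt == "int64" || dt == "float64" {
		dt = "number"
	}
	attrColName := fmt.Sprintf("%s_%s", fieldType, dt) // -> attribute_number

	for _, table := range []string{"logs_v2", "distributed_logs_v2"} {
		// materialized value column, defaulted from the attributes map
		fmt.Printf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))\n",
			logsDB, table, cluster, colname, dataType, attrColName, fieldName)
		// companion *_exists column so existence filters avoid a map lookup
		fmt.Printf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))\n",
			logsDB, table, cluster, colname, attrColName, fieldName)
	}

	// the skip index goes only on the local table; bool fields skip this
	// step entirely since an index over two values buys nothing
	fmt.Printf("ALTER TABLE %s.logs_v2 ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE bloom_filter GRANULARITY 64\n",
		logsDB, cluster, colname, colname) // index type/granularity are placeholder defaults
}
```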

func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
// don't allow updating static fields
if field.Type == constants.Static {
err := errors.New("cannot update static fields")
return &model.ApiError{Err: err, Typ: model.ErrorBadData}
}

colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
if r.useLogsNewSchema {
return r.UpdateLogFieldV2(ctx, field)
}

// if a field is selected it means that the field needs to be indexed
if field.Selected {
colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)

keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType))
valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType))

@@ -4150,10 +4239,14 @@ func (r *ClickHouseReader) GetLatestReceivedMetric(
return result, nil
}

func isColumn(tableStatement, attrType, field, datType string) bool {
func isColumn(useLogsNewSchema bool, tableStatement, attrType, field, datType string) bool {
// the value of attrType will be `resource` or `tag`; if it is `tag`, change it to `attribute`
name := utils.GetClickhouseColumnName(attrType, datType, field)

var name string
if useLogsNewSchema {
name = utils.GetClickhouseColumnNameV2(attrType, datType, field)
} else {
name = utils.GetClickhouseColumnName(attrType, datType, field)
}
return strings.Contains(tableStatement, fmt.Sprintf("%s ", name))
}
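The trailing space in the `Contains` match is deliberate: it ensures the check matches a whole column name rather than a prefix of a longer one (for example a companion `_exists` column). A self-contained sketch with hypothetical column names — the real ones come from `GetClickhouseColumnName`/`GetClickhouseColumnNameV2`:

```go
package main

import (
	"fmt"
	"strings"
)

// hasColumn applies the same rule as isColumn: the name followed by a
// space must appear verbatim in the SHOW CREATE TABLE statement.
func hasColumn(tableStatement, name string) bool {
	return strings.Contains(tableStatement, fmt.Sprintf("%s ", name))
}

func main() {
	// hypothetical fragment of a SHOW CREATE TABLE result
	stmt := "CREATE TABLE signoz_logs.logs_v2 (`attribute_string_method` String, `attribute_string_method_exists` Bool)"

	fmt.Println(hasColumn(stmt, "`attribute_string_method`")) // true: materialized column
	fmt.Println(hasColumn(stmt, "`attribute_string_path`"))   // false: only exists in the map
}
```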

@@ -4209,7 +4302,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
@@ -4226,7 +4319,7 @@
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(attType),
IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType),
IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, attType, tagKey, dataType),
}
response.AttributeKeys = append(response.AttributeKeys, key)
}
@@ -4263,7 +4356,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
@@ -4281,7 +4374,7 @@
Key: attributeKey,
DataType: v3.AttributeKeyDataType(attributeDataType),
Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType),
IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, attributeKey, attributeDataType),
}

response.AttributeKeys = append(response.AttributeKeys, key)
@@ -4315,7 +4408,7 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
}

// ignore autocomplete request for body
if req.FilterAttributeKey == "body" {
if req.FilterAttributeKey == "body" || req.FilterAttributeKey == "__attrs" {
return &v3.FilterAttributeValueResponse{}, nil
}

@@ -5262,6 +5355,59 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string
return response, nil
}

func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClientV2) {
if timestampStart == 0 {
timestampStart = uint64(time.Now().UnixNano())
} else {
timestampStart = uint64(utils.GetEpochNanoSecs(int64(timestampStart)))
}

ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
done := true
client.Done <- &done
zap.L().Debug("closing go routine : " + client.Name)
return
case <-ticker.C:
// fetch only the newest 100 logs; anything older is not useful for a live tail
var tmpQuery string
bucketStart := (timestampStart / NANOSECOND) - 1800

// we have to form the query differently if the resource filters are used
if strings.Contains(query, r.logsResourceTableV2) {
tmpQuery = fmt.Sprintf("seen_at_ts_bucket_start >=%d)) AND ts_bucket_start >=%d AND timestamp >=%d", bucketStart, bucketStart, timestampStart)
} else {
tmpQuery = fmt.Sprintf("ts_bucket_start >=%d AND timestamp >=%d", bucketStart, timestampStart)
}
if idStart != "" {
tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
}

// the reason we are doing desc is that we need the latest logs first
tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100"

// reuse the old response struct since the rows can be read directly into it
response := []model.SignozLogV2{}
err := r.db.Select(ctx, &response, tmpQuery)
if err != nil {
zap.L().Error("Error while getting logs", zap.Error(err))
client.Error <- err
return
}
for i := len(response) - 1; i >= 0; i-- {
client.Logs <- &response[i]
if i == 0 {
timestampStart = response[i].Timestamp
idStart = response[i].ID
}
}
}
}
}
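The window math here: `timestamp` is in nanoseconds while `ts_bucket_start` is in epoch seconds, so dividing by `NANOSECOND` and subtracting 1800 reaches back one 30-minute bucket, which keeps rows written near a bucket boundary inside the scan. A runnable sketch of the WHERE suffix the loop appends (the id cursor value is hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

const nanosecond = 1_000_000_000 // same value as the NANOSECOND constant above

func main() {
	timestampStart := uint64(time.Now().UnixNano())

	// convert ns -> s, then step back one 30-minute bucket so logs written
	// just after the current bucket opened are still covered
	bucketStart := (timestampStart / nanosecond) - 1800

	suffix := fmt.Sprintf("ts_bucket_start >=%d AND timestamp >=%d", bucketStart, timestampStart)

	// resume strictly after the last id already streamed to the client
	idStart := "2hXkP0abcDEF" // hypothetical log id
	if idStart != "" {
		suffix = fmt.Sprintf("%s AND id > '%s'", suffix, idStart)
	}

	// newest 100 first; LiveTailLogsV4 then walks the slice backwards so the
	// client still receives logs in ascending time order
	fmt.Println(suffix + " order by timestamp desc, id desc limit 100")
}
```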

func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) {
if timestampStart == 0 {
timestampStart = uint64(time.Now().UnixNano())