Skip to content

Commit

Permalink
Integrate V4 QB (#5914)
Browse files Browse the repository at this point in the history
* feat: logsV4 initial refactoring

* feat: filter_query builder with tests added

* feat: all functions of v4 refactored

* fix: tests fixed

* feat: logs list API, logic update for better perf

* feat: integrate v4 qb

* fix: pass flag

* fix: update select for table panel

* fix: tests updated with better examples of limit and group by

* fix: resource filter support in live tail

* fix: v4 livetail api

* fix: changes for casting pointer value

* fix: reader options updated

* feat: cleanup and use flag

* feat: restrict new list api to single query

* fix: move getTsRanges to utils

* fix: address pr comments
  • Loading branch information
nityanandagohain authored Sep 13, 2024
1 parent a5f3a18 commit 011b216
Show file tree
Hide file tree
Showing 17 changed files with 582 additions and 69 deletions.
3 changes: 1 addition & 2 deletions ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ type APIHandlerOptions struct {
Cache cache.Cache
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration

FluxInterval time.Duration
UseLogsNewSchema bool
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/query-service/app/clickhouseReader/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON

defaultLogsLocalTableV2 string = "logs_v2"
defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
)

// NamespaceConfig is Clickhouse's internal configuration data
Expand Down Expand Up @@ -72,6 +77,11 @@ type namespaceConfig struct {
WriteBatchSize int
Encoding Encoding
Connector Connector

LogsLocalTableV2 string
LogsTableV2 string
LogsResourceLocalTableV2 string
LogsResourceTableV2 string
}

// Connector defines how to connect to the database
Expand Down Expand Up @@ -159,6 +169,11 @@ func NewOptions(
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,

LogsTableV2: defaultLogsTableV2,
LogsLocalTableV2: defaultLogsLocalTableV2,
LogsResourceTableV2: defaultLogsResourceTableV2,
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}
Expand Down
168 changes: 157 additions & 11 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ const (
maxProgressiveSteps = 4
charset = "abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
NANOSECOND = 1000000000
)

var (
Expand Down Expand Up @@ -125,6 +126,11 @@ type ClickHouseReader struct {
fanoutStorage *storage.Storage
queryProgressTracker queryprogress.QueryProgressTracker

logsTableV2 string
logsLocalTableV2 string
logsResourceTableV2 string
logsResourceLocalTableV2 string

promConfigFile string
promConfig *config.Config
alertManager am.Manager
Expand All @@ -134,6 +140,7 @@ type ClickHouseReader struct {
cluster string

useLogsNewSchema bool
logsTableName string
}

// NewTraceReader returns a TraceReader for the database
Expand Down Expand Up @@ -197,6 +204,11 @@ func NewReaderFromClickhouseConnection(
},
}

logsTableName := options.primary.LogsTable
if useLogsNewSchema {
logsTableName = options.primary.LogsTableV2
}

return &ClickHouseReader{
db: wrap,
localDB: localDB,
Expand All @@ -223,7 +235,14 @@ func NewReaderFromClickhouseConnection(
featureFlags: featureFlag,
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
useLogsNewSchema: useLogsNewSchema,

useLogsNewSchema: useLogsNewSchema,

logsTableV2: options.primary.LogsTableV2,
logsLocalTableV2: options.primary.LogsLocalTableV2,
logsResourceTableV2: options.primary.LogsResourceTableV2,
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
logsTableName: logsTableName,
}
}

Expand Down Expand Up @@ -3518,7 +3537,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
resources = removeUnderscoreDuplicateFields(resources)

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
Expand Down Expand Up @@ -3549,17 +3568,87 @@ func isSelectedField(tableStatement string, field model.LogField) bool {
return strings.Contains(tableStatement, name)
}

// UpdateLogFieldV2 materializes a selected log attribute as a dedicated column
// in the v2 logs tables. For each of the local and distributed tables it adds
// (idempotently, via IF NOT EXISTS):
//   - a typed column defaulting to the value from the corresponding attribute map, and
//   - a companion `<col>_exists` bool column derived from mapContains.
//
// It then adds a skip index on the local table, except for bool attributes.
// Unselecting (removing) a field is not supported and returns a forbidden error.
func (r *ClickHouseReader) UpdateLogFieldV2(ctx context.Context, field *model.UpdateField) *model.ApiError {
	if !field.Selected {
		return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support"))
	}

	colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name)

	// The source attribute maps are keyed by a coarser type: int64/float64
	// attributes both live in the `number` map.
	dataType := strings.ToLower(field.DataType)
	if dataType == "int64" || dataType == "float64" {
		dataType = "number"
	}
	attrColName := fmt.Sprintf("%s_%s", field.Type, dataType)
	for _, table := range []string{r.logsLocalTableV2, r.logsTableV2} {
		q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
		query := fmt.Sprintf(q,
			r.logsDB, table,
			r.cluster,
			colname, field.DataType,
			attrColName,
			field.Name,
		)
		err := r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}

		// Companion column recording whether the attribute was present at all,
		// so a zero value can be told apart from a missing attribute.
		query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
			r.logsDB, table,
			r.cluster,
			colname,
			attrColName,
			field.Name,
		)
		err = r.db.Exec(ctx, query)
		if err != nil {
			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
		}
	}

	// Create the skip index on the local table.
	// Note: dataType already holds the lowercased type (int64/float64 were
	// folded into "number", which never collides with "bool").
	if dataType == "bool" {
		// No point indexing bool attributes: the cardinality is just 2.
		return nil
	}

	if field.IndexType == "" {
		field.IndexType = constants.DefaultLogSkipIndexType
	}
	if field.IndexGranularity == 0 {
		field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
	}
	query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s GRANULARITY %d",
		r.logsDB, r.logsLocalTableV2,
		r.cluster,
		colname,
		colname,
		field.IndexType,
		field.IndexGranularity,
	)
	err := r.db.Exec(ctx, query)
	if err != nil {
		return &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}
	return nil
}

func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
// don't allow updating static fields
if field.Type == constants.Static {
err := errors.New("cannot update static fields")
return &model.ApiError{Err: err, Typ: model.ErrorBadData}
}

colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
if r.useLogsNewSchema {
return r.UpdateLogFieldV2(ctx, field)
}

// if a field is selected it means that the field needs to be indexed
if field.Selected {
colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)

keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType))
valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType))

Expand Down Expand Up @@ -4150,10 +4239,14 @@ func (r *ClickHouseReader) GetLatestReceivedMetric(
return result, nil
}

func isColumn(tableStatement, attrType, field, datType string) bool {
func isColumn(useLogsNewSchema bool, tableStatement, attrType, field, datType string) bool {
// value of attrType will be `resource` or `tag`, if `tag` change it to `attribute`
name := utils.GetClickhouseColumnName(attrType, datType, field)

var name string
if useLogsNewSchema {
name = utils.GetClickhouseColumnNameV2(attrType, datType, field)
} else {
name = utils.GetClickhouseColumnName(attrType, datType, field)
}
return strings.Contains(tableStatement, fmt.Sprintf("%s ", name))
}

Expand Down Expand Up @@ -4209,7 +4302,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
Expand All @@ -4226,7 +4319,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(attType),
IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType),
IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, attType, tagKey, dataType),
}
response.AttributeKeys = append(response.AttributeKeys, key)
}
Expand Down Expand Up @@ -4263,7 +4356,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
Expand All @@ -4281,7 +4374,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
Key: attributeKey,
DataType: v3.AttributeKeyDataType(attributeDataType),
Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType),
IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, attributeKey, attributeDataType),
}

response.AttributeKeys = append(response.AttributeKeys, key)
Expand Down Expand Up @@ -4315,7 +4408,7 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
}

// ignore autocomplete request for body
if req.FilterAttributeKey == "body" {
if req.FilterAttributeKey == "body" || req.FilterAttributeKey == "__attrs" {
return &v3.FilterAttributeValueResponse{}, nil
}

Expand Down Expand Up @@ -5262,6 +5355,59 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string
return response, nil
}

// LiveTailLogsV4 streams new logs matching the caller-built query prefix to the
// client until ctx is cancelled. Every liveTailRefreshSeconds it appends a
// time/id window to the query, fetches up to the latest 100 rows, and pushes
// them to client.Logs oldest-first, advancing the window past the newest row.
// On cancellation it signals client.Done; on a query error it sends to
// client.Error and returns.
func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClientV2) {
	// Default to "now"; otherwise normalize the caller-supplied start to nanoseconds.
	if timestampStart == 0 {
		timestampStart = uint64(time.Now().UnixNano())
	} else {
		timestampStart = uint64(utils.GetEpochNanoSecs(int64(timestampStart)))
	}

	ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			done := true
			client.Done <- &done
			zap.L().Debug("closing go routine : " + client.Name)
			return
		case <-ticker.C:
			// Fetch only the newest 100 logs; anything older is not useful for live tail.
			var tmpQuery string
			// ts_bucket_start is in seconds; look back 30 minutes (1800 s) of buckets.
			bucketStart := (timestampStart / NANOSECOND) - 1800

			// The window clause is formed differently when the query joins the
			// resource table: it must also close that subquery's bracket and
			// bound seen_at_ts_bucket_start.
			if strings.Contains(query, r.logsResourceTableV2) {
				tmpQuery = fmt.Sprintf("seen_at_ts_bucket_start >=%d)) AND ts_bucket_start >=%d AND timestamp >=%d", bucketStart, bucketStart, timestampStart)
			} else {
				tmpQuery = fmt.Sprintf("ts_bucket_start >=%d AND timestamp >=%d", bucketStart, timestampStart)
			}
			// Exclude rows already delivered at the boundary timestamp.
			if idStart != "" {
				tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
			}

			// Order descending so the LIMIT keeps the latest logs.
			tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100"

			// Reuse the existing SignozLogV2 struct so rows can be scanned and used directly.
			response := []model.SignozLogV2{}
			err := r.db.Select(ctx, &response, tmpQuery)
			if err != nil {
				zap.L().Error("Error while getting logs", zap.Error(err))
				client.Error <- err
				return
			}
			// Results are newest-first; iterate backwards to emit oldest-first,
			// and advance the window cursor to the newest row (i == 0).
			for i := len(response) - 1; i >= 0; i-- {
				client.Logs <- &response[i]
				if i == 0 {
					timestampStart = response[i].Timestamp
					idStart = response[i].ID
				}
			}
		}
	}
}

func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) {
if timestampStart == 0 {
timestampStart = uint64(time.Now().UnixNano())
Expand Down
Loading

0 comments on commit 011b216

Please sign in to comment.