Skip to content

Commit

Permalink
fix: use proper tableName (#5982)
Browse files Browse the repository at this point in the history
* fix: use proper tableName

* fix: remove duplicate code
  • Loading branch information
nityanandagohain authored Sep 16, 2024
1 parent fe0d2a9 commit 481c4e1
Showing 1 changed file with 15 additions and 18 deletions.
33 changes: 15 additions & 18 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ type ClickHouseReader struct {
liveTailRefreshSeconds int
cluster string

useLogsNewSchema bool
logsTableName string
useLogsNewSchema bool
logsTableName string
logsLocalTableName string
}

// NewTraceReader returns a TraceReader for the database
Expand Down Expand Up @@ -202,8 +203,10 @@ func NewReaderFromClickhouseConnection(
}

logsTableName := options.primary.LogsTable
logsLocalTableName := options.primary.LogsLocalTable
if useLogsNewSchema {
logsTableName = options.primary.LogsTableV2
logsLocalTableName = options.primary.LogsLocalTableV2
}

return &ClickHouseReader{
Expand Down Expand Up @@ -240,6 +243,7 @@ func NewReaderFromClickhouseConnection(
logsResourceTableV2: options.primary.LogsResourceTableV2,
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
logsTableName: logsTableName,
logsLocalTableName: logsLocalTableName,
}
}

Expand Down Expand Up @@ -3268,37 +3272,30 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
resources = removeUnderscoreDuplicateFields(resources)

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableName)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}

extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response)
r.extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response)
r.extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response)

return &response, nil
}

func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
for _, field := range *fields {
field.Type = fieldType
// all static fields are assumed to be selected as we don't allow changing them
if isSelectedField(tableStatement, field) {
if isColumn(r.useLogsNewSchema, tableStatement, field.Type, field.Name, field.DataType) {
response.Selected = append(response.Selected, field)
} else {
response.Interesting = append(response.Interesting, field)
}
}
}

func isSelectedField(tableStatement string, field model.LogField) bool {
// in case of attributes and resources, if there is a materialized column present then it is selected
// TODO: handle partial change complete eg:- index is removed but materialized column is still present
name := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
return strings.Contains(tableStatement, name)
}

func (r *ClickHouseReader) UpdateLogFieldV2(ctx context.Context, field *model.UpdateField) *model.ApiError {
if !field.Selected {
return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support."))
Expand Down Expand Up @@ -4033,7 +4030,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableName)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
Expand Down Expand Up @@ -4087,7 +4084,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTableName)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
Expand Down Expand Up @@ -4174,10 +4171,10 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi

// prepare the query and run
if len(req.SearchText) != 0 {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsTable, filterValueColumnWhere)
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsLocalTableName, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, searchText, req.Limit)
} else {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsTable)
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsLocalTableName)
rows, err = r.db.Query(ctx, query, req.Limit)
}
} else if len(req.SearchText) != 0 {
Expand Down

0 comments on commit 481c4e1

Please sign in to comment.