Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: logs list API, logic update for better perf #5912

Merged
merged 15 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions pkg/query-service/app/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/utils"

"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
Expand Down Expand Up @@ -469,7 +470,115 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}

type logsListTsRange struct {
Start int64
End int64
}

const HOUR_NANO = int64(3600000000000)

func getLogsListTsRanges(start, end int64) []logsListTsRange {
startNano := utils.GetEpochNanoSecs(start)
endNano := utils.GetEpochNanoSecs(end)
result := []logsListTsRange{}

if endNano-startNano > HOUR_NANO {
bucket := HOUR_NANO
tStartNano := endNano - bucket

complete := false
for {
result = append(result, logsListTsRange{Start: tStartNano, End: endNano})
if complete {
break
}

bucket = bucket * 2
endNano = tStartNano
tStartNano = tStartNano - bucket

// break condition
if tStartNano <= startNano {
complete = true
tStartNano = startNano
}
}
}
return result
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, tsRanges []logsListTsRange) ([]*v3.Result, map[string]error, error) {
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)

// se we are considering only one query
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize
}
nityanandagohain marked this conversation as resolved.
Show resolved Hide resolved
data := []*v3.Row{}

for _, v := range tsRanges {
params.Start = v.Start
params.End = v.End

params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}

// this will to run only once
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
data = append(data, rowList...)
}

// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}
nityanandagohain marked this conversation as resolved.
Show resolved Hide resolved

if uint64(len(data)) >= pageSize {
break
}
}
res = append(res, &v3.Result{
QueryName: qName,
List: data,
})
return res, nil, nil
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
// List query has support for only one query.
if q.UseLogsNewSchema && params.CompositeQuery != nil {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
nityanandagohain marked this conversation as resolved.
Show resolved Hide resolved
startEndArr := getLogsListTsRanges(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
}
}
}
}

queries, err := q.builder.PrepareQueries(params, keys)

Expand Down
45 changes: 45 additions & 0 deletions pkg/query-service/app/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,3 +1055,48 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
}
}
}
func TestLogsListTsRange(t *testing.T) {
startEndData := []struct {
name string
start int64
end int64
res []logsListTsRange
}{
{
name: "testing for less then one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []logsListTsRange{},
},
{
name: "testing for more than one hour",
start: 1722255800000000000, // July 29, 2024 5:53:20 PM
end: 1722262800000000000, // July 29, 2024 8:06:40 PM
res: []logsListTsRange{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722255800000000000, 1722259200000000000}, // July 29, 2024 5:53:20 PM - July 29, 2024 6:50:00 PM
},
},
{
name: "testing for 1 day",
start: 1722171576000000000,
end: 1722262800000000000,
res: []logsListTsRange{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722252000000000000, 1722259200000000000}, // July 29, 2024 4:50:00 PM - July 29, 2024 6:50:00 PM
{1722237600000000000, 1722252000000000000}, // July 29, 2024 12:50:00 PM - July 29, 2024 4:50:00 PM
{1722208800000000000, 1722237600000000000}, // July 29, 2024 4:50:00 AM - July 29, 2024 12:50:00 PM
{1722171576000000000, 1722208800000000000}, // July 28, 2024 6:29:36 PM - July 29, 2024 4:50:00 AM
},
},
}

for _, test := range startEndData {
res := getLogsListTsRanges(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
}
}
}
}
109 changes: 109 additions & 0 deletions pkg/query-service/app/querier/v2/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/utils"

"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
Expand Down Expand Up @@ -478,7 +479,115 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}

type logsListTsRange struct {
Start int64
End int64
}

const HOUR_NANO = int64(3600000000000)

func getLogsListTsRanges(start, end int64) []logsListTsRange {
nityanandagohain marked this conversation as resolved.
Show resolved Hide resolved
startNano := utils.GetEpochNanoSecs(start)
endNano := utils.GetEpochNanoSecs(end)
result := []logsListTsRange{}

if endNano-startNano > HOUR_NANO {
bucket := HOUR_NANO
tStartNano := endNano - bucket

complete := false
for {
result = append(result, logsListTsRange{Start: tStartNano, End: endNano})
if complete {
break
}

bucket = bucket * 2
endNano = tStartNano
tStartNano = tStartNano - bucket

// break condition
if tStartNano <= startNano {
complete = true
tStartNano = startNano
}
}
}
return result
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey, tsRanges []logsListTsRange) ([]*v3.Result, map[string]error, error) {
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)

// se we are considering only one query
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize
}
data := []*v3.Row{}

for _, v := range tsRanges {
params.Start = v.Start
params.End = v.End

params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, nil, err
}

// this will to run only once
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
data = append(data, rowList...)
}

// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}

if uint64(len(data)) >= pageSize {
break
}
}
res = append(res, &v3.Result{
QueryName: qName,
List: data,
})
return res, nil, nil
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
// List query has support for only one query.
if q.UseLogsNewSchema && params.CompositeQuery != nil {
for _, v := range params.CompositeQuery.BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := getLogsListTsRanges(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, keys, startEndArr)
}
}
}
}

queries, err := q.builder.PrepareQueries(params, keys)

Expand Down
46 changes: 46 additions & 0 deletions pkg/query-service/app/querier/v2/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,3 +1107,49 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
}
}
}

func TestLogsListTsRange(t *testing.T) {
startEndData := []struct {
name string
start int64
end int64
res []logsListTsRange
}{
{
name: "testing for less then one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []logsListTsRange{},
},
{
name: "testing for more than one hour",
start: 1722255800000000000, // July 29, 2024 5:53:20 PM
end: 1722262800000000000, // July 29, 2024 8:06:40 PM
res: []logsListTsRange{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722255800000000000, 1722259200000000000}, // July 29, 2024 5:53:20 PM - July 29, 2024 6:50:00 PM
},
},
{
name: "testing for 1 day",
start: 1722171576000000000,
end: 1722262800000000000,
res: []logsListTsRange{
{1722259200000000000, 1722262800000000000}, // July 29, 2024 6:50:00 PM - July 29, 2024 7:50:00 PM
{1722252000000000000, 1722259200000000000}, // July 29, 2024 4:50:00 PM - July 29, 2024 6:50:00 PM
{1722237600000000000, 1722252000000000000}, // July 29, 2024 12:50:00 PM - July 29, 2024 4:50:00 PM
{1722208800000000000, 1722237600000000000}, // July 29, 2024 4:50:00 AM - July 29, 2024 12:50:00 PM
{1722171576000000000, 1722208800000000000}, // July 28, 2024 6:29:36 PM - July 29, 2024 4:50:00 AM
},
},
}

for _, test := range startEndData {
res := getLogsListTsRanges(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)
}
}
}
}
Loading