Skip to content

Commit

Permalink
feat: support for window based pagination in new trace v4 (#6440)
Browse files Browse the repository at this point in the history
* feat: support for window based pagination in new trace v4

* fix: update pagination logic

* fix: update comment

* fix: substract correct length

* fix: revert changes

* fix: add tests for querier

* fix: rename matcher

* fix: handle offset inmemory for list queries in traces

* fix: correct var name

* fix: add max pagination limit for traces
  • Loading branch information
nityanandagohain authored Nov 15, 2024
1 parent c1478c4 commit 77c5f17
Show file tree
Hide file tree
Showing 7 changed files with 857 additions and 87 deletions.
157 changes: 117 additions & 40 deletions pkg/query-service/app/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils"
Expand Down Expand Up @@ -52,7 +53,8 @@ type querier struct {
returnedSeries []*v3.Series
returnedErr error

UseLogsNewSchema bool
UseLogsNewSchema bool
UseTraceNewSchema bool
}

type QuerierOptions struct {
Expand Down Expand Up @@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
res := make([]*v3.Result, 0)
qName := ""
pageSize := uint64(0)
limit := uint64(0)
offset := uint64(0)

// se we are considering only one query
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize

// for traces specifically
limit = v.Limit
offset = v.Offset
}
data := []*v3.Row{}

tracesLimit := limit + offset

for _, v := range tsRanges {
params.Start = v.Start
params.End = v.End

params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}

length := uint64(0)
// this will to run only once
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)

// appending the filter to get the next set of data
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params)
if err != nil {
errs := []error{err}
errQuriesByName := map[string]error{
name: err,
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
length += uint64(len(rowList))
data = append(data, rowList...)
}
data = append(data, rowList...)
}

// append a filter to the params
if len(data) > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}
if length > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}

if uint64(len(data)) >= pageSize {
break
if uint64(len(data)) >= pageSize {
break
}
} else {
// TRACE
// we are updating the offset and limit based on the number of traces we have found in the current timerange
// eg -
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// if 100 traces are there in [t1, t10] then 100 will return immediately.
// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100

//
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0

// max limit + offset is 10k for pagination
if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
}

params.CompositeQuery.BuilderQueries[qName].Offset = 0
params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))

// skip the traces unless offset is 0
for _, row := range rowList {
if offset == 0 {
data = append(data, row)
} else {
offset--
}
}
}
tracesLimit = tracesLimit - length

if uint64(len(data)) >= limit {
break
}
}
}
res = append(res, &v3.Result{
Expand All @@ -368,15 +435,25 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
// List query has support for only one query.
if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
// List query has support for only one query
// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
if params.CompositeQuery != nil &&
len(params.CompositeQuery.BuilderQueries) == 1 &&
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
for _, v := range params.CompositeQuery.BuilderQueries {
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
break
}

// only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
if len(startEndArr) > 0 {
return q.runLogsListQuery(ctx, params, startEndArr)
}
// TODO(nitya): allow for timestamp asc
if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
len(v.OrderBy) == 1 &&
v.OrderBy[0].ColumnName == "timestamp" &&
v.OrderBy[0].Order == "desc" {
startEndArr := utils.GetListTsRanges(params.Start, params.End)
return q.runWindowBasedListQuery(ctx, params, startEndArr)
}
}
}
Expand Down Expand Up @@ -408,13 +485,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
close(ch)

var errs []error
errQuriesByName := make(map[string]error)
errQueriesByName := make(map[string]error)
res := make([]*v3.Result, 0)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Err
errQueriesByName[r.Name] = r.Err
continue
}
res = append(res, &v3.Result{
Expand All @@ -423,7 +500,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
})
}
if len(errs) != 0 {
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
return res, nil, nil
}
Expand Down
Loading

0 comments on commit 77c5f17

Please sign in to comment.