Skip to content

Commit

Permalink
[feat] Add time window for GetTrace in span store interface (#6242)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Part of #4150

## Description of the changes

Refactor trace get method in span store interface:
- use struct instead of literal trace id
- add time window in the new struct

## How was this change tested?
- unit test

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: rim99 <zhangxin@outlook.com>
Signed-off-by: Zhang Xin <zxin3306@126.com>
Signed-off-by: Zhang Xin <zhangxin@outlook.com>
Signed-off-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
3 people authored Dec 15, 2024
1 parent d3d70e0 commit d6456fb
Show file tree
Hide file tree
Showing 44 changed files with 408 additions and 218 deletions.
8 changes: 5 additions & 3 deletions cmd/anonymizer/app/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (q *Query) QueryTrace(traceID string) ([]model.Span, error) {
if err != nil {
return nil, fmt.Errorf("failed to convert the provided trace id: %w", err)
}

stream, err := q.client.GetTrace(context.Background(), &api_v2.GetTraceRequest{
// TODO: add start time & end time
request := api_v2.GetTraceRequest{
TraceID: mTraceID,
})
}

stream, err := q.client.GetTrace(context.Background(), &request)
if err != nil {
return nil, unwrapNotFoundErr(err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceID = mock.AnythingOfType("model.TraceID")
matchContext = mock.AnythingOfType("*context.valueCtx")
matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters")

mockInvalidTraceID = "xyz"
mockTraceID = model.NewTraceID(0, 123456)
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestQueryTrace(t *testing.T) {
defer q.Close()

t.Run("No error", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchTraceID).Return(
s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
mockTraceGRPC, nil).Once()

spans, err := q.QueryTrace(mockTraceID.String())
Expand All @@ -122,7 +122,7 @@ func TestQueryTrace(t *testing.T) {
})

t.Run("Trace not found", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchTraceID).Return(
s.spanReader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
nil, spanstore.ErrTraceNotFound).Once()

spans, err := q.QueryTrace(mockTraceID.String())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type mockStorageExt struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestExporter(t *testing.T) {
spanReader, err := storageFactory.CreateSpanReader()
require.NoError(t, err)
requiredTraceID := model.NewTraceID(0, 1) // 00000000000000000000000000000001
requiredTrace, err := spanReader.GetTrace(ctx, requiredTraceID)
requiredTrace, err := spanReader.GetTrace(ctx, spanstore.GetTraceParameters{TraceID: requiredTraceID})
require.NoError(t, err)
assert.Equal(t, spanID.String(), requiredTrace.Spans[0].SpanID.String())

Expand Down
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/integration/span_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ func unwrapNotFoundErr(err error) error {
return err
}

func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
func (r *spanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) {
stream, err := r.client.GetTrace(ctx, &api_v2.GetTraceRequest{
TraceID: traceID,
TraceID: query.TraceID,
StartTime: query.StartTime,
EndTime: query.EndTime,
})
if err != nil {
return nil, unwrapNotFoundErr(err)
Expand Down
15 changes: 8 additions & 7 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func makeTestTrace() (*model.Trace, model.TraceID) {
func makeTestTrace() (*model.Trace, spanstore.GetTraceParameters) {
traceID := model.NewTraceID(150, 160)
query := spanstore.GetTraceParameters{TraceID: traceID}
return &model.Trace{
Spans: []*model.Span{
{
Expand All @@ -94,7 +95,7 @@ func makeTestTrace() (*model.Trace, model.TraceID) {
},
},
},
}, traceID
}, query
}

func runGatewayTests(
Expand Down Expand Up @@ -140,18 +141,18 @@ func (gw *testGateway) runGatewayGetOperations(t *testing.T) {
}

func (gw *testGateway) runGatewayGetTrace(t *testing.T) {
trace, traceID := makeTestTrace()
gw.reader.On("GetTrace", matchContext, traceID).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+traceID.String(), traceID)
trace, query := makeTestTrace()
gw.reader.On("GetTrace", matchContext, query).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+query.TraceID.String(), query.TraceID)
}

func (gw *testGateway) runGatewayFindTraces(t *testing.T) {
trace, traceID := makeTestTrace()
trace, query := makeTestTrace()
q, qp := mockFindQueries()
gw.reader.
On("FindTraces", matchContext, qp).
Return([]*model.Trace{trace}, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID)
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), query.TraceID)
}

func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTraceID model.TraceID) {
Expand Down
7 changes: 6 additions & 1 deletion cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

trace, err := h.QueryService.GetTrace(stream.Context(), traceID)
query := spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
return fmt.Errorf("cannot retrieve trace: %w", err)
}
Expand Down
78 changes: 54 additions & 24 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceID = mock.AnythingOfType("model.TraceID")
matchContext = mock.AnythingOfType("*context.valueCtx")
matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters")
)

func newGrpcServer(t *testing.T, handler *Handler) (*grpc.Server, net.Addr) {
Expand Down Expand Up @@ -79,33 +79,63 @@ func newTestServerClient(t *testing.T) *testServerClient {
}

func TestGetTrace(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
&model.Trace{
Spans: []*model.Span{
{
OperationName: "foobar",
},
traceId, _ := model.TraceIDFromString("156")
testCases := []struct {
name string
expectedQuery spanstore.GetTraceParameters
request api_v3.GetTraceRequest
}{
{
"TestGetTrace",
spanstore.GetTraceParameters{
TraceID: traceId,
StartTime: time.Time{},
EndTime: time.Time{},
},
}, nil).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(),
&api_v3.GetTraceRequest{
TraceId: "156",
api_v3.GetTraceRequest{TraceId: "156"},
},
)
require.NoError(t, err)
recv, err := getTraceStream.Recv()
require.NoError(t, err)
td := recv.ToTraces()
require.EqualValues(t, 1, td.SpanCount())
assert.Equal(t, "foobar",
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
{
"TestGetTraceWithTimeWindow",
spanstore.GetTraceParameters{
TraceID: traceId,
StartTime: time.Unix(1, 2).UTC(),
EndTime: time.Unix(3, 4).UTC(),
},
api_v3.GetTraceRequest{
TraceId: "156",
StartTime: time.Unix(1, 2).UTC(),
EndTime: time.Unix(3, 4).UTC(),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, tc.expectedQuery).Return(
&model.Trace{
Spans: []*model.Span{
{
OperationName: "foobar",
},
},
}, nil).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(), &tc.request)
require.NoError(t, err)
recv, err := getTraceStream.Recv()
require.NoError(t, err)
td := recv.ToTraces()
require.EqualValues(t, 1, td.SpanCount())
assert.Equal(t, "foobar",
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
})
}
}

func TestGetTraceStorageError(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
tsc.reader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
nil, errors.New("storage_error")).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(), &api_v3.GetTraceRequest{
Expand All @@ -119,7 +149,7 @@ func TestGetTraceStorageError(t *testing.T) {

func TestGetTraceTraceIDError(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
tsc.reader.On("GetTrace", matchContext, matchGetTraceParameters).Return(
&model.Trace{
Spans: []*model.Span{},
}, nil).Once()
Expand Down
10 changes: 8 additions & 2 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
)

const (
paramTraceID = "trace_id" // get trace by ID
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
Expand Down Expand Up @@ -133,7 +135,11 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
trc, err := h.QueryService.GetTrace(r.Context(), traceID)
// TODO: add start time & end time
request := spanstore.GetTraceParameters{
TraceID: traceID,
}
trc, err := h.QueryService.GetTrace(r.Context(), request)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestHTTPGatewayGetTraceErrors(t *testing.T) {
// error from span reader
const simErr = "simulated error"
gw.reader.
On("GetTrace", matchContext, matchTraceID).
On("GetTrace", matchContext, matchGetTraceParameters).
Return(nil, errors.New(simErr)).Once()

r, err = http.NewRequest(http.MethodGet, "/api/v3/traces/123", nil)
Expand Down
14 changes: 12 additions & 2 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer
if r.TraceID == (model.TraceID{}) {
return errUninitializedTraceID
}
trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID)
query := spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
}
trace, err := g.queryService.GetTrace(stream.Context(), query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
return status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
Expand All @@ -109,7 +114,12 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe
if r.TraceID == (model.TraceID{}) {
return nil, errUninitializedTraceID
}
err := g.queryService.ArchiveTrace(ctx, r.TraceID)
query := spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
}
err := g.queryService.ArchiveTrace(ctx, query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
return nil, status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
Expand Down
Loading

0 comments on commit d6456fb

Please sign in to comment.