Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket support for search streaming #2971

Merged
merged 26 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* [ENHANCEMENT] Add `target_info_excluded_dimensions` to user-config api [#2945](https://github.com/grafana/tempo/pull/2945) (@ie-pham)
* [ENHANCEMENT] User-configurable overrides: add scope query parameter to return merged overrides for tenant [#2915](https://github.com/grafana/tempo/pull/2915) (@kvrhdn)
* [ENHANCEMENT] Add histogram buckets to metrics-generator config in user-configurable overrides [#2928](https://github.com/grafana/tempo/pull/2928) (@mar4uk)
* [ENHANCEMENT] Adds websocket support for search streaming. [#2971](https://github.com/grafana/tempo/pull/2971) (@joe-elliott)
**Breaking Change** Deprecated GRPC streaming
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix rare deadlock when uploading blocks to Azure Blob Storage [#2129](https://github.com/grafana/tempo/issues/2129) (@LasseHels)
* [BUGFIX] Only search ingester blocks that fall within the request time range. [#2783](https://github.com/grafana/tempo/pull/2783) (@joe-elliott)
Expand All @@ -41,6 +43,7 @@
sum(rate(tempo_query_frontend_queries_total{}[1m])) by (op)
```
**BREAKING CHANGE** Removed: tempo_query_frontend_queries_total{op="searchtags|metrics"}.
* [BUGFIX] Respect spss on GRPC streaming. [#2971](https://github.com/grafana/tempo/pull/2971) (@joe-elliott)
* [CHANGE] Overrides module refactor [#2688](https://github.com/grafana/tempo/pull/2688) (@mapno)
Added new `defaults` block to the overrides' module. Overrides change to indented syntax.
Old config:
Expand Down
115 changes: 103 additions & 12 deletions cmd/tempo-cli/cmd-query-search.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
package main

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"path"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/httpclient"
"github.com/grafana/tempo/pkg/tempopb"
)

type querySearchCmd struct {
APIEndpoint string `arg:"" help:"tempo api endpoint"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`
HostPort string `arg:"" help:"tempo host and port. scheme and path will be provided based on query type. e.g. localhost:3200"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`

OrgID string `help:"optional orgID"`
OrgID string `help:"optional orgID"`
UseGRPC bool `help:"stream search results over GRPC"`
UseWS bool `help:"stream search results over websocket"`
SPSS int `help:"spans per spanset" default:"0"`
Limit int `help:"limit number of results" default:"0"`
PathPrefix string `help:"string to prefix all http paths with"`
}

func (cmd *querySearchCmd) Run(_ *globalOptions) error {
Expand All @@ -35,23 +47,38 @@ func (cmd *querySearchCmd) Run(_ *globalOptions) error {
}
end := endDate.Unix()

req := &tempopb.SearchRequest{
Query: cmd.TraceQL,
Start: uint32(start),
End: uint32(end),
SpansPerSpanSet: uint32(cmd.SPSS),
Limit: uint32(cmd.Limit),
}

if cmd.UseGRPC {
return cmd.searchGRPC(req)
} else if cmd.UseWS {
return cmd.searchWS(req)
}

return cmd.searchHTTP(req)
}

func (cmd *querySearchCmd) searchGRPC(req *tempopb.SearchRequest) error {
ctx := user.InjectOrgID(context.Background(), cmd.OrgID)
ctx, err = user.InjectIntoGRPCRequest(ctx)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
return err
}
clientConn, err := grpc.DialContext(ctx, cmd.APIEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))

clientConn, err := grpc.DialContext(ctx, cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

client := tempopb.NewStreamingQuerierClient(clientConn)

resp, err := client.Search(ctx, &tempopb.SearchRequest{
Query: cmd.TraceQL,
Start: uint32(start),
End: uint32(end),
})
resp, err := client.Search(ctx, req)
if err != nil {
return err
}
Expand All @@ -72,3 +99,67 @@ func (cmd *querySearchCmd) Run(_ *globalOptions) error {
}
}
}

// searchWS runs the search through the websocket client and prints every
// response it receives as JSON.
//
// The client is pointed at "ws://" followed by HostPort and PathPrefix joined
// together. Each response handed to the callback is printed under a
// "streaming response" banner; the response returned by SearchWithWebsocket is
// printed under a "final response" banner.
func (cmd *querySearchCmd) searchWS(req *tempopb.SearchRequest) error {
	baseURL := "ws://" + path.Join(cmd.HostPort, cmd.PathPrefix)
	wsClient := httpclient.New(baseURL, cmd.OrgID)

	// The callback signature has no error return, so a failure to print an
	// intermediate response aborts the command via panic.
	onUpdate := func(partial *tempopb.SearchResponse) {
		fmt.Println("--- streaming response ---")
		if printErr := printAsJSON(partial); printErr != nil {
			panic(printErr)
		}
	}

	final, err := wsClient.SearchWithWebsocket(req, onUpdate)
	if err != nil {
		return err
	}

	fmt.Println("--- final response ---")
	return printAsJSON(final)
}

// searchHTTP runs the search as a single HTTP GET and prints the decoded
// response as JSON.
//
// The request URL is "http://" followed by HostPort, PathPrefix and
// api.PathSearch joined together; the search parameters are applied to the
// request by api.BuildSearchRequest. A non-200 status is reported as an error
// that carries the response body. A response body that cannot be decoded into
// a tempopb.SearchResponse is reported as an error as well, rather than
// panicking, so every failure mode reaches the caller the same way.
func (cmd *querySearchCmd) searchHTTP(req *tempopb.SearchRequest) error {
	httpReq, err := http.NewRequest(http.MethodGet, "http://"+path.Join(cmd.HostPort, cmd.PathPrefix, api.PathSearch), nil)
	if err != nil {
		return err
	}

	httpReq, err = api.BuildSearchRequest(httpReq, req)
	if err != nil {
		return err
	}

	// Replace the headers with an empty set before injecting the org ID.
	httpReq.Header = http.Header{}
	err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq)
	if err != nil {
		return err
	}

	// Echo the outgoing request so the user can see exactly what was sent.
	fmt.Println(httpReq)
	httpResp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()

	body, err := io.ReadAll(httpResp.Body)
	if err != nil {
		return err
	}

	if httpResp.StatusCode != http.StatusOK {
		return errors.New("failed to query: " + string(body))
	}

	resp := &tempopb.SearchResponse{}
	if err := jsonpb.Unmarshal(bytes.NewReader(body), resp); err != nil {
		// Return the decode failure instead of panicking; this function
		// already reports all other failures through its error result.
		return fmt.Errorf("failed to parse resp: %w", err)
	}

	return printAsJSON(resp)
}
2 changes: 2 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

traceByIDHandler := middleware.Wrap(queryFrontend.TraceByIDHandler)
searchHandler := middleware.Wrap(queryFrontend.SearchHandler)
searchWSHandler := middleware.Wrap(queryFrontend.SearchWSHandler)
spanMetricsSummaryHandler := middleware.Wrap(queryFrontend.SpanMetricsSummaryHandler)
searchTagsHandler := middleware.Wrap(queryFrontend.SearchTagsHandler)

Expand All @@ -383,6 +384,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

// http search endpoints
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathWSSearch), searchWSHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), searchTagsHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), searchTagsHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), searchTagsHandler)
Expand Down
88 changes: 23 additions & 65 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ For externally support GRPC API [see below](#tempo-grpc-api)
| [Ingest traces](#ingest) | Distributor | - | See section for details |
| [Querying traces by id](#query) | Query-frontend | HTTP | `GET /api/traces/<traceID>` |
| [Searching traces](#search) | Query-frontend | HTTP | `GET /api/search?<params>` |
| [Searching traces with streaming results](#search-with-websockets) | Query-frontend | HTTP | `GET /api/search-ws?<params>` |
| [Search tag names](#search-tags) | Query-frontend | HTTP | `GET /api/search/tags` |
| [Search tag names V2](#search-tags-v2) | Query-frontend | HTTP | `GET /api/v2/search/tags` |
| [Search tag values](#search-tag-values) | Query-frontend | HTTP | `GET /api/search/tag/<tag>/values` |
Expand Down Expand Up @@ -250,6 +251,24 @@ $ curl -G -s http://localhost:3200/api/search --data-urlencode 'tags=service.nam
}
```

### Search With Websockets
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved

Tempo supports streaming results returned over a websocket at the `/api/search-ws` endpoint. This endpoint supports all of the same
parameters as [search](#search). For a given query Tempo will return a series of intermediate results, a final result and will then
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
gracefully close the websocket connection.

You can test this endpoint using curl like so:
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
```bash
curl -G -s http://localhost:3200/api/search-ws \
-H 'Connection: Upgrade' \
-H 'Upgrade: websocket' \
-H 'Sec-Websocket-Version: 13' \
-H 'Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==' \
--data-urlencode 'q={ status=error }' \
--data-urlencode "start=$(date --date yesterday -u +%s)" \
--data-urlencode "end=$(date -u +%s)"
```

### Search tags

Ingester configuration `complete_block_timeout` affects how long tags are available for search.
Expand Down Expand Up @@ -573,71 +592,10 @@ Exposes the build information in a JSON object. The fields are `version`, `revis

## Tempo GRPC API

**Deprecated**

Tempo uses GRPC to internally communicate with itself, but only has one externally supported client. The query-frontend component implements
the streaming querier interface defined below. [See here](https://github.com/grafana/tempo/blob/main/pkg/tempopb/) for the complete proto definition and generated code.

By default this service is only offered over the GRPC port. However, one can offer this streaming service over the HTTP port as well (which Grafana expects).
To enable the streaming service over the http port for use with Grafana set the following.

> **Note**: Enabling this setting is incompatible with TLS.

```
stream_over_http_enabled: true
```

The below `rpc` call returns only traces that are new or have updated each time `SearchResponse` is returned except for the last response. The
final response sent is guaranteed to have the entire resultset.


```protobuf
service StreamingQuerier {
rpc Search(SearchRequest) returns (stream SearchResponse);
}

message SearchRequest {
map<string, string> Tags = 1
uint32 MinDurationMs = 2;
uint32 MaxDurationMs = 3;
uint32 Limit = 4;
uint32 start = 5;
uint32 end = 6;
string Query = 8;
}

message SearchResponse {
repeated TraceSearchMetadata traces = 1;
SearchMetrics metrics = 2;
}

message TraceSearchMetadata {
string traceID = 1;
string rootServiceName = 2;
string rootTraceName = 3;
uint64 startTimeUnixNano = 4;
uint32 durationMs = 5;
SpanSet spanSet = 6; // deprecated. use SpanSets field below
repeated SpanSet spanSets = 7;
}

message SpanSet {
repeated Span spans = 1;
uint32 matched = 2;
}

message Span {
string spanID = 1;
string name = 2;
uint64 startTimeUnixNano = 3;
uint64 durationNanos = 4;
repeated tempopb.common.v1.KeyValue attributes = 5;
}

message SearchMetrics {
uint32 inspectedTraces = 1;
uint64 inspectedBytes = 2;
uint32 totalBlocks = 3;
uint32 completedJobs = 4;
uint32 totalJobs = 5;
uint64 totalBlockBytes = 6;
}
```
The GRPC endpoint has been deprecated and will be removed in a future version. If you would like to stream results for your TraceQL searches, please use the
[websockets endpoint](#search-with-websockets).
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ require (
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
github.com/hashicorp/consul/api v1.15.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func TestAllInOne(t *testing.T) {
require.NoError(t, err)

util.SearchStreamAndAssertTrace(t, grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())

// test websockets
wsClient := httpclient.New("ws://"+tempo.Endpoint(3200), "")
util.SearchWSStreamAndAssertTrace(t, wsClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
})
}
}
Expand Down
17 changes: 17 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,23 @@ func SearchStreamAndAssertTrace(t *testing.T, client tempopb.StreamingQuerierCli
require.True(t, found)
}

// SearchWSStreamAndAssertTrace searches for the trace described by info using
// the client's websocket search and fails the test unless the trace ID appears
// in the returned response.
//
// The query matches on one attribute picked by RandomAttrFromTrace from the
// expected trace, restricted to the [start, end] range given in epoch seconds.
// Responses delivered to the streaming callback are ignored; only the response
// returned by SearchWithWebsocket is checked.
func SearchWSStreamAndAssertTrace(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo, start, end int64) {
	want, err := info.ConstructTraceFromEpoch()
	require.NoError(t, err)

	picked := tempoUtil.RandomAttrFromTrace(want)
	searchReq := &tempopb.SearchRequest{
		Query: fmt.Sprintf(`{ .%s = "%s"}`, picked.GetKey(), picked.GetValue().GetStringValue()),
		Start: uint32(start),
		End:   uint32(end),
	}

	// Streaming updates are not inspected by this helper.
	discard := func(*tempopb.SearchResponse) {}

	got, err := client.SearchWithWebsocket(searchReq, discard)
	require.NoError(t, err)
	require.True(t, traceIDInResults(t, info.HexID(), got))
}

// by passing a time range and using a query_ingesters_until/backend_after of 0 we can force the queriers
// to look in the backend blocks
func SearchAndAssertTraceBackend(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo, start, end int64) {
Expand Down
9 changes: 5 additions & 4 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
type streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error

type QueryFrontend struct {
TraceByIDHandler, SearchHandler, SearchTagsHandler, SpanMetricsSummaryHandler http.Handler
streamingSearch streamingSearchHandler
logger log.Logger
TraceByIDHandler, SearchHandler, SearchTagsHandler, SpanMetricsSummaryHandler, SearchWSHandler http.Handler
streamingSearch streamingSearchHandler
logger log.Logger
}

// New returns a new QueryFrontend
Expand Down Expand Up @@ -70,7 +70,8 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
SearchHandler: newHandler(search, searchSLOPostHook(cfg.Search.SLO), searchSLOPreHook, logger),
SearchTagsHandler: newHandler(searchTags, nil, nil, logger),
SpanMetricsSummaryHandler: newHandler(metrics, nil, nil, logger),
streamingSearch: newSearchStreamingHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
SearchWSHandler: newSearchStreamingWSHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
streamingSearch: newSearchStreamingGRPCHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
logger: logger,
}, nil
}
Expand Down
Loading