Skip to content

Commit

Permalink
Websocket support for search streaming (#2971)
Browse files Browse the repository at this point in the history
* split out not streaming code

Signed-off-by: Joe Elliott <number101010@gmail.com>

* first pass ws handler

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add tempo-cli clients for grpc, http and http ws

Signed-off-by: Joe Elliott <number101010@gmail.com>

* clean up ws shutdown logic

Signed-off-by: Joe Elliott <number101010@gmail.com>

* added 5s ws ping

Signed-off-by: Joe Elliott <number101010@gmail.com>

* cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* added limit and spss support to cli

Signed-off-by: Joe Elliott <number101010@gmail.com>

* clean up cli to work in a real env

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add dep

Signed-off-by: Joe Elliott <number101010@gmail.com>

* e2e test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* docs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fix tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Update docs/sources/tempo/api_docs/_index.md

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>

* Update docs/sources/tempo/api_docs/_index.md

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>

* Update docs/sources/tempo/api_docs/_index.md

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>

* review comments

Signed-off-by: Joe Elliott <number101010@gmail.com>

* added cli docs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* prevent incorrect error on graceful closure

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>
  • Loading branch information
joe-elliott and knylander-grafana authored Oct 6, 2023
1 parent 39f52ac commit 7e0cb51
Show file tree
Hide file tree
Showing 37 changed files with 4,130 additions and 175 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* [ENHANCEMENT] Add `target_info_excluded_dimensions` to user-config api [#2945](https://github.com/grafana/tempo/pull/2945) (@ie-pham)
* [ENHANCEMENT] User-configurable overrides: add scope query parameter to return merged overrides for tenant [#2915](https://github.com/grafana/tempo/pull/2915) (@kvrhdn)
* [ENHANCEMENT] Add histogram buckets to metrics-generator config in user-configurable overrides [#2928](https://github.com/grafana/tempo/pull/2928) (@mar4uk)
* [ENHANCEMENT] Adds websocket support for search streaming. [#2971](https://github.com/grafana/tempo/pull/2840) (@joe-elliott)
**Breaking Change** Deprecated GRPC streaming
* [ENHANCEMENT] added a metrics generator config option to enable/disable X-Scope-OrgID headers on remote write. [#2974](https://github.com/grafana/tempo/pull/2974) (@vineetjp)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix rare deadlock when uploading blocks to Azure Blob Storage [#2129](https://github.com/grafana/tempo/issues/2129) (@LasseHels)
Expand All @@ -42,6 +44,7 @@
sum(rate(tempo_query_frontend_queries_total{}[1m])) by (op)
```
**BREAKING CHANGE** Removed: tempo_query_frontend_queries_total{op="searchtags|metrics"}.
* [BUGFIX] Respect spss on GRPC streaming. [#2971](https://github.com/grafana/tempo/pull/2840) (@joe-elliott)
* [CHANGE] Overrides module refactor [#2688](https://github.com/grafana/tempo/pull/2688) (@mapno)
Added new `defaults` block to the overrides' module. Overrides change to indented syntax.
Old config:
Expand Down
115 changes: 103 additions & 12 deletions cmd/tempo-cli/cmd-query-search.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
package main

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"path"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/httpclient"
"github.com/grafana/tempo/pkg/tempopb"
)

type querySearchCmd struct {
APIEndpoint string `arg:"" help:"tempo api endpoint"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`
HostPort string `arg:"" help:"tempo host and port. scheme and path will be provided based on query type. e.g. localhost:3200"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`

OrgID string `help:"optional orgID"`
OrgID string `help:"optional orgID"`
UseGRPC bool `help:"stream search results over GRPC"`
UseWS bool `help:"stream search results over websocket"`
SPSS int `help:"spans per spanset" default:"0"`
Limit int `help:"limit number of results" default:"0"`
PathPrefix string `help:"string to prefix all http paths with"`
}

func (cmd *querySearchCmd) Run(_ *globalOptions) error {
Expand All @@ -35,23 +47,38 @@ func (cmd *querySearchCmd) Run(_ *globalOptions) error {
}
end := endDate.Unix()

req := &tempopb.SearchRequest{
Query: cmd.TraceQL,
Start: uint32(start),
End: uint32(end),
SpansPerSpanSet: uint32(cmd.SPSS),
Limit: uint32(cmd.Limit),
}

if cmd.UseGRPC {
return cmd.searchGRPC(req)
} else if cmd.UseWS {
return cmd.searchWS(req)
}

return cmd.searchHTTP(req)
}

func (cmd *querySearchCmd) searchGRPC(req *tempopb.SearchRequest) error {
ctx := user.InjectOrgID(context.Background(), cmd.OrgID)
ctx, err = user.InjectIntoGRPCRequest(ctx)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
return err
}
clientConn, err := grpc.DialContext(ctx, cmd.APIEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))

clientConn, err := grpc.DialContext(ctx, cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

client := tempopb.NewStreamingQuerierClient(clientConn)

resp, err := client.Search(ctx, &tempopb.SearchRequest{
Query: cmd.TraceQL,
Start: uint32(start),
End: uint32(end),
})
resp, err := client.Search(ctx, req)
if err != nil {
return err
}
Expand All @@ -72,3 +99,67 @@ func (cmd *querySearchCmd) Run(_ *globalOptions) error {
}
}
}

func (cmd *querySearchCmd) searchWS(req *tempopb.SearchRequest) error {
client := httpclient.New("ws://"+path.Join(cmd.HostPort, cmd.PathPrefix), cmd.OrgID)

resp, err := client.SearchWithWebsocket(req, func(resp *tempopb.SearchResponse) {
fmt.Println("--- streaming response ---")
err := printAsJSON(resp)
if err != nil {
panic(err)
}
})
if err != nil {
return err
}

fmt.Println("--- final response ---")
return printAsJSON(resp)
}

func (cmd *querySearchCmd) searchHTTP(req *tempopb.SearchRequest) error {
httpReq, err := http.NewRequest("GET", "http://"+path.Join(cmd.HostPort, cmd.PathPrefix, api.PathSearch), nil)
if err != nil {
return err
}

httpReq, err = api.BuildSearchRequest(httpReq, req)
if err != nil {
return err
}

httpReq.Header = http.Header{}
err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq)
if err != nil {
return err
}

fmt.Println(httpReq)
httpResp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return err
}
defer httpResp.Body.Close()

body, err := io.ReadAll(httpResp.Body)
if err != nil {
return err
}

if httpResp.StatusCode != http.StatusOK {
return errors.New("failed to query: " + string(body))
}

resp := &tempopb.SearchResponse{}
err = jsonpb.Unmarshal(bytes.NewReader(body), resp)
if err != nil {
panic("failed to parse resp: " + err.Error())
}
err = printAsJSON(resp)
if err != nil {
return err
}

return nil
}
2 changes: 2 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

traceByIDHandler := middleware.Wrap(queryFrontend.TraceByIDHandler)
searchHandler := middleware.Wrap(queryFrontend.SearchHandler)
searchWSHandler := middleware.Wrap(queryFrontend.SearchWSHandler)
spanMetricsSummaryHandler := middleware.Wrap(queryFrontend.SpanMetricsSummaryHandler)
searchTagsHandler := middleware.Wrap(queryFrontend.SearchTagsHandler)

Expand All @@ -383,6 +384,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

// http search endpoints
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathWSSearch), searchWSHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), searchTagsHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), searchTagsHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), searchTagsHandler)
Expand Down
88 changes: 23 additions & 65 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ For externally support GRPC API [see below](#tempo-grpc-api)
| [Ingest traces](#ingest) | Distributor | - | See section for details |
| [Querying traces by id](#query) | Query-frontend | HTTP | `GET /api/traces/<traceID>` |
| [Searching traces](#search) | Query-frontend | HTTP | `GET /api/search?<params>` |
| [Searching traces with streaming results](#search-with-websockets) | Query-frontend | HTTP | `GET /api/search-ws?<params>` |
| [Search tag names](#search-tags) | Query-frontend | HTTP | `GET /api/search/tags` |
| [Search tag names V2](#search-tags-v2) | Query-frontend | HTTP | `GET /api/v2/search/tags` |
| [Search tag values](#search-tag-values) | Query-frontend | HTTP | `GET /api/search/tag/<tag>/values` |
Expand Down Expand Up @@ -250,6 +251,24 @@ $ curl -G -s http://localhost:3200/api/search --data-urlencode 'tags=service.nam
}
```
### Search with websockets
Tempo supports streaming results returned over a websocket at the `/api/search-ws` endpoint. This endpoint supports all of the same
parameters as [search](#search). For a given query, Tempo will return a series of intermediate results, a final result and will then
gracefully close the websocket connection.
You can test this endpoint using curl:
```bash
curl -G -s http://localhost:3200/api/search-ws \
-H 'Connection: Upgrade' \
-H 'Upgrade: websocket' \
-H 'Sec-Websocket-Version: 13' \
-H 'Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==' \
--data-urlencode 'q={ status=error }' \
--data-urlencode "start=$(date --date yesterday -u +%s)" \
--data-urlencode "end=$(date -u +%s)"
```
### Search tags
Ingester configuration `complete_block_timeout` affects how long tags are available for search.
Expand Down Expand Up @@ -573,71 +592,10 @@ Exposes the build information in a JSON object. The fields are `version`, `revis
## Tempo GRPC API
**Deprecated**
Tempo uses GRPC to internally communicate with itself, but only has one externally supported client. The query-frontend component implements
the streaming querier interface defined below. [See here](https://github.com/grafana/tempo/blob/main/pkg/tempopb/) for the complete proto definition and generated code.
By default this service is only offered over the GRPC port. However, one can offer this streaming service over the HTTP port as well (which Grafana expects).
To enable the streaming service over the http port for use with Grafana set the following.
> **Note**: Enabling this setting is incompatible with TLS.
```
stream_over_http_enabled: true
```
The below `rpc` call returns only traces that are new or have updated each time `SearchResponse` is returned except for the last response. The
final response sent is guaranteed to have the entire resultset.
```protobuf
service StreamingQuerier {
rpc Search(SearchRequest) returns (stream SearchResponse);
}
message SearchRequest {
map<string, string> Tags = 1
uint32 MinDurationMs = 2;
uint32 MaxDurationMs = 3;
uint32 Limit = 4;
uint32 start = 5;
uint32 end = 6;
string Query = 8;
}
message SearchResponse {
repeated TraceSearchMetadata traces = 1;
SearchMetrics metrics = 2;
}
message TraceSearchMetadata {
string traceID = 1;
string rootServiceName = 2;
string rootTraceName = 3;
uint64 startTimeUnixNano = 4;
uint32 durationMs = 5;
SpanSet spanSet = 6; // deprecated. use SpanSets field below
repeated SpanSet spanSets = 7;
}
message SpanSet {
repeated Span spans = 1;
uint32 matched = 2;
}
message Span {
string spanID = 1;
string name = 2;
uint64 startTimeUnixNano = 3;
uint64 durationNanos = 4;
repeated tempopb.common.v1.KeyValue attributes = 5;
}
message SearchMetrics {
uint32 inspectedTraces = 1;
uint64 inspectedBytes = 2;
uint32 totalBlocks = 3;
uint32 completedJobs = 4;
uint32 totalJobs = 5;
uint64 totalBlockBytes = 6;
}
```
The GRPC endpoint has been deprecated and will be removed in a future version. If you would like to streaming results for your TraceQL searches please use the
[websockets endpoint](#search-with-websockets).
28 changes: 25 additions & 3 deletions docs/sources/tempo/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ The backend can be configured in a few ways:
Each option applies only to the command in which it is used. For example, `--backend <value>` does not permanently change where Tempo stores data. It only changes it for command in which you apply the option.

## Query API command
Call the tempo API and retrieve a trace by ID.

### Trace ID
Call the Tempo API and retrieve a trace by ID.
```bash
tempo-cli query api <api-endpoint> <trace-id>
tempo-cli query api trace-id <api-endpoint> <trace-id>
```

Arguments:
Expand All @@ -71,8 +73,28 @@ Options:

**Example:**
```bash
tempo-cli query api http://tempo:3200 f1cfe82a8eef933b
tempo-cli query api trace-id http://tempo:3200 f1cfe82a8eef933b
```

### Search
Call the Tempo API and search using TraceQL.

```bash
tempo-cli query api search <host-port> <trace-ql> <start> <end>
```
Arguments:
- `host-port` A host/port combination for Tempo. The scheme will be inferred based on the options provided.
- `trace-ql` TraceQL query.
- `start` Start of the time range to search: (YYYY-MM-DDThh:mm:ss)
- `end` End of the time range to search: (YYYY-MM-DDThh:mm:ss)

Options:
- `--org-id <value>` Organization ID (for use in multi-tenant setup).
- `--use-grpc` Use deprecated GRPC streaming
- `--use-ws` Use HTTP/Websocket streaming
- `--spss <value>` Number of spans to return for each spanset
- `--limit <value>` Number of results to return
- `--path-prefix <value>` String to prefix search paths with

## Query blocks command
Iterate over all backend blocks and dump all data found for a given trace id.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/googleapis/gax-go/v2 v2.10.0
github.com/gorilla/websocket v1.5.0
github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.74.0
Expand Down
4 changes: 4 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func TestAllInOne(t *testing.T) {
require.NoError(t, err)

util.SearchStreamAndAssertTrace(t, grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())

// test websockets
wsClient := httpclient.New("ws://"+tempo.Endpoint(3200), "")
util.SearchWSStreamAndAssertTrace(t, wsClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
})
}
}
Expand Down
17 changes: 17 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,23 @@ func SearchStreamAndAssertTrace(t *testing.T, client tempopb.StreamingQuerierCli
require.True(t, found)
}

func SearchWSStreamAndAssertTrace(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo, start, end int64) {
expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

attr := tempoUtil.RandomAttrFromTrace(expected)
query := fmt.Sprintf(`{ .%s = "%s"}`, attr.GetKey(), attr.GetValue().GetStringValue())

resp, err := client.SearchWithWebsocket(&tempopb.SearchRequest{
Query: query,
Start: uint32(start),
End: uint32(end),
}, func(sr *tempopb.SearchResponse) {})

require.NoError(t, err)
require.True(t, traceIDInResults(t, info.HexID(), resp))
}

// by passing a time range and using a query_ingesters_until/backend_after of 0 we can force the queriers
// to look in the backend blocks
func SearchAndAssertTraceBackend(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo, start, end int64) {
Expand Down
Loading

0 comments on commit 7e0cb51

Please sign in to comment.