Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket support for search streaming #2971

Merged
merged 26 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* [ENHANCEMENT] Add `target_info_excluded_dimensions` to user-config api [#2945](https://github.com/grafana/tempo/pull/2945) (@ie-pham)
* [ENHANCEMENT] User-configurable overrides: add scope query parameter to return merged overrides for tenant [#2915](https://github.com/grafana/tempo/pull/2915) (@kvrhdn)
* [ENHANCEMENT] Add histogram buckets to metrics-generator config in user-configurable overrides [#2928](https://github.com/grafana/tempo/pull/2928) (@mar4uk)
* [ENHANCEMENT] Adds websocket support for search streaming. [#2971](https://github.com/grafana/tempo/pull/2971) (@joe-elliott)
**Breaking Change** Deprecated GRPC streaming
* [ENHANCEMENT] added a metrics generator config option to enable/disable X-Scope-OrgID headers on remote write. [#2974](https://github.com/grafana/tempo/pull/2974) (@vineetjp)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix rare deadlock when uploading blocks to Azure Blob Storage [#2129](https://github.com/grafana/tempo/issues/2129) (@LasseHels)
Expand All @@ -42,6 +44,7 @@
sum(rate(tempo_query_frontend_queries_total{}[1m])) by (op)
```
**BREAKING CHANGE** Removed: tempo_query_frontend_queries_total{op="searchtags|metrics"}.
* [BUGFIX] Respect spss on GRPC streaming. [#2971](https://github.com/grafana/tempo/pull/2971) (@joe-elliott)
* [CHANGE] Overrides module refactor [#2688](https://github.com/grafana/tempo/pull/2688) (@mapno)
Added new `defaults` block to the overrides' module. Overrides change to indented syntax.
Old config:
Expand Down
115 changes: 103 additions & 12 deletions cmd/tempo-cli/cmd-query-search.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
package main

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"path"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/httpclient"
"github.com/grafana/tempo/pkg/tempopb"
)

type querySearchCmd struct {
APIEndpoint string `arg:"" help:"tempo api endpoint"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`
HostPort string `arg:"" help:"tempo host and port. scheme and path will be provided based on query type. e.g. localhost:3200"`
TraceQL string `arg:"" optional:"" help:"traceql query"`
Start string `arg:"" optional:"" help:"start time in ISO8601 format"`
End string `arg:"" optional:"" help:"end time in ISO8601 format"`

OrgID string `help:"optional orgID"`
OrgID string `help:"optional orgID"`
UseGRPC bool `help:"stream search results over GRPC"`
UseWS bool `help:"stream search results over websocket"`
SPSS int `help:"spans per spanset" default:"0"`
Limit int `help:"limit number of results" default:"0"`
PathPrefix string `help:"string to prefix all http paths with"`
}

func (cmd *querySearchCmd) Run(_ *globalOptions) error {
Expand All @@ -35,23 +47,38 @@ func (cmd *querySearchCmd) Run(_ *globalOptions) error {
}
end := endDate.Unix()

req := &tempopb.SearchRequest{
Query: cmd.TraceQL,
Start: uint32(start),
End: uint32(end),
SpansPerSpanSet: uint32(cmd.SPSS),
Limit: uint32(cmd.Limit),
}

if cmd.UseGRPC {
return cmd.searchGRPC(req)
} else if cmd.UseWS {
return cmd.searchWS(req)
}

return cmd.searchHTTP(req)
}

func (cmd *querySearchCmd) searchGRPC(req *tempopb.SearchRequest) error {
ctx := user.InjectOrgID(context.Background(), cmd.OrgID)
ctx, err = user.InjectIntoGRPCRequest(ctx)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
return err
}
clientConn, err := grpc.DialContext(ctx, cmd.APIEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))

clientConn, err := grpc.DialContext(ctx, cmd.HostPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

client := tempopb.NewStreamingQuerierClient(clientConn)

resp, err := client.Search(ctx, &tempopb.SearchRequest{
Query: cmd.TraceQL,
Start: uint32(start),
End: uint32(end),
})
resp, err := client.Search(ctx, req)
if err != nil {
return err
}
Expand All @@ -72,3 +99,67 @@ func (cmd *querySearchCmd) Run(_ *globalOptions) error {
}
}
}

// searchWS runs the search through the websocket client and prints every
// response it is handed as JSON, followed by the final response.
//
// The client is pointed at "ws://" + HostPort joined with the optional
// PathPrefix and is given OrgID.
func (cmd *querySearchCmd) searchWS(req *tempopb.SearchRequest) error {
	baseURL := "ws://" + path.Join(cmd.HostPort, cmd.PathPrefix)
	wsClient := httpclient.New(baseURL, cmd.OrgID)

	// printPartial is handed to SearchWithWebsocket as its response callback
	// (presumably invoked once per intermediate result -- confirm against the
	// httpclient package).
	printPartial := func(partial *tempopb.SearchResponse) {
		fmt.Println("--- streaming response ---")
		if printErr := printAsJSON(partial); printErr != nil {
			// The callback has no error return, so a print failure aborts here.
			panic(printErr)
		}
	}

	final, err := wsClient.SearchWithWebsocket(req, printPartial)
	if err != nil {
		return err
	}

	fmt.Println("--- final response ---")
	return printAsJSON(final)
}

// searchHTTP runs the search as a single HTTP GET and prints the decoded
// response as JSON.
//
// The request targets "http://" + HostPort joined with the optional PathPrefix
// and api.PathSearch. req is applied to the request by api.BuildSearchRequest,
// and OrgID is injected as a header.
//
// It returns an error when the request cannot be built or sent, when the body
// cannot be read, when the status is not 200 (the error carries the response
// body), when the body cannot be parsed as a SearchResponse, or when printing
// fails.
func (cmd *querySearchCmd) searchHTTP(req *tempopb.SearchRequest) error {
	httpReq, err := http.NewRequest("GET", "http://"+path.Join(cmd.HostPort, cmd.PathPrefix, api.PathSearch), nil)
	if err != nil {
		return err
	}

	httpReq, err = api.BuildSearchRequest(httpReq, req)
	if err != nil {
		return err
	}

	// Replace the header map with an empty one; the org ID is injected into it below.
	httpReq.Header = http.Header{}
	err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq)
	if err != nil {
		return err
	}

	// Echo the outgoing request so the user can see exactly what was sent.
	fmt.Println(httpReq)
	httpResp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()

	body, err := io.ReadAll(httpResp.Body)
	if err != nil {
		return err
	}

	if httpResp.StatusCode != http.StatusOK {
		return errors.New("failed to query: " + string(body))
	}

	resp := &tempopb.SearchResponse{}
	if err := jsonpb.Unmarshal(bytes.NewReader(body), resp); err != nil {
		// A malformed body is a runtime failure, not a programmer bug: report it
		// like every other error in this function instead of panicking.
		return fmt.Errorf("failed to parse resp: %w", err)
	}

	return printAsJSON(resp)
}
2 changes: 2 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

traceByIDHandler := middleware.Wrap(queryFrontend.TraceByIDHandler)
searchHandler := middleware.Wrap(queryFrontend.SearchHandler)
searchWSHandler := middleware.Wrap(queryFrontend.SearchWSHandler)
spanMetricsSummaryHandler := middleware.Wrap(queryFrontend.SpanMetricsSummaryHandler)
searchTagsHandler := middleware.Wrap(queryFrontend.SearchTagsHandler)

Expand All @@ -383,6 +384,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

// http search endpoints
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), searchHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathWSSearch), searchWSHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), searchTagsHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), searchTagsHandler)
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), searchTagsHandler)
Expand Down
88 changes: 23 additions & 65 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ For externally supported GRPC API [see below](#tempo-grpc-api)
| [Ingest traces](#ingest) | Distributor | - | See section for details |
| [Querying traces by id](#query) | Query-frontend | HTTP | `GET /api/traces/<traceID>` |
| [Searching traces](#search) | Query-frontend | HTTP | `GET /api/search?<params>` |
| [Searching traces with streaming results](#search-with-websockets) | Query-frontend | HTTP | `GET /api/search-ws?<params>` |
| [Search tag names](#search-tags) | Query-frontend | HTTP | `GET /api/search/tags` |
| [Search tag names V2](#search-tags-v2) | Query-frontend | HTTP | `GET /api/v2/search/tags` |
| [Search tag values](#search-tag-values) | Query-frontend | HTTP | `GET /api/search/tag/<tag>/values` |
Expand Down Expand Up @@ -250,6 +251,24 @@ $ curl -G -s http://localhost:3200/api/search --data-urlencode 'tags=service.nam
}
```
### Search with websockets
Tempo supports streaming results returned over a websocket at the `/api/search-ws` endpoint. This endpoint supports all of the same
parameters as [search](#search). For a given query, Tempo will return a series of intermediate results, a final result and will then
gracefully close the websocket connection.
You can test this endpoint using curl:
```bash
curl -G -s http://localhost:3200/api/search-ws \
-H 'Connection: Upgrade' \
-H 'Upgrade: websocket' \
-H 'Sec-Websocket-Version: 13' \
-H 'Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==' \
--data-urlencode 'q={ status=error }' \
--data-urlencode "start=$(date --date yesterday -u +%s)" \
--data-urlencode "end=$(date -u +%s)"
```
### Search tags
Ingester configuration `complete_block_timeout` affects how long tags are available for search.
Expand Down Expand Up @@ -573,71 +592,10 @@ Exposes the build information in a JSON object. The fields are `version`, `revis
## Tempo GRPC API
**Deprecated**
Tempo uses GRPC to internally communicate with itself, but only has one externally supported client. The query-frontend component implements
the streaming querier interface defined below. [See here](https://github.com/grafana/tempo/blob/main/pkg/tempopb/) for the complete proto definition and generated code.
By default this service is only offered over the GRPC port. However, one can offer this streaming service over the HTTP port as well (which Grafana expects).
To enable the streaming service over the http port for use with Grafana set the following.
> **Note**: Enabling this setting is incompatible with TLS.
```
stream_over_http_enabled: true
```
The below `rpc` call returns only traces that are new or have updated each time `SearchResponse` is returned except for the last response. The
final response sent is guaranteed to have the entire resultset.
```protobuf
service StreamingQuerier {
rpc Search(SearchRequest) returns (stream SearchResponse);
}
message SearchRequest {
map<string, string> Tags = 1
uint32 MinDurationMs = 2;
uint32 MaxDurationMs = 3;
uint32 Limit = 4;
uint32 start = 5;
uint32 end = 6;
string Query = 8;
}
message SearchResponse {
repeated TraceSearchMetadata traces = 1;
SearchMetrics metrics = 2;
}
message TraceSearchMetadata {
string traceID = 1;
string rootServiceName = 2;
string rootTraceName = 3;
uint64 startTimeUnixNano = 4;
uint32 durationMs = 5;
SpanSet spanSet = 6; // deprecated. use SpanSets field below
repeated SpanSet spanSets = 7;
}
message SpanSet {
repeated Span spans = 1;
uint32 matched = 2;
}
message Span {
string spanID = 1;
string name = 2;
uint64 startTimeUnixNano = 3;
uint64 durationNanos = 4;
repeated tempopb.common.v1.KeyValue attributes = 5;
}
message SearchMetrics {
uint32 inspectedTraces = 1;
uint64 inspectedBytes = 2;
uint32 totalBlocks = 3;
uint32 completedJobs = 4;
uint32 totalJobs = 5;
uint64 totalBlockBytes = 6;
}
```
The GRPC endpoint has been deprecated and will be removed in a future version. If you would like to stream results for your TraceQL searches, please use the
[websockets endpoint](#search-with-websockets).
28 changes: 25 additions & 3 deletions docs/sources/tempo/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ The backend can be configured in a few ways:
Each option applies only to the command in which it is used. For example, `--backend <value>` does not permanently change where Tempo stores data. It only changes it for the command in which you apply the option.

## Query API command
Call the tempo API and retrieve a trace by ID.

### Trace ID
Call the Tempo API and retrieve a trace by ID.
```bash
tempo-cli query api <api-endpoint> <trace-id>
tempo-cli query api trace-id <api-endpoint> <trace-id>
```

Arguments:
Expand All @@ -71,8 +73,28 @@ Options:

**Example:**
```bash
tempo-cli query api http://tempo:3200 f1cfe82a8eef933b
tempo-cli query api trace-id http://tempo:3200 f1cfe82a8eef933b
```

### Search
Call the Tempo API and search using TraceQL.

```bash
tempo-cli query api search <host-port> <trace-ql> <start> <end>
```
Arguments:
- `host-port` A host/port combination for Tempo. The scheme will be inferred based on the options provided.
- `trace-ql` TraceQL query.
- `start` Start of the time range to search: (YYYY-MM-DDThh:mm:ss)
- `end` End of the time range to search: (YYYY-MM-DDThh:mm:ss)

Options:
- `--org-id <value>` Organization ID (for use in multi-tenant setup).
- `--use-grpc` Use deprecated GRPC streaming
- `--use-ws` Use HTTP/Websocket streaming
- `--spss <value>` Number of spans to return for each spanset
- `--limit <value>` Number of results to return
- `--path-prefix <value>` String to prefix search paths with

## Query blocks command
Iterate over all backend blocks and dump all data found for a given trace id.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/googleapis/gax-go/v2 v2.10.0
github.com/gorilla/websocket v1.5.0
github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.74.0
Expand Down
4 changes: 4 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func TestAllInOne(t *testing.T) {
require.NoError(t, err)

util.SearchStreamAndAssertTrace(t, grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())

// test websockets
wsClient := httpclient.New("ws://"+tempo.Endpoint(3200), "")
util.SearchWSStreamAndAssertTrace(t, wsClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
})
}
}
Expand Down
17 changes: 17 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,23 @@ func SearchStreamAndAssertTrace(t *testing.T, client tempopb.StreamingQuerierCli
require.True(t, found)
}

// SearchWSStreamAndAssertTrace searches for the trace described by info using
// the client's websocket search and requires that the trace's hex ID is present
// in the response returned by SearchWithWebsocket.
//
// The TraceQL query matches on a single attribute chosen from the expected
// trace by RandomAttrFromTrace; start and end bound the search and are
// truncated to uint32 for the request.
func SearchWSStreamAndAssertTrace(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo, start, end int64) {
	wantTrace, err := info.ConstructTraceFromEpoch()
	require.NoError(t, err)

	kv := tempoUtil.RandomAttrFromTrace(wantTrace)
	searchReq := &tempopb.SearchRequest{
		Query: fmt.Sprintf(`{ .%s = "%s"}`, kv.GetKey(), kv.GetValue().GetStringValue()),
		Start: uint32(start),
		End:   uint32(end),
	}

	// Responses delivered to the callback are ignored; only the value returned
	// by SearchWithWebsocket is asserted on.
	ignorePartial := func(*tempopb.SearchResponse) {}

	finalResp, err := client.SearchWithWebsocket(searchReq, ignorePartial)
	require.NoError(t, err)
	require.True(t, traceIDInResults(t, info.HexID(), finalResp))
}

// by passing a time range and using a query_ingesters_until/backend_after of 0 we can force the queriers
// to look in the backend blocks
func SearchAndAssertTraceBackend(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo, start, end int64) {
Expand Down
Loading