Skip to content

Commit

Permalink
init and todos
Browse files Browse the repository at this point in the history
  • Loading branch information
electron0zero committed Dec 14, 2023
1 parent 54644a9 commit 969dbb3
Showing 1 changed file with 27 additions and 2 deletions.
29 changes: 27 additions & 2 deletions modules/frontend/search_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/protobuf/jsonpb" //nolint:all //deprecated
"github.com/gorilla/websocket"
"github.com/grafana/tempo/modules/frontend/combiner"
"go.uber.org/atomic"

"github.com/grafana/dskit/user"
Expand Down Expand Up @@ -105,11 +106,19 @@ func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream
o: o,
searchCache: searchCache,
cfg: &cfg,
preMiddleware: newMultiTenantUnsupportedMiddleware(cfg, logger),
preMiddleware: newMultiTenantMiddleware(cfg, combiner.NewSearch, logger),
}

downstreamPath := path.Join(apiPrefix, api.PathSearch)
return func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error {
// tenant, ctx, err := user.ExtractFromGRPCRequest(srv.Context())
// if err != nil {
// level.Error(logger).Log("msg", "search streaming: extract org id failed", "err", err)
// return fmt.Errorf("extract org id failed: %w", err)
// }
//
// level.Debug(logger).Log("msg", "in search streaming", "tenant", tenant)

httpReq, err := api.BuildSearchRequest(&http.Request{
URL: &url.URL{
Path: downstreamPath,
Expand All @@ -123,6 +132,7 @@ func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream
return fmt.Errorf("build search request failed: %w", err)
}

// we inject context here?? why??
httpReq = httpReq.WithContext(srv.Context())

return searcher.handle(httpReq, func(resp *tempopb.SearchResponse) error {
Expand All @@ -140,7 +150,7 @@ func newSearchStreamingWSHandler(cfg Config, o overrides.Interface, downstream h
o: o,
searchCache: searchCache,
cfg: &cfg,
preMiddleware: newMultiTenantUnsupportedMiddleware(cfg, logger),
preMiddleware: newMultiTenantMiddleware(cfg, combiner.NewSearch, logger),
}

// since this is a backend DB we allow websockets to originate from anywhere
Expand Down Expand Up @@ -240,6 +250,7 @@ type streamingSearcher struct {
preMiddleware Middleware
}

// multi-tenant support in handle func, and that will make the streaming for multi-tenant search work??
func (s *streamingSearcher) handle(r *http.Request, forwardResults func(*tempopb.SearchResponse) error) error {
ctx := r.Context()

Expand All @@ -255,12 +266,26 @@ func (s *streamingSearcher) handle(r *http.Request, forwardResults func(*tempopb

progress := atomic.NewPointer[*diffSearchProgress](nil)
fn := func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress {
// FIXME: we need to use a single diff searcher across all tenants??
// or maybe a searcher that sums up things across multi-tenants??
p := newDiffSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes)
progress.Store(&p)
return p
}

// TODO: we passed multi-tenant middleware as downstream, and it is pre-fixed before our
// search sharding middleware?? and it works because we run this before we do search sharding??
// order of ops:
// - newMultiTenantMiddleware
// - newSearchSharder
// - s.downstream

// build roundtripper
ss := newSearchSharder(s.reader, s.o, s.cfg.Search.Sharder, fn, s.searchCache, s.logger)

// preMiddleware will make multiple calls to newSearchSharder in case of multi-tenant search
// and it will end up creating it's own newDiffSearchProgress for each tenant?
// we need to make fn add up the values instead of giving new values each time?
rt := NewRoundTripper(s.downstream, s.preMiddleware, ss)

type roundTripResult struct {
Expand Down

0 comments on commit 969dbb3

Please sign in to comment.