Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add Adaptive Sampling Support for gRPC Remote Storage #6308

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-e2e-grpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ jobs:
run: |
case ${{ matrix.version }} in
v1)
SPAN_STORAGE_TYPE=memory make grpc-storage-integration-test
SAMPLING_STORAGE_TYPE=memory SPAN_STORAGE_TYPE=memory make grpc-storage-integration-test
;;
v2)
STORAGE=grpc SPAN_STORAGE_TYPE=memory make jaeger-v2-storage-integration-test
SAMPLING_STORAGE_TYPE=memory STORAGE=grpc SPAN_STORAGE_TYPE=memory make jaeger-v2-storage-integration-test
;;
esac

Expand Down
17 changes: 14 additions & 3 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
DEFAULT_MAX_BUCKET_SIZE = 1
)

// Server runs a gRPC server
type Server struct {
opts *Options
Expand All @@ -36,8 +41,8 @@ type Server struct {
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, telset.Logger)
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings, samplingStoreFactory storage.SamplingStoreFactory) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, samplingStoreFactory, telset.Logger)
if err != nil {
return nil, err
}
Expand All @@ -54,7 +59,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy
}, nil
}

func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
func createGRPCHandler(f storage.BaseFactory, samplingStoreFactory storage.SamplingStoreFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
Expand All @@ -67,12 +72,18 @@ func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCH
if err != nil {
return nil, err
}
// TODO: Update this to use bucket size from config
samplingStore, err := samplingStoreFactory.CreateSamplingStore(DEFAULT_MAX_BUCKET_SIZE)
if err != nil {
return nil, err
}

impl := &shared.GRPCHandlerStorageImpl{
SpanReader: func() spanstore.Reader { return reader },
SpanWriter: func() spanstore.Writer { return writer },
DependencyReader: func() dependencystore.Reader { return depReader },
StreamingSpanWriter: func() spanstore.Writer { return nil },
SamplingStore: func() samplingstore.Store { return samplingStore },
}

// borrow code from Query service for archive storage
Expand Down
15 changes: 14 additions & 1 deletion cmd/remote-storage/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ func TestNewServer_CreateStorageErrors(t *testing.T) {
factory.On("CreateSpanWriter").Return(nil, nil)
factory.On("CreateDependencyReader").Return(nil, errors.New("no deps")).Once()
factory.On("CreateDependencyReader").Return(nil, nil)
samplingStoreFactoryMocks := factoryMocks.NewSamplingStoreFactory(t)
f := func() (*Server, error) {
return NewServer(
&Options{GRPCHostPort: ":0"},
factory,
tenancy.NewManager(&tenancy.Options{}),
telemetry.NoopSettings(),
samplingStoreFactoryMocks,
)
}
_, err := f()
Expand Down Expand Up @@ -113,18 +115,21 @@ func TestNewServer_TLSConfigError(t *testing.T) {
ReportStatus: telemetry.HCAdapter(healthcheck.New()),
}
storageMocks := newStorageMocks()
samplingStoreFactoryMocks := factoryMocks.NewSamplingStoreFactory(t)

_, err := NewServer(
&Options{GRPCHostPort: ":8081", TLSGRPC: tlsCfg},
storageMocks.factory,
tenancy.NewManager(&tenancy.Options{}),
telset,
samplingStoreFactoryMocks,
)
assert.ErrorContains(t, err, "invalid TLS config")
}

func TestCreateGRPCHandler(t *testing.T) {
storageMocks := newStorageMocks()
h, err := createGRPCHandler(storageMocks.factory, zap.NewNop())
h, err := createGRPCHandler(storageMocks.factory, nil, zap.NewNop())
require.NoError(t, err)

storageMocks.writer.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("writer error"))
Expand Down Expand Up @@ -320,11 +325,15 @@ func TestServerGRPCTLS(t *testing.T) {
Logger: flagsSvc.Logger,
ReportStatus: telemetry.HCAdapter(flagsSvc.HC()),
}

samplingStoreFactoryMocks := factoryMocks.NewSamplingStoreFactory(t)

server, err := NewServer(
serverOptions,
storageMocks.factory,
tm,
telset,
samplingStoreFactoryMocks,
)
require.NoError(t, err)
require.NoError(t, server.Start())
Expand Down Expand Up @@ -369,11 +378,15 @@ func TestServerHandlesPortZero(t *testing.T) {
Logger: flagsSvc.Logger,
ReportStatus: telemetry.HCAdapter(flagsSvc.HC()),
}

samplingStoreFactoryMocks := factoryMocks.NewSamplingStoreFactory(t)

server, err := NewServer(
&Options{GRPCHostPort: ":0"},
storageMocks.factory,
tenancy.NewManager(&tenancy.Options{}),
telset,
samplingStoreFactoryMocks,
)
require.NoError(t, err)

Expand Down
7 changes: 6 additions & 1 deletion cmd/remote-storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,15 @@ func main() {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

samplingStoreFactory, err := storageFactory.CreateSamplingStoreFactory()
if err != nil {
logger.Fatal("Failed to init sampling storage factory", zap.Error(err))
}

tm := tenancy.NewManager(&opts.Tenancy)
telset := baseTelset // copy
telset.Metrics = metricsFactory
server, err := app.NewServer(opts, storageFactory, tm, telset)
server, err := app.NewServer(opts, storageFactory, tm, telset, samplingStoreFactory)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))
}
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)
Expand Down Expand Up @@ -161,6 +162,7 @@ func (f *Factory) newRemoteStorage(
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
SamplingStore: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down Expand Up @@ -230,6 +232,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return f.services.ArchiveStore.ArchiveSpanWriter(), nil
}

func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return f.services.SamplingStore.SamplingStore(), nil
}

// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,63 @@ message FindTraceIDsResponse {
];
}

message Throughput {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema should be documented. Wherever you are copying this from, we have comments explaining all types.

string Service = 1;
string Operation = 2;
int64 Count = 3;
repeated double Probabilities = 4;
}

message InsertThroughputRequest {
repeated Throughput throughput = 1;
}

message InsertThroughputResponse {
}

message StringFloatMap {
map<string, double> stringFloatMap = 1;
}

message ServiceOperationProbabilities {
map<string, StringFloatMap> serviceOperationProbabilities = 1;
}

message ServiceOperationQPS {
map<string, StringFloatMap> serviceOperationQPS = 1;
}

message InsertProbabilitiesAndQPSRequest {
string hostname = 1;
ServiceOperationProbabilities probabilities = 2;
ServiceOperationQPS qps = 3;
}

message InsertProbabilitiesAndQPSResponse {
}

message GetThroughputRequest {
google.protobuf.Timestamp start_time = 1[
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end_time = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
}

message GetThroughputResponse {
repeated Throughput throughput = 1;
}

message GetLatestProbabilitiesRequest {
}

message GetLatestProbabilitiesResponse {
ServiceOperationProbabilities serviceOperationProbabilities = 1;
}

service SpanWriterPlugin {
// spanstore/Writer
rpc WriteSpan(WriteSpanRequest) returns (WriteSpanResponse);
Expand Down Expand Up @@ -182,6 +239,16 @@ service DependenciesReaderPlugin {
rpc GetDependencies(GetDependenciesRequest) returns (GetDependenciesResponse);
}

service SamplingStorePlugin{
rpc InsertThroughput(InsertThroughputRequest) returns (InsertThroughputResponse);

rpc InsertProbabilitiesAndQPS(InsertProbabilitiesAndQPSRequest) returns (InsertProbabilitiesAndQPSResponse);

rpc GetThroughput(GetThroughputRequest) returns (GetThroughputResponse);

rpc GetLatestProbabilities(GetLatestProbabilitiesRequest) returns (GetLatestProbabilitiesResponse);
}

// empty; extensible in the future
message CapabilitiesRequest {

Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

samplingStoreModel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -39,6 +41,7 @@ type GRPCClient struct {
capabilitiesClient storage_v1.PluginCapabilitiesClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
samplingStoreClient storage_v1.SamplingStorePluginClient
}

func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *GRPCClient {
Expand All @@ -50,6 +53,7 @@ func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(tracedConn),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(tracedConn),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(untracedConn),
samplingStoreClient: storage_v1.NewSamplingStorePluginClient(untracedConn),
}
}

Expand All @@ -68,6 +72,10 @@ func (c *GRPCClient) SpanWriter() spanstore.Writer {
return c
}

func (c *GRPCClient) SamplingStore() samplingstore.Store {
return c
}

func (c *GRPCClient) StreamingSpanWriter() spanstore.Writer {
return newStreamingSpanWriter(c.streamWriterClient)
}
Expand Down Expand Up @@ -266,3 +274,62 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace,

return &trace, nil
}

func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughput) error {
ctx := context.Background()
storageV1Throughput, err := samplingStoreThroughputsToStorageV1Throughputs(throughputs)
if err != nil {
return err
}

_, err = c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{
Throughput: storageV1Throughput,
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

return nil
}

func (c *GRPCClient) InsertProbabilitiesAndQPS(hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error {
ctx := context.Background()

_, err := c.samplingStoreClient.InsertProbabilitiesAndQPS(ctx, &storage_v1.InsertProbabilitiesAndQPSRequest{
Hostname: hostname,
Probabilities: &storage_v1.ServiceOperationProbabilities{
ServiceOperationProbabilities: sSFloatMapToStorageV1SSFloatMap(probabilities),
},
Qps: &storage_v1.ServiceOperationQPS{
ServiceOperationQPS: sSFloatMapToStorageV1SSFloatMap(qps),
},
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

return nil
}

func (c *GRPCClient) GetThroughput(start, end time.Time) ([]*samplingStoreModel.Throughput, error) {
ctx := context.Background()
resp, err := c.samplingStoreClient.GetThroughput(ctx, &storage_v1.GetThroughputRequest{
StartTime: start,
EndTime: end,
})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

return storageV1ThroughputsToSamplingStoreThroughputs(resp.Throughput), nil
}

func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperationProbabilities, error) {
ctx := context.Background()
resp, err := c.samplingStoreClient.GetLatestProbabilities(ctx, &storage_v1.GetLatestProbabilitiesRequest{})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

return storageV1SSFloatMapToSSFloatMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil
}
Loading