Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for GRPC storarge #5228

Merged
merged 8 commits into from
Feb 27, 2024
35 changes: 35 additions & 0 deletions cmd/jaeger/grpc_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: external-storage
trace_storage_archive: external-storage-archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
external-storage-archive:
server: localhost:17281
connection-timeout: 5s

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: external-storage
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (

memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is how we want to do the configuration in V2, but we can address it more holistically later -- #5229

// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand All @@ -27,6 +29,7 @@ type MemoryStorage struct {

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
//nolint:govet // The remoteRPCClient field in GRPC.Configuration contains error type
if reflect.DeepEqual(*cfg, *emptyCfg) {
return fmt.Errorf("%s: no storage type present in config", ID)
} else {
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -107,10 +109,17 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
builder: grpc.NewFactoryWithConfig,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
9 changes: 7 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
pluginRPCClient plugin.ClientProtocol
remoteRPCClient *grpc.ClientConn
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand Down Expand Up @@ -78,6 +79,9 @@
c.pluginHealthCheck.Stop()
c.pluginHealthCheckDone <- true
}
if c.remoteRPCClient != nil {
Copy link
Contributor Author

@james-ryans james-ryans Feb 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To pass the previous code coverage, I set up a gRPC server within the unit test for the factory to use, since we can't just mock the gRPC configuration. Because the current gRPC storage doesn't close the remote gRPC client, I added a method to close the client right inside the factory's Close() function. However, it looks like the code coverage tool isn't recognizing this particular line as covered. Even though I've called the Close() function in my unit test, which should hit this line, the method is abstracted behind an interface. Could this be why it's not showing up as covered?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in Go coverage is computed in a weird way, it's package local, i.e. coverage is only collected for a package from tests running in the same package. This behavior can be altered by passing additional packages explicitly, as we do for storage integration tests (we tell Go to count coverage in all other packages):

$(GOTEST) -coverpkg=./... -coverprofile cover.out $(STORAGE_PKGS)

So when the integration test runs for grpc-plugin it should include your Close changes, but not if you just run go test manually without -coverpkg

c.remoteRPCClient.Close()
}

Check warning on line 84 in plugin/storage/grpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/config/config.go#L82-L84

Added lines #L82 - L84 were not covered by tests

return c.RemoteTLS.Close()
}
Expand Down Expand Up @@ -106,12 +110,13 @@
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
var err error
c.remoteRPCClient, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a client

Suggested change
c.remoteRPCClient, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)

if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
}

grpcClient := shared.NewGRPCClient(conn)
grpcClient := shared.NewGRPCClient(c.remoteRPCClient)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down
15 changes: 15 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func NewFactory() *Factory {
return &Factory{}
}

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.InitFromOptions(Options{Configuration: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.options.AddFlags(flagSet)
Expand Down
34 changes: 34 additions & 0 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package grpc

import (
"errors"
"log"
"net"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -143,6 +147,36 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactoryWithConfig(t *testing.T) {
cfg := grpcConfig.Configuration{}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage")

lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
require.NoError(t, err, "failed to listen")


s := grpc.NewServer()
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
defer s.Stop()

cfg.RemoteServerAddr = lis.Addr().String()
cfg.RemoteConnectTimeout = 1 * time.Second
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
defer func() {
err := f.Close()
if err != nil {
log.Fatalf("Client exited with error: %v", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err := f.Close()
if err != nil {
log.Fatalf("Client exited with error: %v", err)
}
require.NoError(t, f.Close())

}()
require.NoError(t, err)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
Expand Down
Loading