-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[jaeger-v2] Add support for GRPC storarge #5228
Changes from 2 commits
ee39180
2798a45
709bb09
f50daec
c65ddfc
05d2c05
992b223
cd405ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
service: | ||
extensions: [jaeger_storage, jaeger_query] | ||
pipelines: | ||
traces: | ||
receivers: [otlp] | ||
processors: [batch] | ||
exporters: [jaeger_storage_exporter] | ||
|
||
extensions: | ||
jaeger_query: | ||
trace_storage: external-storage | ||
trace_storage_archive: external-storage-archive | ||
ui_config: ./cmd/jaeger/config-ui.json | ||
|
||
jaeger_storage: | ||
grpc: | ||
external-storage: | ||
server: localhost:17271 | ||
connection-timeout: 5s | ||
external-storage-archive: | ||
server: localhost:17281 | ||
connection-timeout: 5s | ||
|
||
receivers: | ||
otlp: | ||
protocols: | ||
grpc: | ||
http: | ||
|
||
processors: | ||
batch: | ||
|
||
exporters: | ||
jaeger_storage_exporter: | ||
trace_storage: external-storage |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -50,6 +50,7 @@ | |||||
pluginHealthCheck *time.Ticker | ||||||
pluginHealthCheckDone chan bool | ||||||
pluginRPCClient plugin.ClientProtocol | ||||||
remoteRPCClient *grpc.ClientConn | ||||||
} | ||||||
|
||||||
// ClientPluginServices defines services plugin can expose and its capabilities | ||||||
|
@@ -78,6 +79,9 @@ | |||||
c.pluginHealthCheck.Stop() | ||||||
c.pluginHealthCheckDone <- true | ||||||
} | ||||||
if c.remoteRPCClient != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To pass the previous code coverage, I set up a gRPC server within the unit test for the factory to use, since we can't just mock the gRPC configuration. Because the current gRPC storage doesn't close the remote gRPC client, I added a method to close the client right inside the factory's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in Go coverage is computed in a weird way, it's package local, i.e. coverage is only collected for a package from tests running in the same package. This behavior can be altered by passing additional packages explicitly, as we do for storage integration tests (we tell Go to count coverage in all other packages):
So when the integration test runs for grpc-plugin it should include your Close changes, but not if you just run |
||||||
c.remoteRPCClient.Close() | ||||||
} | ||||||
|
||||||
return c.RemoteTLS.Close() | ||||||
} | ||||||
|
@@ -106,12 +110,13 @@ | |||||
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) | ||||||
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) | ||||||
} | ||||||
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) | ||||||
var err error | ||||||
c.remoteRPCClient, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not a client
Suggested change
|
||||||
if err != nil { | ||||||
return nil, fmt.Errorf("error connecting to remote storage: %w", err) | ||||||
} | ||||||
|
||||||
grpcClient := shared.NewGRPCClient(conn) | ||||||
grpcClient := shared.NewGRPCClient(c.remoteRPCClient) | ||||||
return &ClientPluginServices{ | ||||||
PluginServices: shared.PluginServices{ | ||||||
Store: grpcClient, | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -16,13 +16,17 @@ package grpc | |||||||||||
|
||||||||||||
import ( | ||||||||||||
"errors" | ||||||||||||
"log" | ||||||||||||
"net" | ||||||||||||
"testing" | ||||||||||||
"time" | ||||||||||||
|
||||||||||||
"github.com/spf13/viper" | ||||||||||||
"github.com/stretchr/testify/assert" | ||||||||||||
"github.com/stretchr/testify/require" | ||||||||||||
"go.opentelemetry.io/otel/trace" | ||||||||||||
"go.uber.org/zap" | ||||||||||||
"google.golang.org/grpc" | ||||||||||||
|
||||||||||||
"github.com/jaegertracing/jaeger/pkg/config" | ||||||||||||
"github.com/jaegertracing/jaeger/pkg/metrics" | ||||||||||||
|
@@ -143,6 +147,36 @@ func TestGRPCStorageFactory(t *testing.T) { | |||||||||||
assert.Equal(t, f.store.DependencyReader(), depReader) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func TestGRPCStorageFactoryWithConfig(t *testing.T) { | ||||||||||||
cfg := grpcConfig.Configuration{} | ||||||||||||
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) | ||||||||||||
require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage") | ||||||||||||
|
||||||||||||
lis, err := net.Listen("tcp", ":0") | ||||||||||||
if err != nil { | ||||||||||||
t.Fatalf("failed to listen: %v", err) | ||||||||||||
} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
|
||||||||||||
s := grpc.NewServer() | ||||||||||||
go func() { | ||||||||||||
if err := s.Serve(lis); err != nil { | ||||||||||||
log.Fatalf("Server exited with error: %v", err) | ||||||||||||
} | ||||||||||||
}() | ||||||||||||
defer s.Stop() | ||||||||||||
|
||||||||||||
cfg.RemoteServerAddr = lis.Addr().String() | ||||||||||||
cfg.RemoteConnectTimeout = 1 * time.Second | ||||||||||||
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) | ||||||||||||
defer func() { | ||||||||||||
err := f.Close() | ||||||||||||
if err != nil { | ||||||||||||
log.Fatalf("Client exited with error: %v", err) | ||||||||||||
} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
}() | ||||||||||||
require.NoError(t, err) | ||||||||||||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
} | ||||||||||||
|
||||||||||||
func TestGRPCStorageFactory_Capabilities(t *testing.T) { | ||||||||||||
f := NewFactory() | ||||||||||||
v := viper.New() | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is how we want to do the configuration in V2, but we can address it more holistically later -- #5229