Skip to content

Commit

Permalink
grpcreflect: fallback from v1 to v1alpha on unavailable error; ports PR
Browse files Browse the repository at this point in the history
#588 from v1
  • Loading branch information
jhump committed Feb 24, 2024
1 parent cc5b31a commit f3863e8
Show file tree
Hide file tree
Showing 31 changed files with 276 additions and 73 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.32.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
Expand Down
10 changes: 9 additions & 1 deletion grpcreflect/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,15 @@ func (cr *Client) doSendLocked(attemptCount int, prevErr error, req *refv1alpha.
if attemptCount >= 3 && prevErr != nil {
return nil, prevErr
}
if status.Code(prevErr) == codes.Unimplemented && cr.useV1() {
if (status.Code(prevErr) == codes.Unimplemented ||
status.Code(prevErr) == codes.Unavailable) &&
cr.useV1() {
// If v1 is unimplemented, fallback to v1alpha.
// We also fallback on unavailable because some servers have been
// observed to close the connection/cancel the stream, w/out sending
// back status or headers, when the service name is not known. When
// this happens, the RPC status code is unavailable.
// See https://github.com/fullstorydev/grpcurl/issues/434
cr.useV1Alpha = true
cr.lastTriedV1 = cr.now()
}
Expand Down
147 changes: 144 additions & 3 deletions grpcreflect/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpcreflect
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -265,7 +266,8 @@ func TestRecover(t *testing.T) {

// kill the stream
stream := client.stream
_ = client.stream.CloseSend()
err = client.stream.CloseSend()
require.NoError(t, err)

// it should auto-recover and re-create stream
_, err = client.ListServices()
Expand Down Expand Up @@ -464,6 +466,8 @@ func TestAutoVersion(t *testing.T) {
"/grpc.reflection.v1.ServerReflection/ServerReflectionInfo",
})
})

t.Run("fallback-on-unavailable", testClientAutoOnUnavailable)
}

func testClientAuto(t *testing.T, register func(*grpc.Server), expectedServices []protoreflect.FullName, expectedLog []string) {
Expand All @@ -475,7 +479,8 @@ func testClientAuto(t *testing.T, register func(*grpc.Server), expectedServices
panic(fmt.Sprintf("Failed to open server socket: %s", err.Error()))
}
go func() {
_ = svr.Serve(l)
err := svr.Serve(l)
require.NoError(t, err)
}()
defer svr.Stop()

Expand All @@ -484,7 +489,8 @@ func testClientAuto(t *testing.T, register func(*grpc.Server), expectedServices
panic(fmt.Sprintf("Failed to create grpc client: %s", err.Error()))
}
defer func() {
_ = cconn.Close()
err := cconn.Close()
require.NoError(t, err)
}()
client := NewClientAuto(context.Background(), cconn)
now := time.Now()
Expand Down Expand Up @@ -543,3 +549,138 @@ func (c *captureStreamNames) intercept(srv interface{}, ss grpc.ServerStream, in
func (c *captureStreamNames) handleUnknown(_ interface{}, _ grpc.ServerStream) error {
return status.Errorf(codes.Unimplemented, "WTF?")
}

func testClientAutoOnUnavailable(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(fmt.Sprintf("Failed to open server socket: %s", err.Error()))
}
captureConn := &captureListener{Listener: l}

var capture captureStreamNames
svr := grpc.NewServer(
grpc.StreamInterceptor(capture.intercept),
grpc.UnknownServiceHandler(func(_ interface{}, _ grpc.ServerStream) error {
// On unknown method, forcibly close the net.Conn, without sending
// back any reply, which should result in an "unavailable" error.
return captureConn.latest().Close()
}),
)
impl := reflection.NewServer(reflection.ServerOptions{Services: svr})
refv1alpha.RegisterServerReflectionServer(svr, impl)
testprotosgrpc.RegisterDummyServiceServer(svr, testService{})

go func() {
err := svr.Serve(captureConn)
require.NoError(t, err)
}()
defer svr.Stop()

var captureErrs captureErrors
cconn, err := grpc.Dial(
l.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStreamInterceptor(captureErrs.intercept),
)
if err != nil {
panic(fmt.Sprintf("Failed to create grpc client: %s", err.Error()))
}
defer func() {
err := cconn.Close()
require.NoError(t, err)
}()
client := NewClientAuto(context.Background(), cconn)
now := time.Now()
client.now = func() time.Time {
return now
}

svcs, err := client.ListServices()
require.NoError(t, err)
sort.Slice(svcs, func(i, j int) bool {
return svcs[i] < svcs[j]
})
require.Equal(t, []protoreflect.FullName{
"grpc.reflection.v1alpha.ServerReflection",
"testprotos.DummyService",
}, svcs)

// It should have tried v1 first and failed then tried v1alpha.
actualLog := capture.names()
require.Equal(t, []string{
"/grpc.reflection.v1.ServerReflection/ServerReflectionInfo",
"/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo",
}, actualLog)

// Make sure the error code observed by the client was unavailable and not unimplemented.
actualCodes := captureErrs.codes()
require.Equal(t, []codes.Code{codes.Unavailable}, actualCodes)
}

type captureListener struct {
net.Listener
mu sync.Mutex
conn net.Conn
}

func (c *captureListener) Accept() (net.Conn, error) {
conn, err := c.Listener.Accept()
if err == nil {
c.mu.Lock()
c.conn = conn
c.mu.Unlock()
}
return conn, err
}

func (c *captureListener) latest() net.Conn {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn
}

type captureErrors struct {
mu sync.Mutex
observed []codes.Code
}

func (c *captureErrors) intercept(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
stream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
c.observe(err)
return nil, err
}
return &captureErrorStream{ClientStream: stream, c: c}, nil
}

func (c *captureErrors) observe(err error) {
c.mu.Lock()
c.observed = append(c.observed, status.Code(err))
c.mu.Unlock()
}

func (c *captureErrors) codes() []codes.Code {
c.mu.Lock()
defer c.mu.Unlock()
ret := make([]codes.Code, len(c.observed))
copy(ret, c.observed)
return ret
}

type captureErrorStream struct {
grpc.ClientStream
c *captureErrors
done int32
}

func (c *captureErrorStream) RecvMsg(m interface{}) error {
err := c.ClientStream.RecvMsg(m)
if err == nil || errors.Is(err, io.EOF) {
return nil
}
// Only record one error per RPC.
if atomic.CompareAndSwapInt32(&c.done, 0, 1) {
c.c.observe(err)
}
return err
}
21 changes: 15 additions & 6 deletions grpcreflect/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,32 @@ type GRPCServer = reflection.GRPCServer
// given GRPC server.
func LoadServiceDescriptors(s GRPCServer) (map[string]protoreflect.ServiceDescriptor, error) {
descs := map[string]protoreflect.ServiceDescriptor{}
for name := range s.GetServiceInfo() {
sd, err := protoresolve.GlobalDescriptors.FindServiceByName(protoreflect.FullName(name))
if err != nil {
return nil, fmt.Errorf("could not resolve descriptor for service %q: %w", name, err)
for name, info := range s.GetServiceInfo() {
// See if the service info provides the schema in the service metadata.
sd, ok := info.Metadata.(protoreflect.ServiceDescriptor)
if !ok {
var err error
sd, err = protoresolve.GlobalDescriptors.FindServiceByName(protoreflect.FullName(name))
if err != nil {
return nil, fmt.Errorf("could not resolve descriptor for service %q: %w", name, err)
}
}
descs[name] = sd
}
return descs, nil
}

// LoadServiceDescriptor loads a rich descriptor for a given service description
// generated by protoc-gen-go. Generated code contains an unexported symbol with
// a name like "_<Service>_serviceDesc" which is the service's description. It
// generated by protoc-gen-go. Generated code contains an exported symbol with
// a name like "<Service>_serviceDesc" which is the service's description. It
// is used internally to register a service implementation with a GRPC server.
// But it can also be used by this package to retrieve the rich descriptor for
// the service.
func LoadServiceDescriptor(svc *grpc.ServiceDesc) (protoreflect.ServiceDescriptor, error) {
// See if the service info provides the schema in the service metadata.
if sd, ok := svc.Metadata.(protoreflect.ServiceDescriptor); ok {
return sd, nil
}
sd, err := protoresolve.GlobalDescriptors.FindServiceByName(protoreflect.FullName(svc.ServiceName))
if err != nil {
return nil, fmt.Errorf("could not resolve descriptor for service %q: %w", svc.ServiceName, err)
Expand Down
4 changes: 2 additions & 2 deletions internal/testdata/desc_test1.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_comments.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified internal/testdata/desc_test_comments.protoset
Binary file not shown.
4 changes: 2 additions & 2 deletions internal/testdata/desc_test_complex.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified internal/testdata/desc_test_complex_source_info.protoset
Binary file not shown.
4 changes: 2 additions & 2 deletions internal/testdata/desc_test_defaults.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_field_types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_oneof.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_options.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_proto3.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_value.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/testdata/desc_test_wellknowntypes.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified internal/testdata/descriptor.protoset
Binary file not shown.
4 changes: 2 additions & 2 deletions internal/testdata/grpc/dummy.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f3863e8

Please sign in to comment.