diff --git a/config/core/300-resources/configuration.yaml b/config/core/300-resources/configuration.yaml index 80db933c47ac..acd9c2cc6ddb 100644 --- a/config/core/300-resources/configuration.yaml +++ b/config/core/300-resources/configuration.yaml @@ -247,6 +247,19 @@ spec: description: Minimum consecutive failures for the probe to be considered failed after having succeeded. Defaults to 3. Minimum value is 1. type: integer format: int32 + grpc: + description: GRPC specifies an action involving a GRPC port. This is a beta field and requires enabling GRPCContainerProbe feature gate. + type: object + required: + - port + properties: + port: + description: Port number of the gRPC service. Number must be in the range 1 to 65535. + type: integer + format: int32 + service: + description: "Service is the name of the service to place in the gRPC HealthCheckRequest (see https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \n If this is not specified, the default behavior is defined by gRPC." + type: string httpGet: description: HTTPGet specifies the http request to perform. type: object @@ -355,6 +368,19 @@ spec: description: Minimum consecutive failures for the probe to be considered failed after having succeeded. Defaults to 3. Minimum value is 1. type: integer format: int32 + grpc: + description: GRPC specifies an action involving a GRPC port. This is a beta field and requires enabling GRPCContainerProbe feature gate. + type: object + required: + - port + properties: + port: + description: Port number of the gRPC service. Number must be in the range 1 to 65535. + type: integer + format: int32 + service: + description: "Service is the name of the service to place in the gRPC HealthCheckRequest (see https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \n If this is not specified, the default behavior is defined by gRPC." + type: string httpGet: description: HTTPGet specifies the http request to perform. type: object diff --git a/config/core/300-resources/revision.yaml b/config/core/300-resources/revision.yaml index 00c98bd84cfb..9cc3fe276bda 100644 --- a/config/core/300-resources/revision.yaml +++ b/config/core/300-resources/revision.yaml @@ -226,6 +226,19 @@ spec: description: Minimum consecutive failures for the probe to be considered failed after having succeeded. Defaults to 3. Minimum value is 1. type: integer format: int32 + grpc: + description: GRPC specifies an action involving a GRPC port. This is a beta field and requires enabling GRPCContainerProbe feature gate. + type: object + required: + - port + properties: + port: + description: Port number of the gRPC service. Number must be in the range 1 to 65535. + type: integer + format: int32 + service: + description: "Service is the name of the service to place in the gRPC HealthCheckRequest (see https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \n If this is not specified, the default behavior is defined by gRPC." + type: string httpGet: description: HTTPGet specifies the http request to perform. type: object @@ -334,6 +347,19 @@ spec: description: Minimum consecutive failures for the probe to be considered failed after having succeeded. Defaults to 3. Minimum value is 1. type: integer format: int32 + grpc: + description: GRPC specifies an action involving a GRPC port. This is a beta field and requires enabling GRPCContainerProbe feature gate. + type: object + required: + - port + properties: + port: + description: Port number of the gRPC service. Number must be in the range 1 to 65535. + type: integer + format: int32 + service: + description: "Service is the name of the service to place in the gRPC HealthCheckRequest (see https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \n If this is not specified, the default behavior is defined by gRPC." + type: string httpGet: description: HTTPGet specifies the http request to perform. type: object diff --git a/config/core/300-resources/service.yaml b/config/core/300-resources/service.yaml index dc489fba43ea..922461fafd91 100644 --- a/config/core/300-resources/service.yaml +++ b/config/core/300-resources/service.yaml @@ -251,6 +251,19 @@ spec: description: Minimum consecutive failures for the probe to be considered failed after having succeeded. Defaults to 3. Minimum value is 1. type: integer format: int32 + grpc: + description: GRPC specifies an action involving a GRPC port. This is a beta field and requires enabling GRPCContainerProbe feature gate. + type: object + required: + - port + properties: + port: + description: Port number of the gRPC service. Number must be in the range 1 to 65535. + type: integer + format: int32 + service: + description: "Service is the name of the service to place in the gRPC HealthCheckRequest (see https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \n If this is not specified, the default behavior is defined by gRPC." + type: string httpGet: description: HTTPGet specifies the http request to perform. type: object @@ -359,6 +372,19 @@ spec: description: Minimum consecutive failures for the probe to be considered failed after having succeeded. Defaults to 3. Minimum value is 1. type: integer format: int32 + grpc: + description: GRPC specifies an action involving a GRPC port. This is a beta field and requires enabling GRPCContainerProbe feature gate. + type: object + required: + - port + properties: + port: + description: Port number of the gRPC service. Number must be in the range 1 to 65535. + type: integer + format: int32 + service: + description: "Service is the name of the service to place in the gRPC HealthCheckRequest (see https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \n If this is not specified, the default behavior is defined by gRPC." + type: string httpGet: description: HTTPGet specifies the http request to perform. type: object diff --git a/go.mod b/go.mod index 79e87c18b7fa..251b3588fc49 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( go.uber.org/zap v1.26.0 golang.org/x/net v0.15.0 golang.org/x/sync v0.3.0 + golang.org/x/sys v0.12.0 golang.org/x/time v0.3.0 google.golang.org/api v0.141.0 google.golang.org/grpc v1.58.1 @@ -138,7 +139,6 @@ require ( golang.org/x/crypto v0.13.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect - golang.org/x/sys v0.12.0 // indirect golang.org/x/term v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.13.0 // indirect diff --git a/hack/schemapatch-config.yaml b/hack/schemapatch-config.yaml index febc4a034ad0..d7b9babb9fdb 100644 --- a/hack/schemapatch-config.yaml +++ b/hack/schemapatch-config.yaml @@ -198,6 +198,11 @@ k8s.io/api/core/v1.ProbeHandler: - Exec - HTTPGet - TCPSocket + - GRPC +k8s.io/api/core/v1.GRPCAction: + fieldMask: + - Port + - Service k8s.io/api/core/v1.ExecAction: fieldMask: - Command diff --git a/pkg/apis/serving/fieldmask.go b/pkg/apis/serving/fieldmask.go index 1d4dd77ebfe0..7c02f174dae3 100644 --- a/pkg/apis/serving/fieldmask.go +++ b/pkg/apis/serving/fieldmask.go @@ -374,6 +374,7 @@ func HandlerMask(in *corev1.ProbeHandler) *corev1.ProbeHandler { out.Exec = in.Exec out.HTTPGet = in.HTTPGet out.TCPSocket = in.TCPSocket + out.GRPC = in.GRPC return out @@ -429,6 +430,22 @@ func TCPSocketActionMask(in *corev1.TCPSocketAction) *corev1.TCPSocketAction { return out } +// GRPCActionMask performs a _shallow_ copy of the Kubernetes GRPCAction object to a new +// Kubernetes GRPCAction object bringing over only the fields allowed in the Knative API. This +// does not validate the contents or the bounds of the provided fields. +func GRPCActionMask(in *corev1.GRPCAction) *corev1.GRPCAction { + if in == nil { + return nil + } + out := new(corev1.GRPCAction) + + // Allowed fields + out.Port = in.Port + out.Service = in.Service + + return out +} + // ContainerPortMask performs a _shallow_ copy of the Kubernetes ContainerPort object to a new // Kubernetes ContainerPort object bringing over only the fields allowed in the Knative API. This // does not validate the contents or the bounds of the provided fields. diff --git a/pkg/apis/serving/fieldmask_test.go b/pkg/apis/serving/fieldmask_test.go index 5e957bb92521..9d22654ce474 100644 --- a/pkg/apis/serving/fieldmask_test.go +++ b/pkg/apis/serving/fieldmask_test.go @@ -318,6 +318,7 @@ func TestHandlerMask(t *testing.T) { Exec: &corev1.ExecAction{}, HTTPGet: &corev1.HTTPGetAction{}, TCPSocket: &corev1.TCPSocketAction{}, + GRPC: &corev1.GRPCAction{}, } in := want @@ -421,6 +422,33 @@ func TestTCPSocketActionMask(t *testing.T) { } } +func TestGRPCActionMask(t *testing.T) { + want := &corev1.GRPCAction{ + Port: 42, + Service: ptr.String("foo"), + } + in := &corev1.GRPCAction{ + Port: 42, + Service: ptr.String("foo"), + } + + got := GRPCActionMask(in) + + if &want == &got { + t.Error("Input and output share addresses. Want different addresses") + } + + if diff, err := kmp.SafeDiff(want, got); err != nil { + t.Error("Got error comparing output, err =", err) + } else if diff != "" { + t.Error("GRPCActionMask (-want, +got):", diff) + } + + if got = GRPCActionMask(nil); got != nil { + t.Errorf("GRPCActionMask(nil) = %v, want: nil", got) + } +} + func TestContainerPortMask(t *testing.T) { want := &corev1.ContainerPort{ ContainerPort: 42, diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index efa65a05e914..797dfb38a508 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -832,9 +832,13 @@ func validateProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError handlers = append(handlers, "exec") errs = errs.Also(apis.CheckDisallowedFields(*h.Exec, *ExecActionMask(h.Exec))).ViaField("exec") } + if h.GRPC != nil { + handlers = append(handlers, "gRPC") + errs = errs.Also(apis.CheckDisallowedFields(*h.GRPC, *GRPCActionMask(h.GRPC))).ViaField("grpc") + } if len(handlers) == 0 { - errs = errs.Also(apis.ErrMissingOneOf("httpGet", "tcpSocket", "exec")) + errs = errs.Also(apis.ErrMissingOneOf("httpGet", "tcpSocket", "exec", "grpc")) } else if len(handlers) > 1 { errs = errs.Also(apis.ErrMultipleOneOf(handlers...)) } diff --git a/pkg/apis/serving/k8s_validation_test.go b/pkg/apis/serving/k8s_validation_test.go index 9f73a98c43ca..16bf426a640a 100644 --- a/pkg/apis/serving/k8s_validation_test.go +++ b/pkg/apis/serving/k8s_validation_test.go @@ -1654,7 +1654,7 @@ func TestContainerValidation(t *testing.T) { ProbeHandler: corev1.ProbeHandler{}, }, }, - want: apis.ErrMissingOneOf("livenessProbe.httpGet", "livenessProbe.tcpSocket", "livenessProbe.exec"), + want: apis.ErrMissingOneOf("livenessProbe.httpGet", "livenessProbe.tcpSocket", "livenessProbe.exec", "livenessProbe.grpc"), }, { name: "invalid with multiple handlers", c: corev1.Container{ @@ -1888,6 +1888,39 @@ func TestContainerValidation(t *testing.T) { }, want: apis.ErrDisallowedFields("lifecycle").Also( apis.ErrMissingField("image")), + }, { + name: "valid grpc probe", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: 46, + }, + }, + }, + }, + }, { + name: "valid grpc probe with service", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: 46, + Service: ptr.String("foo"), + }, + }, + }, + }, }, } tests = append(tests, getCommonContainerValidationTestCases()...) diff --git a/pkg/apis/serving/v1/revision_defaults.go b/pkg/apis/serving/v1/revision_defaults.go index 4805f5b1fe35..2b3f5f2f29da 100644 --- a/pkg/apis/serving/v1/revision_defaults.go +++ b/pkg/apis/serving/v1/revision_defaults.go @@ -143,10 +143,15 @@ func (*RevisionSpec) applyProbes(container *corev1.Container) { } if container.ReadinessProbe.TCPSocket == nil && container.ReadinessProbe.HTTPGet == nil && - container.ReadinessProbe.Exec == nil { + container.ReadinessProbe.Exec == nil && + container.ReadinessProbe.GRPC == nil { container.ReadinessProbe.TCPSocket = &corev1.TCPSocketAction{} } + if container.ReadinessProbe.GRPC != nil && container.ReadinessProbe.GRPC.Service == nil { + container.ReadinessProbe.GRPC.Service = ptr.String("") + } + if container.ReadinessProbe.SuccessThreshold == 0 { container.ReadinessProbe.SuccessThreshold = 1 } diff --git a/pkg/queue/health/probe.go b/pkg/queue/health/probe.go index 0151bbe05316..2aa0d6e27e4d 100644 --- a/pkg/queue/health/probe.go +++ b/pkg/queue/health/probe.go @@ -17,16 +17,27 @@ limitations under the License. package health import ( + "context" + "errors" "fmt" "io" "net" "net/http" "net/url" + "syscall" "time" + "golang.org/x/sys/unix" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + grpchealth "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" netheader "knative.dev/networking/pkg/http/header" pkgnet "knative.dev/pkg/network" + "knative.dev/pkg/ptr" ) // HTTPProbeConfigOptions holds the HTTP probe config options @@ -44,6 +55,14 @@ type TCPProbeConfigOptions struct { Address string } +// GRPCProbeConfigOptions holds the gRPC probe config options +type GRPCProbeConfigOptions struct { + Timeout time.Duration + *corev1.GRPCAction + KubeMajor string + KubeMinor string +} + // TCPProbe checks that a TCP socket to the address can be opened. // Did not reuse k8s.io/kubernetes/pkg/probe/tcp to not create a dependency // on klog. @@ -204,3 +223,68 @@ func isHTTPProbeReady(res *http.Response) bool { // response status code between 200-399 indicates success return res.StatusCode >= 200 && res.StatusCode < 400 } + +// GRPCProbe checks that gRPC connection can be established to the address. +func GRPCProbe(config GRPCProbeConfigOptions) error { + + // Use k8s.io/kubernetes/pkg/probe/dialer_others.go to correspond to OSs other than Windows + dialer := &net.Dialer{ + Control: func(network, address string, c syscall.RawConn) error { + return c.Control(func(fd uintptr) { + unix.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &unix.Linger{Onoff: 1, Linger: 1}) + }) + }, + } + + opts := []grpc.DialOption{ + grpc.WithUserAgent(netheader.KubeProbeUAPrefix + config.KubeMajor + "/" + config.KubeMinor), + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), // credentials are currently not supported + grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, "tcp", addr) + }), + } + + ctx, cancel := context.WithTimeout(context.Background(), config.Timeout) + + defer cancel() + + addr := net.JoinHostPort("127.0.0.1", fmt.Sprintf("%d", config.Port)) + conn, err := grpc.DialContext(ctx, addr, opts...) + + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("failed to connect service %q within %v: %w", addr, config.Timeout, err) + } + return fmt.Errorf("failed to connect service at %q: %w", addr, err) + } + + defer func() { + _ = conn.Close() + }() + + client := grpchealth.NewHealthClient(conn) + + resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{ + Service: ptr.StringValue(config.Service), + }) + + if err != nil { + stat, ok := status.FromError(err) + if ok { + switch stat.Code() { + case codes.Unimplemented: + return fmt.Errorf("this server does not implement the grpc health protocol (grpc.health.v1.Health) %w", err) + case codes.DeadlineExceeded: + return fmt.Errorf("health rpc did not complete within %v: %w", config.Timeout, err) + } + } + return fmt.Errorf("health rpc probe failed: %w", err) + } + + if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING { + return fmt.Errorf("service unhealthy (responded with %q)", resp.GetStatus().String()) + } + + return nil +} diff --git a/pkg/queue/health/probe_test.go b/pkg/queue/health/probe_test.go index f309d1f478ad..d4e5eb298e63 100644 --- a/pkg/queue/health/probe_test.go +++ b/pkg/queue/health/probe_test.go @@ -17,6 +17,8 @@ limitations under the License. package health import ( + "context" + "net" "net/http" "net/http/httptest" "net/url" @@ -28,9 +30,12 @@ import ( "go.uber.org/atomic" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" netheader "knative.dev/networking/pkg/http/header" + "knative.dev/pkg/ptr" ) func TestTCPProbe(t *testing.T) { @@ -265,6 +270,41 @@ func TestHTTPProbeResponseErrorFailure(t *testing.T) { } } +func TestGRPCProbeSuccessWithDefaultServiceName(t *testing.T) { + // use ephemeral port to prevent port conflict + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + + s := grpc.NewServer() + grpc_health_v1.RegisterHealthServer(s, &grpcHealthServer{}) + + errChan := make(chan error, 1) + go func() { + errChan <- s.Serve(lis) + }() + + assignedPort := lis.Addr().(*net.TCPAddr).Port + gRPCAction := newGRPCAction(t, assignedPort, "") + config := GRPCProbeConfigOptions{ + Timeout: time.Second, + GRPCAction: gRPCAction, + } + + if err := GRPCProbe(config); err != nil { + t.Error("Expected probe to succeed but it failed with", err) + } + + // explicitly stop grpc server + s.Stop() + + if grpcServerErr := <-errChan; grpcServerErr != nil { + t.Fatalf("Failed to run gRPC test server %v", grpcServerErr) + } + close(errChan) +} + func newH2cTestServer(t *testing.T, handler http.HandlerFunc) *httptest.Server { h2s := &http2.Server{} t.Helper() @@ -301,3 +341,20 @@ func newHTTPGetAction(t *testing.T, serverURL string) *corev1.HTTPGetAction { Scheme: corev1.URISchemeHTTP, } } + +func newGRPCAction(t *testing.T, port int, service string) *corev1.GRPCAction { + t.Helper() + + return &corev1.GRPCAction{ + Port: int32(port), + Service: ptr.String(service), + } +} + +type grpcHealthServer struct { + grpc_health_v1.UnimplementedHealthServer +} + +func (s *grpcHealthServer) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil +} diff --git a/pkg/queue/readiness/probe.go b/pkg/queue/readiness/probe.go index 2d8e0e16250f..f2c5dad15182 100644 --- a/pkg/queue/readiness/probe.go +++ b/pkg/queue/readiness/probe.go @@ -134,6 +134,8 @@ func (p *Probe) probeContainerImpl() bool { err = p.httpProbe() case p.TCPSocket != nil: err = p.tcpProbe() + case p.GRPC != nil: + err = p.grpcProbe() case p.Exec != nil: // Should never be reachable. Exec probes to be translated to // TCP probes when container is built. @@ -220,3 +222,15 @@ func (p *Probe) httpProbe() error { return health.HTTPProbe(config) }) } + +// grpcProbe function executes gRPC probe +func (p *Probe) grpcProbe() error { + config := health.GRPCProbeConfigOptions{ + GRPCAction: p.GRPC, + } + + return p.doProbe(func(to time.Duration) error { + config.Timeout = to + return health.GRPCProbe(config) + }) +} diff --git a/pkg/queue/readiness/probe_test.go b/pkg/queue/readiness/probe_test.go index 6b67c1ec0922..52d76383a130 100644 --- a/pkg/queue/readiness/probe_test.go +++ b/pkg/queue/readiness/probe_test.go @@ -18,6 +18,7 @@ package readiness import ( "bytes" + "context" "errors" "fmt" "net" @@ -30,6 +31,8 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" @@ -655,6 +658,49 @@ func TestKnTCPProbeSuccessThresholdIncludesFailure(t *testing.T) { } } +func TestGRPCSuccess(t *testing.T) { + t.Helper() + // use ephemeral port to prevent port conflict + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + + s := grpc.NewServer() + grpc_health_v1.RegisterHealthServer(s, &grpcHealthServer{}) + + errChan := make(chan error, 1) + go func() { + errChan <- s.Serve(lis) + }() + + assignedPort := lis.Addr().(*net.TCPAddr).Port + pb := NewProbe(&corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: int32(assignedPort), + Service: nil, + }, + }, + }) + + if !pb.ProbeContainer() { + t.Error("Probe failed. Expected success.") + } + + // explicitly stop grpc server + s.Stop() + + if grpcServerErr := <-errChan; grpcServerErr != nil { + t.Fatalf("Failed to run gRPC test server %v", grpcServerErr) + } + close(errChan) +} + func newTestServer(t *testing.T, h http.HandlerFunc) *url.URL { t.Helper() @@ -668,3 +714,11 @@ func newTestServer(t *testing.T, h http.HandlerFunc) *url.URL { return u } + +type grpcHealthServer struct { + grpc_health_v1.UnimplementedHealthServer +} + +func (s *grpcHealthServer) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil +} diff --git a/pkg/reconciler/revision/resources/deploy.go b/pkg/reconciler/revision/resources/deploy.go index a86e58f788a7..2f3a3ba459e7 100644 --- a/pkg/reconciler/revision/resources/deploy.go +++ b/pkg/reconciler/revision/resources/deploy.go @@ -267,8 +267,8 @@ func makeServingContainer(servingContainer corev1.Container, rev *v1.Revision) c servingContainer.Env = append(servingContainer.Env, buildUserPortEnv(userPortStr)) container := makeContainer(servingContainer, rev) if container.ReadinessProbe != nil { - if container.ReadinessProbe.HTTPGet != nil || container.ReadinessProbe.TCPSocket != nil { - // HTTP and TCP ReadinessProbes are executed by the queue-proxy directly against the + if container.ReadinessProbe.HTTPGet != nil || container.ReadinessProbe.TCPSocket != nil || container.ReadinessProbe.GRPC != nil { + // HTTP, TCP and gRPC ReadinessProbes are executed by the queue-proxy directly against the // user-container instead of via kubelet. container.ReadinessProbe = nil } diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index eac04c49e412..a668d5efe1d3 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -346,6 +346,16 @@ func withHTTPReadinessProbe(port int) *corev1.Probe { }} } +func withGRPCReadinessProbe(port int) *corev1.Probe { + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: int32(port), + Service: nil, + }, + }} +} + func withExecReadinessProbe(command []string) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -905,6 +915,27 @@ func TestMakePodSpec(t *testing.T) { withEnvVar("SERVING_READINESS_PROBE", `{"httpGet":{"path":"/","port":8080,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}}`), ), }), + }, { + name: "with grpc readiness probe", + rev: revision("bar", "foo", + withContainers([]corev1.Container{{ + Name: servingContainerName, + Image: "busybox", + ReadinessProbe: withGRPCReadinessProbe(v1.DefaultUserPort), + }}), + WithContainerStatuses([]v1.ContainerStatus{{ + ImageDigest: "busybox@sha256:deadbeef", + }}), + ), + want: podSpec( + []corev1.Container{ + servingContainer(func(container *corev1.Container) { + container.Image = "busybox@sha256:deadbeef" + }), + queueContainer( + withEnvVar("SERVING_READINESS_PROBE", `{"grpc":{"port":8080,"service":null}}`), + ), + }), }, { name: "with tcp readiness probe", rev: revision("bar", "foo", diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index 423f43dd09d8..1fb964a53dab 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -271,7 +271,9 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container if container.ReadinessProbe.TCPSocket != nil && container.ReadinessProbe.TCPSocket.Port.IntValue() != 0 { probePort = container.ReadinessProbe.TCPSocket.Port.IntVal } - + if container.ReadinessProbe.GRPC != nil && container.ReadinessProbe.GRPC.Port > 0 { + probePort = container.ReadinessProbe.GRPC.Port + } // The activator attempts to detect readiness itself by checking the Queue // Proxy's health endpoint rather than waiting for Kubernetes to check and // propagate the Ready state. We encode the original probe as JSON in an @@ -452,6 +454,8 @@ func applyReadinessProbeDefaultsForExec(p *corev1.Probe, port int32) { Port: intstr.FromInt(int(port)), } p.Exec = nil + case p.GRPC != nil: + p.GRPC.Port = port } if p.PeriodSeconds > 0 && p.TimeoutSeconds < 1 {