Skip to content

Commit

Permalink
feat(webserver): add FillRequestId option for the field "RequestId" f…
Browse files Browse the repository at this point in the history
…illing in Request and Response.
  • Loading branch information
searKing committed Sep 2, 2024
1 parent 8c14b3c commit 3feeaac
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 3 deletions.
7 changes: 4 additions & 3 deletions pkg/webserver/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
)

func TestNewWebServer(t *testing.T) {

srv, err := webserver.NewWebServer(webserver.FactoryConfig{
Name: "MockWebServer",
BindAddress: ":8080",
Expand All @@ -40,11 +39,13 @@ func TestNewWebServer(t *testing.T) {
url := "http://localhost:8080/healthz"
resp, err := http.Get(url)
if err != nil {
t.Fatalf("GET %q failed: %s", url, err)
t.Errorf("GET %q failed: %s", url, err)
return
}
data, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("dump response failed: %s", err)
t.Errorf("dump response failed: %s", err)
return
}
fmt.Printf("GET %s\n: %s\n", url, string(data))
}()
Expand Down
29 changes: 29 additions & 0 deletions pkg/webserver/pkg/requestid/requestid.clientstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package requestid

import (
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors"
"google.golang.org/grpc"
)

// requestIdClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to get/set request-id.
type requestIdClientStream struct {
grpc.ClientStream
}

func (s *requestIdClientStream) SendMsg(reply any) error {
newCtx, _ := tagLoggingRequestId(s.Context(), reply)
wrapped := interceptors.WrapClientStream(s.ClientStream)
wrapped.WrappedContext = newCtx
return wrapped.SendMsg(reply)
}

func (s *requestIdClientStream) RecvMsg(req any) error {
newCtx, _ := tagLoggingRequestId(s.Context(), req)
wrapped := interceptors.WrapClientStream(s.ClientStream)
wrapped.WrappedContext = newCtx
return wrapped.RecvMsg(req)
}
65 changes: 65 additions & 0 deletions pkg/webserver/pkg/requestid/requestid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package requestid

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// See https://http.dev/x-request-id
const requestId = "X-Request-ID"

// UnaryServerInterceptor returns a new unary server interceptors with logging tags in context with request_id.
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return UnaryHandler(handler)(ctx, req)
}
}

// UnaryHandler returns a new unary server handler that performs recovering from a panic.
func UnaryHandler(handler grpc.UnaryHandler) grpc.UnaryHandler {
return func(ctx context.Context, req any) (_ any, err error) {
newCtx, id := tagLoggingRequestId(ctx, req)
resp, err := handler(newCtx, req)
trySetRequestId(resp, id, true)
// inject "X-Request-ID" into HTTP Header
_ = grpc.SetHeader(ctx, metadata.Pairs(requestId, id))
return resp, err
}
}

// StreamServerInterceptor returns a new stream server interceptors with logging tags in context with request_id.
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, &requestIdServerStream{ServerStream: ss})
}
}

// UnaryClientInterceptor returns a new unary client interceptors with logging tags in context with request_id.
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
newCtx, id := tagLoggingRequestId(ctx, req)
err := invoker(newCtx, method, req, reply, cc, opts...)
if err != nil {
return err
}
trySetRequestId(reply, id, true)
return nil
}
}

// StreamClientInterceptor returns a new stream client interceptors with tags in context with request_id.
func StreamClientInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
newStream := &requestIdClientStream{ClientStream: clientStream}
return newStream, err
}
}
29 changes: 29 additions & 0 deletions pkg/webserver/pkg/requestid/requestid.serverstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package requestid

import (
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors"
"google.golang.org/grpc"
)

// requestIdServerStream wraps grpc.ServerStream allowing each Sent/Recv of message to get/set request-id.
type requestIdServerStream struct {
grpc.ServerStream
}

func (s *requestIdServerStream) SendMsg(reply any) error {
newCtx, _ := tagLoggingRequestId(s.Context(), reply)
wrapped := interceptors.WrapServerStream(s.ServerStream)
wrapped.WrappedContext = newCtx
return wrapped.SendMsg(reply)
}

func (s *requestIdServerStream) RecvMsg(req any) error {
newCtx, _ := tagLoggingRequestId(s.Context(), req)
wrapped := interceptors.WrapServerStream(s.ServerStream)
wrapped.WrappedContext = newCtx
return wrapped.RecvMsg(req)
}
71 changes: 71 additions & 0 deletions pkg/webserver/pkg/requestid/requestid.struct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package requestid

import (
"context"
"reflect"

"github.com/google/uuid"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"

reflect_ "github.com/searKing/golang/go/reflect"
strings_ "github.com/searKing/golang/go/strings"
)

const structFieldNameRequestId = "RequestId"

func tryRetrieveRequestId(v any) string {
if reflect_.IsNil(v) {
return ""
}

if v, ok := v.(interface{ GetRequestId() string }); ok {
return v.GetRequestId()
}

field, has := reflect_.FieldByNames(reflect.ValueOf(v), structFieldNameRequestId)
if !has {
return ""
}
if v, ok := field.Interface().(string); ok {
return v
}
return ""
}

func trySetRequestId(v any, id string, ignoreEmpty bool) {
if reflect_.IsNil(v) {
return
}
if ignoreEmpty {
id := tryRetrieveRequestId(v)
if id != "" {
return
}
}
reflect_.SetFieldByNames(reflect.ValueOf(v), []string{structFieldNameRequestId}, reflect.ValueOf(id))
}

func tagLoggingRequestId(ctx context.Context, v any) (context.Context, string) {
id := tryRetrieveRequestId(v)
if id == "" {
id = strings_.ValueOrDefault(extractServerMetadataRequestId(ctx)...)
if id == "" {
id = uuid.NewString()
}
trySetRequestId(v, id, false)
}
return logging.InjectFields(ctx, logging.Fields{"request_id", id}), id
}

func extractServerMetadataRequestId(ctx context.Context) []string {
md, ok := runtime.ServerMetadataFromContext(ctx)
if !ok || md.HeaderMD == nil {
return nil
}
return md.HeaderMD.Get(requestId)
}
1 change: 1 addition & 0 deletions pkg/webserver/webserver.factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type FactoryConfig struct {
MaxSendMessageSizeInBytes int // sets the maximum message size in bytes the grpc server can send, The default is 0 (no limit is given).
StatsHandling bool // log for the related stats handling (e.g., RPCs, connections).
Validator *validator.Validate // for value validations for structs and individual fields based on tags (e.g., request).
FillRequestId bool // for the field "RequestId" filling in Request and Response.

// Deprecated: takes no effect, use slog instead.
EnableLogrusMiddleware bool // disable logrus middleware
Expand Down
13 changes: 13 additions & 0 deletions pkg/webserver/webserver.interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package webserver

import (
"github.com/rs/cors"
"github.com/searKing/golang/pkg/webserver/pkg/requestid"
"google.golang.org/grpc"

http_ "github.com/searKing/golang/go/net/http"
Expand All @@ -27,6 +28,10 @@ func (f *Factory) UnaryHandler(handlers ...grpc_.UnaryHandlerDecorator) []grpc_.
if v := f.fc.Validator; v != nil {
handlers = append(handlers, validator_.UnaryHandlerDecorator(v))
}
// request id
if f.fc.FillRequestId {
handlers = append(handlers, grpc_.UnaryHandlerDecoratorFunc(requestid.UnaryHandler))
}
return handlers
}

Expand All @@ -41,6 +46,10 @@ func (f *Factory) UnaryServerInterceptors(interceptors ...grpc.UnaryServerInterc
if v := f.fc.Validator; v != nil {
interceptors = append(interceptors, validator_.UnaryServerInterceptor(v))
}
// request id
if f.fc.FillRequestId {
interceptors = append(interceptors, requestid.UnaryServerInterceptor())
}
return interceptors
}

Expand All @@ -55,6 +64,10 @@ func (f *Factory) StreamServerInterceptors(interceptors ...grpc.StreamServerInte
if v := f.fc.Validator; v != nil {
interceptors = append(interceptors, validator_.StreamServerInterceptor(v))
}
// request id
if f.fc.FillRequestId {
interceptors = append(interceptors, requestid.StreamServerInterceptor())
}
return interceptors
}

Expand Down

0 comments on commit 3feeaac

Please sign in to comment.