Skip to content

Commit

Permalink
refactor(webserver): remove connection and interceptor related codes …
Browse files Browse the repository at this point in the history
…into webserver.connection.go and webserver.interceptors.go.
  • Loading branch information
searKing committed Aug 30, 2024
1 parent b2abdb2 commit 6857e8a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 48 deletions.
44 changes: 44 additions & 0 deletions pkg/webserver/webserver.connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package webserver

import (
"github.com/searKing/golang/pkg/webserver/pkg/stats"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func (f *Factory) ServerOptions(opts ...grpc.ServerOption) []grpc.ServerOption {
if f.fc.MaxReceiveMessageSizeInBytes > 0 {
opts = append(opts, grpc.MaxRecvMsgSize(f.fc.MaxReceiveMessageSizeInBytes))
} else {
opts = append(opts, grpc.MaxRecvMsgSize(defaultMaxReceiveMessageSize))
}
if f.fc.StatsHandling {
// log for the related stats handling (e.g., RPCs, connections).
opts = append(opts, grpc.StatsHandler(&stats.ServerHandler{}))
}
return opts
}

func (f *Factory) DialOptions(opts ...grpc.DialOption) []grpc.DialOption {
if f.fc.NoGrpcProxy {
opts = append(opts, grpc.WithNoProxy())
}
if !f.fc.ForceDisableTls {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
if f.fc.MaxReceiveMessageSizeInBytes > 0 {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(f.fc.MaxReceiveMessageSizeInBytes), grpc.MaxCallSendMsgSize(f.fc.MaxReceiveMessageSizeInBytes)))
} else {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultMaxReceiveMessageSize), grpc.MaxCallSendMsgSize(defaultMaxSendMessageSize)))
}
if f.fc.StatsHandling {
// log for the related stats handling (e.g., RPCs, connections).
opts = append(opts, grpc.WithStatsHandler(&stats.ClientHandler{}))
}

return opts
}
62 changes: 14 additions & 48 deletions pkg/webserver/webserver.factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ import (

"github.com/gin-gonic/gin"
"github.com/rs/cors"
"github.com/searKing/golang/pkg/webserver/pkg/stats"
"google.golang.org/grpc"

slog_ "github.com/searKing/golang/go/log/slog"
"github.com/searKing/golang/pkg/webserver/healthz"
"github.com/searKing/golang/pkg/webserver/pkg/recovery"
gin_ "github.com/searKing/golang/third_party/github.com/gin-gonic/gin"
grpc_ "github.com/searKing/golang/third_party/github.com/grpc-ecosystem/grpc-gateway-v2/grpc"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/burstlimit"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/timeoutlimit"
)

// ClientMaxReceiveMessageSize use 4GB as the default message size limit.
// grpc library default is 4MB
var defaultMaxReceiveMessageSize = math.MaxInt32 // 1024 * 1024 * 1024 * 4
var defaultMaxSendMessageSize = math.MaxInt32

// FactoryConfigFunc is an alias for a function that will take in a pointer to an FactoryConfig and modify it
type FactoryConfigFunc func(os *FactoryConfig) error

Expand Down Expand Up @@ -147,52 +146,24 @@ func (f *Factory) New() (*WebServer, error) {
}

opts := grpc_.WithDefault()
if f.fc.NoGrpcProxy {
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithNoProxy()))
}
{
// 设置GRPC最大消息大小
// connection options
// http -> grpc client -> grpc server
if f.fc.MaxReceiveMessageSizeInBytes > 0 {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxRecvMsgSize(f.fc.MaxReceiveMessageSizeInBytes)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(f.fc.MaxReceiveMessageSizeInBytes))))
} else {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxRecvMsgSize(defaultMaxReceiveMessageSize)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaultMaxReceiveMessageSize))))
}
// http <- grpc client <- grpc server
if f.fc.MaxSendMessageSizeInBytes > 0 {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxSendMsgSize(f.fc.MaxSendMessageSizeInBytes)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(f.fc.MaxSendMessageSizeInBytes))))
} else {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxSendMsgSize(defaultMaxSendMessageSize)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultMaxSendMessageSize))))
}
opts = append(opts, grpc_.WithGrpcServerOption(f.ServerOptions()...))
opts = append(opts, grpc_.WithGrpcDialOption(f.DialOptions()...))
}
{
// recover
opts = append(opts, grpc_.WithGrpcUnaryServerChain(recovery.UnaryServerInterceptor()))
opts = append(opts, grpc_.WithGrpcStreamServerChain(recovery.StreamServerInterceptor()))
// grpc interceptors
opts = append(opts, grpc_.WithGrpcUnaryServerChain(f.UnaryServerInterceptors()...))
opts = append(opts, grpc_.WithGrpcStreamServerChain(f.StreamServerInterceptors()...))
}
{
// handle request timeout
opts = append(opts, grpc_.WithGrpcUnaryServerChain(timeoutlimit.UnaryServerInterceptor(f.fc.HandledTimeoutUnary)))
opts = append(opts, grpc_.WithGrpcStreamServerChain(timeoutlimit.StreamServerInterceptor(f.fc.HandledTimeoutStream)))
}
{
// burst limit
opts = append(opts, grpc_.WithGrpcUnaryServerChain(burstlimit.UnaryServerInterceptor(f.fc.MaxConcurrencyUnary, f.fc.BurstLimitTimeoutUnary)))
opts = append(opts, grpc_.WithGrpcStreamServerChain(burstlimit.StreamServerInterceptor(f.fc.MaxConcurrencyStream, f.fc.BurstLimitTimeoutStream)))
}
if f.fc.StatsHandling {
// log for the related stats handling (e.g., RPCs, connections).
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithStatsHandler(&stats.ClientHandler{})))
opts = append(opts, grpc_.WithGrpcServerOption(grpc.StatsHandler(&stats.ServerHandler{})))
// http interceptors
opts = append(opts, grpc_.WithHttpHandlerDecorators(f.HttpServerInterceptors()...))
}

// cors
opts = append(opts, grpc_.WithHttpWrapper(cors.New(f.fc.Cors).Handler))
opts = append(opts, f.fc.GatewayOptions...)
// log
opts = append(opts, grpc_.WithSlogLoggerConfig(slog.Default().Handler(), grpc_.ExtractLoggingOptions(opts...))...)
grpcBackend := grpc_.NewGatewayTLS(f.fc.BindAddress, f.fc.TlsConfig, opts...)
{
Expand Down Expand Up @@ -262,8 +233,3 @@ func (f *Factory) New() (*WebServer, error) {

return s, nil
}

// ClientMaxReceiveMessageSize use 4GB as the default message size limit.
// grpc library default is 4MB
var defaultMaxReceiveMessageSize = math.MaxInt32 // 1024 * 1024 * 4
var defaultMaxSendMessageSize = math.MaxInt32
47 changes: 47 additions & 0 deletions pkg/webserver/webserver.interceptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package webserver

import (
"context"

"github.com/rs/cors"
http_ "github.com/searKing/golang/go/net/http"
"github.com/searKing/golang/pkg/webserver/pkg/recovery"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/burstlimit"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/timeoutlimit"
"google.golang.org/grpc"
)

func LocalGatewayWrap[REQ any, RESP any](handler func(ctx context.Context, req REQ) (RESP, error)) func(
ctx context.Context, req REQ) (RESP, error) {
return recovery.WrapRecovery(handler)
}

func (f *Factory) UnaryServerInterceptors(interceptors ...grpc.UnaryServerInterceptor) []grpc.UnaryServerInterceptor {
// recover
interceptors = append(interceptors, recovery.UnaryServerInterceptor())
// handle request timeout
interceptors = append(interceptors, timeoutlimit.UnaryServerInterceptor(f.fc.HandledTimeoutUnary))
// burst limit
interceptors = append(interceptors, burstlimit.UnaryServerInterceptor(f.fc.MaxConcurrencyUnary, f.fc.BurstLimitTimeoutUnary))
return interceptors
}

func (f *Factory) StreamServerInterceptors(interceptors ...grpc.StreamServerInterceptor) []grpc.StreamServerInterceptor {
// recover
interceptors = append(interceptors, recovery.StreamServerInterceptor())
// handle request timeout
interceptors = append(interceptors, timeoutlimit.StreamServerInterceptor(f.fc.HandledTimeoutUnary))
// burst limit
interceptors = append(interceptors, burstlimit.StreamServerInterceptor(f.fc.MaxConcurrencyUnary, f.fc.BurstLimitTimeoutUnary))
return interceptors
}

func (f *Factory) HttpServerInterceptors(decorators ...http_.HandlerDecorator) []http_.HandlerDecorator {
// cors
decorators = append(decorators, http_.HandlerDecoratorFunc(cors.New(f.fc.Cors).Handler))
return decorators
}

0 comments on commit 6857e8a

Please sign in to comment.