From 79d77338086d2e63a5cfccc7d5e66df98adcfdd1 Mon Sep 17 00:00:00 2001 From: dengzii Date: Thu, 16 Jun 2022 18:14:46 +0800 Subject: [PATCH] update rpc, build --- build/build-linux-amd64.sh | 15 +++++ build/build-windows-amd64.sh | 15 +++++ cmd/im_service/main.go | 2 +- example/client/rpc_client_example.go | 4 +- go.mod | 2 +- internal/im_server/rpc_service.go | 2 +- internal/rpc/client.go | 86 -------------------------- internal/rpc/context.go | 60 ------------------- internal/rpc/grpc_client.go | 89 --------------------------- internal/rpc/grpc_server.go | 67 --------------------- internal/rpc/selector.go | 52 ---------------- internal/rpc/server.go | 90 ---------------------------- internal/rpc/service_router.go | 40 ------------- pkg/client/im_service_rpc_client.go | 2 +- pkg/client/rpc_client.go | 2 +- 15 files changed, 37 insertions(+), 491 deletions(-) create mode 100644 build/build-linux-amd64.sh create mode 100644 build/build-windows-amd64.sh delete mode 100644 internal/rpc/client.go delete mode 100644 internal/rpc/context.go delete mode 100644 internal/rpc/grpc_client.go delete mode 100644 internal/rpc/grpc_server.go delete mode 100644 internal/rpc/selector.go delete mode 100644 internal/rpc/server.go delete mode 100644 internal/rpc/service_router.go diff --git a/build/build-linux-amd64.sh b/build/build-linux-amd64.sh new file mode 100644 index 0000000..5405988 --- /dev/null +++ b/build/build-linux-amd64.sh @@ -0,0 +1,15 @@ +cd ../cmd/im_service || exit + +export CGO_ENABLED=0 +export GOOS=linux +export GOHOSTOS=linux +export GOARCH=amd64 + +echo 'build...' +go build +echo 'build complete' +cp ../../config/config.toml config.toml +tar -czvf ./im_service_linux_amd64.tar.gz im_service config.toml +rm config.toml +rm im_service +read -p 'complete.' \ No newline at end of file diff --git a/build/build-windows-amd64.sh b/build/build-windows-amd64.sh new file mode 100644 index 0000000..aa28f51 --- /dev/null +++ b/build/build-windows-amd64.sh @@ -0,0 +1,15 @@ +cd ../cmd/im_service || exit + +export CGO_ENABLED=0 +export GOOS=windows +export GOHOSTOS=windows +export GOARCH=amd64 + +echo 'build...' +go build -o im_service.exe +echo 'build complete' +cp ../../config/config.toml config.toml +tar -czvf ./im_service_windows_amd64.tar.gz im_service.exe config.toml +rm config.toml +rm im_service +read -p 'complete.' \ No newline at end of file diff --git a/cmd/im_service/main.go b/cmd/im_service/main.go index 2ee48d3..a114e19 100644 --- a/cmd/im_service/main.go +++ b/cmd/im_service/main.go @@ -5,12 +5,12 @@ import ( "github.com/glide-im/glide/pkg/bootstrap" "github.com/glide-im/glide/pkg/logger" "github.com/glide-im/glide/pkg/messaging/message_handler" + "github.com/glide-im/glide/pkg/rpc" "github.com/glide-im/glide/pkg/store" "github.com/glide-im/glide/pkg/subscription/subscription_impl" "github.com/glide-im/im-service/internal/config" "github.com/glide-im/im-service/internal/im_server" "github.com/glide-im/im-service/internal/message_store_db" - "github.com/glide-im/im-service/internal/rpc" ) func main() { diff --git a/example/client/rpc_client_example.go b/example/client/rpc_client_example.go index 222d5f1..471e012 100644 --- a/example/client/rpc_client_example.go +++ b/example/client/rpc_client_example.go @@ -3,7 +3,7 @@ package main import ( "github.com/glide-im/glide/pkg/gate" "github.com/glide-im/glide/pkg/messages" - "github.com/glide-im/im-service/internal/rpc" + "github.com/glide-im/glide/pkg/rpc" "github.com/glide-im/im-service/pkg/client" ) @@ -22,7 +22,7 @@ func RpcClientExample() { if err != nil { panic(err) } - err = cli.EnqueueMessage(gate.NewID2(1), messages.NewEmptyMessage()) + err = cli.EnqueueMessage(gate.NewID2("1"), messages.NewEmptyMessage()) if err != nil { panic(err) } diff --git a/go.mod b/go.mod index fb64a99..468abc2 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/glide-im/im-service go 1.18 require ( - github.com/glide-im/glide v1.2.5 + github.com/glide-im/glide v1.2.6 github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.6.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 diff --git a/internal/im_server/rpc_service.go b/internal/im_server/rpc_service.go index c55eed0..640d7c5 100644 --- a/internal/im_server/rpc_service.go +++ b/internal/im_server/rpc_service.go @@ -5,7 +5,7 @@ import ( "encoding/json" "github.com/glide-im/glide/pkg/gate" "github.com/glide-im/glide/pkg/messages" - "github.com/glide-im/im-service/internal/rpc" + "github.com/glide-im/glide/pkg/rpc" "github.com/glide-im/im-service/pkg/proto" ) diff --git a/internal/rpc/client.go b/internal/rpc/client.go deleted file mode 100644 index 3c679e4..0000000 --- a/internal/rpc/client.go +++ /dev/null @@ -1,86 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - etcd "github.com/rpcxio/rpcx-etcd/client" - "github.com/smallnest/rpcx/client" - "github.com/smallnest/rpcx/protocol" -) - -type Cli interface { - Call(ctx context.Context, fn string, request, reply interface{}) error - Broadcast(fn string, request, reply interface{}) error - Run() error - Close() error -} - -type ClientOptions struct { - client.Option - - Addr string - Port int - Name string - EtcdServers []string - Selector client.Selector -} - -type BaseClient struct { - cli client.XClient - options *ClientOptions - id string -} - -func NewBaseClient(options *ClientOptions) (*BaseClient, error) { - ret := &BaseClient{ - options: options, - } - - var discovery client.ServiceDiscovery - var err error - - if options.EtcdServers != nil { - discovery, err = etcd.NewEtcdV3Discovery(BaseServicePath, options.Name, options.EtcdServers, false, nil) - if err != nil { - return nil, err - } - } else { - srv := fmt.Sprintf("%s@%s:%d", "tcp", options.Addr, options.Port) - discovery, _ = client.NewPeer2PeerDiscovery(srv, "") - } - - if options.SerializeType == protocol.SerializeNone { - // using protobuffer serializer by default - options.SerializeType = protocol.ProtoBuffer - } - ret.cli = client.NewXClient(options.Name, client.Failtry, client.RoundRobin, discovery, options.Option) - - if options.Selector != nil { - ret.cli.SetSelector(options.Selector) - } else { - // using round robbin selector by default - ret.cli.SetSelector(NewRoundRobinSelector()) - } - return ret, nil -} - -func (c *BaseClient) Call2(fn string, arg interface{}, reply interface{}) error { - return c.Call(context.Background(), fn, arg, reply) -} - -func (c *BaseClient) Broadcast(fn string, request, reply interface{}) error { - return c.cli.Broadcast(context.Background(), fn, request, reply) -} - -func (c *BaseClient) Call(ctx context.Context, fn string, arg interface{}, reply interface{}) error { - err := c.cli.Call(ctx, fn, arg, reply) - return err -} - -func (c *BaseClient) Run() error { - return nil -} - -func (c *BaseClient) Close() error { - return c.cli.Close() -} diff --git a/internal/rpc/context.go b/internal/rpc/context.go deleted file mode 100644 index b3ffc05..0000000 --- a/internal/rpc/context.go +++ /dev/null @@ -1,60 +0,0 @@ -package rpc - -import ( - "context" - "github.com/smallnest/rpcx/share" -) - -type ExtraContext struct { - context.Context -} - -func NewContextFrom(c context.Context) *ExtraContext { - return &ExtraContext{c} -} - -func NewContext() *ExtraContext { - return NewContextFrom(context.Background()) -} - -func (c *ExtraContext) PutReqExtra(k string, v string) *ExtraContext { - mate := c.Context.Value(share.ReqMetaDataKey) - if mate == nil { - mate = map[string]string{} - c.Context = context.WithValue(c.Context, share.ReqMetaDataKey, mate) - } - m := c.Context.Value(share.ReqMetaDataKey).(map[string]string) - m[k] = v - return c -} - -func (c *ExtraContext) PutResExtra(k string, v string) *ExtraContext { - mate := c.Context.Value(share.ResMetaDataKey) - if mate == nil { - mate = map[string]string{} - c.Context = context.WithValue(c.Context, share.ResMetaDataKey, mate) - } - m := c.Context.Value(share.ResMetaDataKey).(map[string]string) - m[k] = v - return c -} - -func (c *ExtraContext) GetReqExtra(k string) (string, bool) { - mate := c.Context.Value(share.ReqMetaDataKey) - if mate == nil { - return "", false - } - m := c.Context.Value(share.ReqMetaDataKey).(map[string]string) - v, ok := m[k] - return v, ok -} - -func (c *ExtraContext) GetResExtra(k string) (string, bool) { - mate := c.Context.Value(share.ResMetaDataKey) - if mate == nil { - return "", false - } - m := c.Context.Value(share.ResMetaDataKey).(map[string]string) - v, ok := m[k] - return v, ok -} diff --git a/internal/rpc/grpc_client.go b/internal/rpc/grpc_client.go deleted file mode 100644 index ac5b3d0..0000000 --- a/internal/rpc/grpc_client.go +++ /dev/null @@ -1,89 +0,0 @@ -package rpc - -import ( - "context" - "errors" - "fmt" - "github.com/glide-im/glide/pkg/logger" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/stats" -) - -type BaseGRpcClient struct { - Conn *grpc.ClientConn - - AppId int64 - Options *ClientOptions -} - -func NewBaseGRpcClient(options *ClientOptions) *BaseGRpcClient { - ret := &BaseGRpcClient{} - ret.Init(options) - return ret -} - -func (b *BaseGRpcClient) Init(options *ClientOptions) { - if options == nil { - b.Options = &ClientOptions{ - Addr: "localhost", - Port: 5555, - } - } else { - b.Options = options - } -} - -func (b *BaseGRpcClient) unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if b.Conn.GetState() != connectivity.Ready { - return errors.New("client is not connect to the server") - } - logger.D("rpc client call method: %s", method) - err := invoker(ctx, method, req, reply, cc, opts...) - if err != nil { - logger.E("rpc client method call error", err) - } - logger.D("response=%v", reply) - return err -} - -func (b *BaseGRpcClient) Run() error { - - if b.Options == nil { - b.Init(nil) - } - var err error - target := fmt.Sprintf("%s:%d", b.Options.Addr, b.Options.Port) - - b.Conn, err = grpc.Dial(target, - grpc.WithInsecure(), // insecure connection - grpc.WithBlock(), // blocking until dial success - grpc.WithUnaryInterceptor(b.unaryInterceptor), - grpc.WithStatsHandler(newStateHandler()), - grpc.WithUserAgent("client-id: none")) - - return err -} - -type statsHandler struct { -} - -func newStateHandler() *statsHandler { - return &statsHandler{} -} - -func (h *statsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { - return context.TODO() -} - -func (h *statsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) { - -} - -func (h *statsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { - return context.TODO() -} - -func (h *statsHandler) HandleConn(ctx context.Context, connStats stats.ConnStats) { - -} diff --git a/internal/rpc/grpc_server.go b/internal/rpc/grpc_server.go deleted file mode 100644 index 6696c14..0000000 --- a/internal/rpc/grpc_server.go +++ /dev/null @@ -1,67 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "github.com/glide-im/glide/pkg/logger" - "google.golang.org/grpc" - "math" - "net" -) - -type Runnable interface { - Run() error -} - -type BaseGRpcServer struct { - RpcServer *grpc.Server - Socket net.Listener - - AppId int64 - Options *ServerOptions -} - -func NewBaseGRpcServer(options *ServerOptions) *BaseGRpcServer { - ret := &BaseGRpcServer{ - Options: options, - } - ret.init(options) - return ret -} - -func (s *BaseGRpcServer) init(options *ServerOptions) { - if options == nil { - options = &ServerOptions{ - Network: "tcp", - Addr: "localhost", - Port: 5555, - MaxRecvMsgSize: math.MaxInt32, - MaxSendMsgSize: math.MaxInt32, - } - } - - var err error - addr := fmt.Sprintf("%s:%d", options.Addr, options.Port) - s.Socket, err = net.Listen(options.Network, addr) - if err != nil { - panic(err) - } - op := []grpc.ServerOption{ - grpc.UnaryInterceptor(s.unaryLogInterceptor), - grpc.MaxRecvMsgSize(options.MaxRecvMsgSize), - grpc.MaxSendMsgSize(options.MaxSendMsgSize), - } - s.RpcServer = grpc.NewServer(op...) -} - -func (s *BaseGRpcServer) unaryLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - logger.I("grpc server method called: %s", info.FullMethod) - return handler(ctx, req) -} - -func (s *BaseGRpcServer) Run() error { - if s.Options == nil { - s.init(nil) - } - return s.RpcServer.Serve(s.Socket) -} diff --git a/internal/rpc/selector.go b/internal/rpc/selector.go deleted file mode 100644 index 91b5081..0000000 --- a/internal/rpc/selector.go +++ /dev/null @@ -1,52 +0,0 @@ -package rpc - -import ( - "context" - "github.com/glide-im/glide/pkg/logger" - "github.com/smallnest/rpcx/client" - "github.com/smallnest/rpcx/share" -) - -type selector struct { - services map[string]string - round client.Selector - tags map[string]string -} - -func newSelector() *selector { - s := map[string]string{} - return &selector{ - services: s, - round: NewRoundRobinSelector(), - tags: map[string]string{}, - } -} - -func (r *selector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string { - - m := ctx.Value(share.ReqMetaDataKey).(map[string]string) - - if target, ok := m["ExtraTarget"]; ok { - if _, ok := r.services[target]; ok { - return target - } - logger.E("unknown service addr, ExtraTarget:", target) - } - - if tag, ok := m["ExtraTag"]; ok { - if path, ok := r.tags[tag]; ok { - if _, ok := r.services[path]; ok { - logger.D("route by tag: %s=%s", tag, path) - return path - } - } - } - return r.round.Select(ctx, servicePath, serviceMethod, args) -} - -func (r *selector) UpdateServer(servers map[string]string) { - r.round.UpdateServer(servers) - for k, v := range servers { - r.services[k] = v - } -} diff --git a/internal/rpc/server.go b/internal/rpc/server.go deleted file mode 100644 index 3929f17..0000000 --- a/internal/rpc/server.go +++ /dev/null @@ -1,90 +0,0 @@ -package rpc - -import ( - "fmt" - "github.com/rcrowley/go-metrics" - "github.com/rpcxio/rpcx-etcd/serverplugin" - "github.com/smallnest/rpcx/server" - "time" -) - -const ( - BaseServicePath = "/im_service" -) - -type ServerOptions struct { - Name string - Network string - Addr string - Port int - MaxRecvMsgSize int - MaxSendMsgSize int - EtcdServers []string -} - -type BaseServer struct { - Srv *server.Server - - Options *ServerOptions - etcdRegister *serverplugin.EtcdV3RegisterPlugin - reg []func(srv *BaseServer) error - id string -} - -func NewBaseServer(options *ServerOptions) *BaseServer { - ret := &BaseServer{ - Srv: server.NewServer(), - id: fmt.Sprintf("%s@%s:%d", options.Name, options.Addr, options.Port), - } - - if options.Network == "" { - options.Network = "tcp" - } - - ret.Options = options - if len(options.EtcdServers) != 0 { - ret.etcdRegister = &serverplugin.EtcdV3RegisterPlugin{ - EtcdServers: options.EtcdServers, - BasePath: BaseServicePath, - Metrics: metrics.NewRegistry(), - UpdateInterval: time.Minute, - } - } - return ret -} - -func (s *BaseServer) GetServerID() string { - if len(s.id) == 0 { - s.id = fmt.Sprintf("%s@%s:%d", s.Options.Name, s.Options.Addr, s.Options.Port) - } - return s.id -} - -func (s *BaseServer) Register(name string, sv interface{}) { - s.reg = append(s.reg, func(srv *BaseServer) error { - return srv.Srv.RegisterName(name, sv, "") - }) -} - -func (s *BaseServer) Run() error { - - addr := fmt.Sprintf("%s:%d", s.Options.Addr, s.Options.Port) - - if s.etcdRegister != nil { - s.etcdRegister.ServiceAddress = s.Options.Network + "@" + addr - - err := s.etcdRegister.Start() - if err != nil { - return err - } - s.Srv.Plugins.Add(s.etcdRegister) - } - - for _, f := range s.reg { - if er := f(s); er != nil { - return er - } - } - - return s.Srv.Serve(s.Options.Network, addr) -} diff --git a/internal/rpc/service_router.go b/internal/rpc/service_router.go deleted file mode 100644 index eeb3131..0000000 --- a/internal/rpc/service_router.go +++ /dev/null @@ -1,40 +0,0 @@ -package rpc - -import ( - "context" - "github.com/smallnest/rpcx/client" -) - -// RoundRobinSelector selects servers with roundrobin. -type RoundRobinSelector struct { - servers []string - i int -} - -func NewRoundRobinSelector() client.Selector { - return &RoundRobinSelector{servers: []string{}} -} - -func (s *RoundRobinSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string { - return s.SelectNext() -} - -func (s *RoundRobinSelector) SelectNext() string { - ss := s.servers - if len(ss) == 0 { - return "" - } - i := s.i - i = i % len(ss) - s.i = i + 1 - return ss[i] -} - -func (s *RoundRobinSelector) UpdateServer(servers map[string]string) { - ss := make([]string, 0, len(servers)) - for k := range servers { - ss = append(ss, k) - } - - s.servers = ss -} diff --git a/pkg/client/im_service_rpc_client.go b/pkg/client/im_service_rpc_client.go index 2ba832f..d77d099 100644 --- a/pkg/client/im_service_rpc_client.go +++ b/pkg/client/im_service_rpc_client.go @@ -7,7 +7,7 @@ import ( "fmt" "github.com/glide-im/glide/pkg/gate" "github.com/glide-im/glide/pkg/messages" - "github.com/glide-im/im-service/internal/rpc" + "github.com/glide-im/glide/pkg/rpc" "github.com/glide-im/im-service/pkg/proto" "strings" ) diff --git a/pkg/client/rpc_client.go b/pkg/client/rpc_client.go index e86a997..1a55514 100644 --- a/pkg/client/rpc_client.go +++ b/pkg/client/rpc_client.go @@ -2,7 +2,7 @@ package client import ( "context" - "github.com/glide-im/im-service/internal/rpc" + "github.com/glide-im/glide/pkg/rpc" "github.com/glide-im/im-service/pkg/proto" "github.com/glide-im/im-service/pkg/server" )