From e722236c332b505506a84add6806fedf4f2e091f Mon Sep 17 00:00:00 2001 From: stormcat24 Date: Mon, 29 May 2017 09:14:36 +0900 Subject: [PATCH] modified event transmission method to gRPC request stream --- README.md | 28 ++++++++++- manager/client.go | 18 ++++++++ protobuf/stream.pb.go | 47 ++++++++++--------- protobuf/stream.proto | 2 +- server/grpc.go | 105 +++++++++++++++++++++++++++++------------- server/grpc_test.go | 6 ++- 6 files changed, 150 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index a034d70..de8cd6d 100644 --- a/README.md +++ b/README.md @@ -137,10 +137,15 @@ func main() { }, } - ss, err := client.Events(ctx, &req) + ss, err := client.Events(ctx) if err != nil { log.Fatal(err) } + + // subscribe event + if err := ss.Send(&req); err != nil { + log.Fatal(err) + } for { resp, err := ss.Recv() @@ -157,6 +162,27 @@ func main() { } ``` +### unsubscribe + +`Events` request is stream. If you unsubscribe event, set empty event data. + +``` + req := proto.Request{ + // empty events + Events: []*proto.EventType{}, + } + + ss, err := client.Events(ctx) + if err != nil { + log.Fatal(err) + } + + // unsubscribe event + if err := ss.Send(&req); err != nil { + log.Fatal(err) + } +``` + # Usage Publisher You publish events to the channel that Plasma subscribes according to the following JSON Schema. diff --git a/manager/client.go b/manager/client.go index 16f5efc..b2c0ed3 100644 --- a/manager/client.go +++ b/manager/client.go @@ -16,6 +16,10 @@ func (c *Client) ReceivePayload() <-chan event.Payload { return c.payloadChan } +func (c *Client) SetEvents(events []string) { + c.events = events +} + func NewClient(events []string) Client { return Client{ events: events, @@ -59,6 +63,20 @@ func (cm *ClientManager) RemoveClient(client Client) { close(client.payloadChan) } +func (cm *ClientManager) DeleteEvents(client *Client) { + for _, e := range client.events { + clients, ok := cm.clientsTable[e] + if !ok { + continue + } + clients.mu.Lock() + delete(clients.clients, client.payloadChan) + clients.mu.Unlock() + + delete(cm.clientsTable, e) + } +} + const eventSeparator = ":" func (cm *ClientManager) createEvents(request string) []string { diff --git a/protobuf/stream.pb.go b/protobuf/stream.pb.go index f13ec28..d1c0d70 100644 --- a/protobuf/stream.pb.go +++ b/protobuf/stream.pb.go @@ -108,7 +108,7 @@ const _ = grpc.SupportPackageIsVersion4 // Client API for StreamService service type StreamServiceClient interface { - Events(ctx context.Context, in *Request, opts ...grpc.CallOption) (StreamService_EventsClient, error) + Events(ctx context.Context, opts ...grpc.CallOption) (StreamService_EventsClient, error) } type streamServiceClient struct { @@ -119,22 +119,17 @@ func NewStreamServiceClient(cc *grpc.ClientConn) StreamServiceClient { return &streamServiceClient{cc} } -func (c *streamServiceClient) Events(ctx context.Context, in *Request, opts ...grpc.CallOption) (StreamService_EventsClient, error) { +func (c *streamServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (StreamService_EventsClient, error) { stream, err := grpc.NewClientStream(ctx, &_StreamService_serviceDesc.Streams[0], c.cc, "/proto.StreamService/Events", opts...) if err != nil { return nil, err } x := &streamServiceEventsClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } return x, nil } type StreamService_EventsClient interface { + Send(*Request) error Recv() (*Payload, error) grpc.ClientStream } @@ -143,6 +138,10 @@ type streamServiceEventsClient struct { grpc.ClientStream } +func (x *streamServiceEventsClient) Send(m *Request) error { + return x.ClientStream.SendMsg(m) +} + func (x *streamServiceEventsClient) Recv() (*Payload, error) { m := new(Payload) if err := x.ClientStream.RecvMsg(m); err != nil { @@ -154,7 +153,7 @@ func (x *streamServiceEventsClient) Recv() (*Payload, error) { // Server API for StreamService service type StreamServiceServer interface { - Events(*Request, StreamService_EventsServer) error + Events(StreamService_EventsServer) error } func RegisterStreamServiceServer(s *grpc.Server, srv StreamServiceServer) { @@ -162,15 +161,12 @@ func RegisterStreamServiceServer(s *grpc.Server, srv StreamServiceServer) { } func _StreamService_Events_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(Request) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(StreamServiceServer).Events(m, &streamServiceEventsServer{stream}) + return srv.(StreamServiceServer).Events(&streamServiceEventsServer{stream}) } type StreamService_EventsServer interface { Send(*Payload) error + Recv() (*Request, error) grpc.ServerStream } @@ -182,6 +178,14 @@ func (x *streamServiceEventsServer) Send(m *Payload) error { return x.ServerStream.SendMsg(m) } +func (x *streamServiceEventsServer) Recv() (*Request, error) { + m := new(Request) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _StreamService_serviceDesc = grpc.ServiceDesc{ ServiceName: "proto.StreamService", HandlerType: (*StreamServiceServer)(nil), @@ -191,6 +195,7 @@ var _StreamService_serviceDesc = grpc.ServiceDesc{ StreamName: "Events", Handler: _StreamService_Events_Handler, ServerStreams: true, + ClientStreams: true, }, }, Metadata: "stream.proto", @@ -199,7 +204,7 @@ var _StreamService_serviceDesc = grpc.ServiceDesc{ func init() { proto1.RegisterFile("stream.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 211 bytes of a gzipped FileDescriptorProto + // 213 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc6, 0x5c, 0xec, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x1a, 0x5c, 0x6c, 0xae, 0x65, 0xa9, 0x79, 0x25, @@ -208,10 +213,10 @@ var fileDescriptor0 = []byte{ 0xb2, 0x20, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x08, 0xcc, 0x56, 0xf2, 0xe5, 0x62, 0x0f, 0x48, 0xac, 0xcc, 0xc9, 0x4f, 0x4c, 0x11, 0xd2, 0xe3, 0xe2, 0x4c, 0x85, 0xa9, 0x05, 0xab, 0xc1, 0x66, 0x30, 0x42, 0x09, 0xc8, 0xb8, 0x94, 0xc4, 0x92, 0x44, 0x09, 0x26, 0x88, 0x71, 0x20, 0xb6, - 0x91, 0x2d, 0x17, 0x6f, 0x30, 0xd8, 0xed, 0xc1, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x42, 0x3a, - 0x30, 0xa7, 0x0a, 0xf1, 0x41, 0xcd, 0x82, 0x7a, 0x42, 0x0a, 0xc6, 0x87, 0x5a, 0xaf, 0xc4, 0x60, - 0xc0, 0xe8, 0x24, 0xcb, 0x25, 0x90, 0x5f, 0x90, 0x9a, 0x97, 0x56, 0x94, 0x5a, 0x9c, 0xa1, 0x57, - 0x90, 0x93, 0x58, 0x9c, 0x9b, 0x18, 0xc0, 0xb8, 0x88, 0x89, 0x2d, 0xc0, 0xc7, 0x31, 0xd8, 0xd7, - 0x31, 0x89, 0x0d, 0xac, 0xc3, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0a, 0xf7, 0x93, 0x3e, 0x20, - 0x01, 0x00, 0x00, + 0x91, 0x3d, 0x17, 0x6f, 0x30, 0xd8, 0xed, 0xc1, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x42, 0x7a, + 0x30, 0xa7, 0x0a, 0xf1, 0x41, 0xcd, 0x82, 0x7a, 0x42, 0x0a, 0xc6, 0x87, 0x5a, 0xaf, 0xc4, 0xa0, + 0xc1, 0x68, 0xc0, 0xe8, 0x24, 0xcb, 0x25, 0x90, 0x5f, 0x90, 0x9a, 0x97, 0x56, 0x94, 0x5a, 0x9c, + 0xa1, 0x57, 0x90, 0x93, 0x58, 0x9c, 0x9b, 0x18, 0xc0, 0xb8, 0x88, 0x89, 0x2d, 0xc0, 0xc7, 0x31, + 0xd8, 0xd7, 0x31, 0x89, 0x0d, 0xac, 0xc7, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xbd, 0x6a, 0xa0, + 0x80, 0x22, 0x01, 0x00, 0x00, } diff --git a/protobuf/stream.proto b/protobuf/stream.proto index c90abbe..efafe17 100644 --- a/protobuf/stream.proto +++ b/protobuf/stream.proto @@ -7,7 +7,7 @@ option objc_class_prefix = "PLASMA"; package proto; service StreamService { - rpc Events(Request) returns (stream Payload) {} + rpc Events(stream Request) returns (stream Payload) {} } diff --git a/server/grpc.go b/server/grpc.go index 00db4e8..733c4c5 100644 --- a/server/grpc.go +++ b/server/grpc.go @@ -1,6 +1,7 @@ package server import ( + "io" "time" "go.uber.org/zap" @@ -15,6 +16,7 @@ import ( "github.com/openfresh/plasma/protobuf" "github.com/openfresh/plasma/pubsub" "github.com/pkg/errors" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" ) @@ -61,25 +63,34 @@ func NewGRPCServer(opt Option) (*GRPCServer, error) { return gs, nil } +type refreshEvents struct { + client *manager.Client + events []string +} + type StreamServer struct { - clientManager *manager.ClientManager - newClients chan manager.Client - removeClients chan manager.Client - payloads chan event.Payload - pubsub pubsub.PubSuber - accessLogger *zap.Logger - errorLogger *zap.Logger + clientManager *manager.ClientManager + newClients chan manager.Client + removeClients chan manager.Client + payloads chan event.Payload + resfreshEvents chan refreshEvents + errChan chan error + pubsub pubsub.PubSuber + accessLogger *zap.Logger + errorLogger *zap.Logger } func NewStreamServer(opt Option) *StreamServer { ss := &StreamServer{ - clientManager: manager.NewClientManager(), - newClients: make(chan manager.Client), - removeClients: make(chan manager.Client), - payloads: make(chan event.Payload), - pubsub: opt.PubSuber, - accessLogger: opt.AccessLogger, - errorLogger: opt.ErrorLogger, + clientManager: manager.NewClientManager(), + newClients: make(chan manager.Client), + removeClients: make(chan manager.Client), + payloads: make(chan event.Payload), + errChan: make(chan error), + resfreshEvents: make(chan refreshEvents), + pubsub: opt.PubSuber, + accessLogger: opt.AccessLogger, + errorLogger: opt.ErrorLogger, } ss.pubsub.Subscribe(func(payload event.Payload) { ss.payloads <- payload @@ -101,35 +112,65 @@ func (ss *StreamServer) Run() { metrics.DecConnection() case payload := <-ss.payloads: ss.clientManager.SendPayload(payload) + case re := <-ss.resfreshEvents: + ss.clientManager.DeleteEvents(re.client) + re.client.SetEvents(re.events) + ss.clientManager.AddClient(*re.client) } } }() } -func (ss *StreamServer) Events(request *proto.Request, es proto.StreamService_EventsServer) error { - ss.accessLogger.Info("gRPC", - zap.Array("request-events", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { - for _, e := range request.Events { - enc.AppendString(e.Type) +func (ss *StreamServer) Events(es proto.StreamService_EventsServer) error { + client := manager.NewClient([]string{}) + ss.newClients <- client + go func() { + for { + request, err := es.Recv() + if err == io.EOF { + <-es.Context().Done() + return } - return nil - })), - ) - if request == nil || request.Events == nil { - return errors.New("request can't be nil") - } - l := len(request.Events) - events := make([]string, l) - for i := 0; i < l; i++ { - events[i] = request.Events[i].Type - } + if err != nil { + if grpc.Code(err) != codes.Canceled { + ss.errChan <- errors.Wrap(err, "Recv error") + return + } else { + <-es.Context().Done() + return + } + } - client := manager.NewClient(events) - ss.newClients <- client + ss.accessLogger.Info("gRPC", + zap.Array("request-events", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, e := range request.Events { + enc.AppendString(e.Type) + } + return nil + })), + ) + if request.Events == nil { + ss.errChan <- errors.New("event can't be nil") + return + } + + l := len(request.Events) + events := make([]string, l) + for i := 0; i < l; i++ { + events[i] = request.Events[i].Type + } + ss.resfreshEvents <- refreshEvents{ + client: &client, + events: events, + } + } + }() for { select { + case err := <-ss.errChan: + return err case pl, open := <-client.ReceivePayload(): if !open { return nil diff --git a/server/grpc_test.go b/server/grpc_test.go index 3f7eaff..0a8050e 100644 --- a/server/grpc_test.go +++ b/server/grpc_test.go @@ -128,7 +128,11 @@ func TestGRPCEvents(t *testing.T) { defer conn.Close() client := proto.NewStreamServiceClient(conn) ctx := context.Background() - ss, err := client.Events(ctx, &cases[i].req) + ss, err := client.Events(ctx) + if err := ss.Send(&cases[i].req); err != nil { + require.NoError(err) + } + require.NoError(err) isFirst := true for cases[i].expectCount != cases[i].actualCount {