From cec3ddca4f98f337b1151e7465f6e0b8e3a7ef6d Mon Sep 17 00:00:00 2001 From: Josef Karasek Date: Wed, 2 Aug 2023 11:18:17 +0200 Subject: [PATCH] Handle server closing grpc stream (#1164) Handle server closing grpc stream --- pkg/api/cloudslack/cloudslack.pb.go | 40 +++++++++++++++----- pkg/api/cloudslack/error.go | 35 +++++++++++++++++ pkg/bot/slack_cloud.go | 58 ++++++++++++++++++++++++++++- pkg/config/config.go | 11 +++--- proto/cloudslack.proto | 3 ++ 5 files changed, 131 insertions(+), 16 deletions(-) create mode 100644 pkg/api/cloudslack/error.go diff --git a/pkg/api/cloudslack/cloudslack.pb.go b/pkg/api/cloudslack/cloudslack.pb.go index 450894de1..90410b5bc 100644 --- a/pkg/api/cloudslack/cloudslack.pb.go +++ b/pkg/api/cloudslack/cloudslack.pb.go @@ -26,6 +26,7 @@ type ConnectRequest struct { unknownFields protoimpl.UnknownFields InstanceId string `protobuf:"bytes,1,opt,name=instanceId,proto3" json:"instanceId,omitempty"` + BotId string `protobuf:"bytes,2,opt,name=botId,proto3" json:"botId,omitempty"` } func (x *ConnectRequest) Reset() { @@ -67,6 +68,13 @@ func (x *ConnectRequest) GetInstanceId() string { return "" } +func (x *ConnectRequest) GetBotId() string { + if x != nil { + return x.BotId + } + return "" +} + type ConnectResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -74,6 +82,8 @@ type ConnectResponse struct { // event is the event received from slack slackevents.EventsAPIEvent Event []byte `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` + // error is the error received from server + Error []byte `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` } func (x *ConnectResponse) Reset() { @@ -115,24 +125,34 @@ func (x *ConnectResponse) GetEvent() []byte { return nil } +func (x *ConnectResponse) GetError() []byte { + if x != nil { + return x.Error + } + return nil +} + var File_cloudslack_proto protoreflect.FileDescriptor var file_cloudslack_proto_rawDesc = []byte{ 0x0a, 0x10, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x22, 0x30, + 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x22, 0x46, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, - 0x22, 0x27, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x32, 0x56, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, - 0x75, 0x64, 0x53, 0x6c, 0x61, 0x63, 0x6b, 0x12, 0x48, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x12, 0x1a, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, - 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, + 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6f, 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x62, 0x6f, 0x74, 0x49, 0x64, 0x22, 0x3d, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, + 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0x56, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x53, 0x6c, + 0x61, 0x63, 0x6b, 0x12, 0x48, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x1a, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, - 0x01, 0x42, 0x14, 0x5a, 0x12, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6c, 0x6f, - 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x14, 0x5a, + 0x12, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, + 0x61, 0x63, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/api/cloudslack/error.go b/pkg/api/cloudslack/error.go new file mode 100644 index 000000000..dc48e2a21 --- /dev/null +++ b/pkg/api/cloudslack/error.go @@ -0,0 +1,35 @@ +package cloudslack + +type CloudSlackErrorType string + +const ( + CloudSlackErrorQuotaExceeded CloudSlackErrorType = "QuotaExceeded" +) + +type CloudSlackError struct { + Msg string `json:"message"` + ErrType CloudSlackErrorType `json:"type"` +} + +func NewQuotaExceededError(msg string) *CloudSlackError { + return &CloudSlackError{ + Msg: msg, + ErrType: CloudSlackErrorQuotaExceeded, + } +} + +func (e *CloudSlackError) Error() string { + return e.Msg +} + +func (e *CloudSlackError) IsQuotaExceeded() bool { + return e.ErrType == CloudSlackErrorQuotaExceeded +} + +func IsQuotaExceededErr(err error) bool { + if err == nil { + return false + } + e, ok := err.(*CloudSlackError) + return ok && e.IsQuotaExceeded() +} diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 00429f42b..30e4f6022 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "regexp" "strings" "sync" @@ -39,6 +40,7 @@ const ( retryDelay = time.Second maxRetries = 30 successIntervalDuration = 3 * time.Minute + quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..." ) var _ Bot = &CloudSlack{} @@ -109,6 +111,10 @@ func NewCloudSlack(log logrus.FieldLogger, } func (b *CloudSlack) Start(ctx context.Context) error { + if b.cfg.ExecutionEventStreamingDisabled { + b.log.Warn(quotaExceededMsg) + return nil + } return withRetries(ctx, b.log, maxRetries, func() error { return b.start(ctx) }) @@ -168,6 +174,7 @@ func (b *CloudSlack) start(ctx context.Context) error { req := &pb.ConnectRequest{ InstanceId: remoteConfig.Identifier, + BotId: b.botID, } c, err := pb.NewCloudSlackClient(conn).Connect(ctx) if err != nil { @@ -188,14 +195,24 @@ func (b *CloudSlack) start(ctx context.Context) error { for { data, err := c.Recv() if err != nil { + if err == io.EOF { + b.log.Warn("gRPC connection was closed by server") + return nil + } errStatus, ok := status.FromError(err) if ok && errStatus.Code() == codes.Canceled && errStatus.Message() == context.Canceled.Error() { b.log.Debugf("Context was cancelled. Skipping returning error...") return nil } - return fmt.Errorf("while receiving cloud slack events: %w", err) } + if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) { + b.log.Warn(quotaExceededMsg) + return nil + } + if len(data.Event) == 0 { + continue + } event, err := slackevents.ParseEvent(data.Event, slackevents.OptionNoVerifyToken()) if err != nil { return fmt.Errorf("while parsing event: %w", err) @@ -221,6 +238,19 @@ func (b *CloudSlack) start(ctx context.Context) error { if err := b.handleMessage(ctx, msg); err != nil { b.log.Errorf("while handling message: %s", err.Error()) } + case *slackevents.MessageEvent: + b.log.Debugf("Got generic message event %s", formatx.StructDumper().Sdump(innerEvent)) + msg := slackMessage{ + Text: ev.Text, + Channel: ev.Channel, + UserID: ev.User, + EventTimeStamp: ev.EventTimeStamp, + } + response := quotaExceeded() + + if err := b.send(ctx, msg, response); err != nil { + return fmt.Errorf("while sending message: %w", err) + } } case string(slack.InteractionTypeBlockActions), string(slack.InteractionTypeViewSubmission): var callback slack.InteractionCallback @@ -629,3 +659,29 @@ func (b *CloudSlack) addUnaryClientCredentials() grpc.UnaryClientInterceptor { return invoker(ctx, method, req, reply, cc, opts...) } } + +func (b *CloudSlack) checkStreamingError(data []byte) error { + if len(data) == 0 { + return nil + } + cloudSlackErr := &pb.CloudSlackError{} + if err := json.Unmarshal(data, cloudSlackErr); err != nil { + return fmt.Errorf("while unmarshaling error: %w", err) + } + return cloudSlackErr +} + +func quotaExceeded() interactive.CoreMessage { + return interactive.CoreMessage{ + Header: "Quota exceeded", + Message: api.Message{ + Sections: []api.Section{ + { + Base: api.Base{ + Description: "You cannot use the Botkube Cloud Slack application within your plan. The command executions are blocked.", + }, + }, + }, + }, + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 74b5e9659..2a518c38a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -412,11 +412,12 @@ type SocketSlack struct { // CloudSlack configuration for multi-slack support type CloudSlack struct { - Enabled bool `yaml:"enabled"` - Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels" validate:"required_if=Enabled true,dive,omitempty,min=1"` - Token string `yaml:"token"` - BotID string `yaml:"botID,omitempty"` - Server GRPCServer `yaml:"server"` + Enabled bool `yaml:"enabled"` + Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels" validate:"required_if=Enabled true,dive,omitempty,min=1"` + Token string `yaml:"token"` + BotID string `yaml:"botID,omitempty"` + Server GRPCServer `yaml:"server"` + ExecutionEventStreamingDisabled bool `yaml:"executionEventStreamingDisabled"` } // GRPCServer config for gRPC server diff --git a/proto/cloudslack.proto b/proto/cloudslack.proto index 174acdb18..829b8f9c5 100644 --- a/proto/cloudslack.proto +++ b/proto/cloudslack.proto @@ -6,11 +6,14 @@ package cloudslack; message ConnectRequest { string instanceId = 1; + string botId = 2; } message ConnectResponse { // event is the event received from slack slackevents.EventsAPIEvent bytes event = 1; + // error is the error received from server + bytes error = 2; } service CloudSlack {