Skip to content

Commit

Permalink
Handle server closing grpc stream (#1164)
Browse files Browse the repository at this point in the history
Handle server closing grpc stream
  • Loading branch information
Josef Karasek authored Aug 2, 2023
1 parent b8e2b72 commit cec3ddc
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 16 deletions.
40 changes: 30 additions & 10 deletions pkg/api/cloudslack/cloudslack.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions pkg/api/cloudslack/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cloudslack

type CloudSlackErrorType string

const (
CloudSlackErrorQuotaExceeded CloudSlackErrorType = "QuotaExceeded"
)

type CloudSlackError struct {
Msg string `json:"message"`
ErrType CloudSlackErrorType `json:"type"`
}

func NewQuotaExceededError(msg string) *CloudSlackError {
return &CloudSlackError{
Msg: msg,
ErrType: CloudSlackErrorQuotaExceeded,
}
}

func (e *CloudSlackError) Error() string {
return e.Msg
}

func (e *CloudSlackError) IsQuotaExceeded() bool {
return e.ErrType == CloudSlackErrorQuotaExceeded
}

func IsQuotaExceededErr(err error) bool {
if err == nil {
return false
}
e, ok := err.(*CloudSlackError)
return ok && e.IsQuotaExceeded()
}
58 changes: 57 additions & 1 deletion pkg/bot/slack_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -39,6 +40,7 @@ const (
retryDelay = time.Second
maxRetries = 30
successIntervalDuration = 3 * time.Minute
quotaExceededMsg = "Quota exceeded detected. Stopping reconnecting to Botkube Cloud gRPC API..."
)

var _ Bot = &CloudSlack{}
Expand Down Expand Up @@ -109,6 +111,10 @@ func NewCloudSlack(log logrus.FieldLogger,
}

func (b *CloudSlack) Start(ctx context.Context) error {
if b.cfg.ExecutionEventStreamingDisabled {
b.log.Warn(quotaExceededMsg)
return nil
}
return withRetries(ctx, b.log, maxRetries, func() error {
return b.start(ctx)
})
Expand Down Expand Up @@ -168,6 +174,7 @@ func (b *CloudSlack) start(ctx context.Context) error {

req := &pb.ConnectRequest{
InstanceId: remoteConfig.Identifier,
BotId: b.botID,
}
c, err := pb.NewCloudSlackClient(conn).Connect(ctx)
if err != nil {
Expand All @@ -188,14 +195,24 @@ func (b *CloudSlack) start(ctx context.Context) error {
for {
data, err := c.Recv()
if err != nil {
if err == io.EOF {
b.log.Warn("gRPC connection was closed by server")
return nil
}
errStatus, ok := status.FromError(err)
if ok && errStatus.Code() == codes.Canceled && errStatus.Message() == context.Canceled.Error() {
b.log.Debugf("Context was cancelled. Skipping returning error...")
return nil
}

return fmt.Errorf("while receiving cloud slack events: %w", err)
}
if streamingError := b.checkStreamingError(data.Event); pb.IsQuotaExceededErr(streamingError) {
b.log.Warn(quotaExceededMsg)
return nil
}
if len(data.Event) == 0 {
continue
}
event, err := slackevents.ParseEvent(data.Event, slackevents.OptionNoVerifyToken())
if err != nil {
return fmt.Errorf("while parsing event: %w", err)
Expand All @@ -221,6 +238,19 @@ func (b *CloudSlack) start(ctx context.Context) error {
if err := b.handleMessage(ctx, msg); err != nil {
b.log.Errorf("while handling message: %s", err.Error())
}
case *slackevents.MessageEvent:
b.log.Debugf("Got generic message event %s", formatx.StructDumper().Sdump(innerEvent))
msg := slackMessage{
Text: ev.Text,
Channel: ev.Channel,
UserID: ev.User,
EventTimeStamp: ev.EventTimeStamp,
}
response := quotaExceeded()

if err := b.send(ctx, msg, response); err != nil {
return fmt.Errorf("while sending message: %w", err)
}
}
case string(slack.InteractionTypeBlockActions), string(slack.InteractionTypeViewSubmission):
var callback slack.InteractionCallback
Expand Down Expand Up @@ -629,3 +659,29 @@ func (b *CloudSlack) addUnaryClientCredentials() grpc.UnaryClientInterceptor {
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func (b *CloudSlack) checkStreamingError(data []byte) error {
if len(data) == 0 {
return nil
}
cloudSlackErr := &pb.CloudSlackError{}
if err := json.Unmarshal(data, cloudSlackErr); err != nil {
return fmt.Errorf("while unmarshaling error: %w", err)
}
return cloudSlackErr
}

func quotaExceeded() interactive.CoreMessage {
return interactive.CoreMessage{
Header: "Quota exceeded",
Message: api.Message{
Sections: []api.Section{
{
Base: api.Base{
Description: "You cannot use the Botkube Cloud Slack application within your plan. The command executions are blocked.",
},
},
},
},
}
}
11 changes: 6 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,12 @@ type SocketSlack struct {

// CloudSlack configuration for multi-slack support
type CloudSlack struct {
Enabled bool `yaml:"enabled"`
Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels" validate:"required_if=Enabled true,dive,omitempty,min=1"`
Token string `yaml:"token"`
BotID string `yaml:"botID,omitempty"`
Server GRPCServer `yaml:"server"`
Enabled bool `yaml:"enabled"`
Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels" validate:"required_if=Enabled true,dive,omitempty,min=1"`
Token string `yaml:"token"`
BotID string `yaml:"botID,omitempty"`
Server GRPCServer `yaml:"server"`
ExecutionEventStreamingDisabled bool `yaml:"executionEventStreamingDisabled"`
}

// GRPCServer config for gRPC server
Expand Down
3 changes: 3 additions & 0 deletions proto/cloudslack.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package cloudslack;

message ConnectRequest {
string instanceId = 1;
string botId = 2;
}

message ConnectResponse {
// event is the event received from slack slackevents.EventsAPIEvent
bytes event = 1;
// error is the error received from server
bytes error = 2;
}

service CloudSlack {
Expand Down

0 comments on commit cec3ddc

Please sign in to comment.