Skip to content

Commit

Permalink
Update executor contract, publish local dev server
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed Feb 19, 2024
1 parent ea1de35 commit 6e5b2e3
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 206 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ For faster development, you can also build and run Botkube outside K8s cluster.
1. Start fake plugins server to serve binaries from [`dist`](dist) folder:
```bash
go run test/helpers/plugin_server.go
go run hack/target/serve-plugins/main.go
```
> **Note**
Expand Down
45 changes: 45 additions & 0 deletions hack/target/serve-plugins/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"flag"
"log"
"os"
"path/filepath"
"strconv"

"github.com/kubeshop/botkube/pkg/loggerx"
"github.com/kubeshop/botkube/pkg/pluginx"
)

func main() {
pluginsDir := flag.String("plugins-dir", getEnv("PLUGINS_DIR", "plugin-dist"), "Plugins directory")
host := flag.String("host", getEnv("PLUGIN_SERVER_HOST", "http://localhost"), "Local server host")
port := flag.String("port", getEnv("PLUGIN_SERVER_PORT", "3010"), "Local server port")
flag.Parse()

dir, err := os.Getwd()
loggerx.ExitOnError(err, "while getting current directory")

portInt, err := strconv.Atoi(*port)
loggerx.ExitOnError(err, "while casting server port value")

binDir := filepath.Join(dir, *pluginsDir)
indexEndpoint, startServerFn := pluginx.NewStaticPluginServer(pluginx.StaticPluginServerConfig{
BinariesDirectory: binDir,
Host: *host,
Port: portInt,
})

log.Printf("Service plugin binaries from %s\n", binDir)
log.Printf("Botkube repository index URL: %s", indexEndpoint)
err = startServerFn()
loggerx.ExitOnError(err, "while starting server")
}

func getEnv(key, defaultValue string) string {
value, exists := os.LookupEnv(key)
if !exists {
return defaultValue
}
return value
}
360 changes: 224 additions & 136 deletions pkg/api/executor/executor.pb.go

Large diffs are not rendered by default.

31 changes: 25 additions & 6 deletions pkg/api/executor/grpc_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,24 @@ type (
// Limitations:
// - It's available only for SocketSlack. In the future, it may be adopted across other platforms.
Message Message

IncomingWebhook IncomingWebhookDetailsContext
}

// IncomingWebhookDetailsContext holds source incoming webhook context.
IncomingWebhookDetailsContext struct {
BaseSourceURL string
}

// Message holds information about the message that triggered a given Executor.
Message struct {
Text string
URL string
User User

// ParentActivityID is the ID of the parent activity. If user follows with messages in a thread, this ID represents the originating message that started that thread.
// Otherwise, it's the ID of the initial message.
ParentActivityID string
}

// User represents the user that sent a message.
Expand Down Expand Up @@ -93,7 +104,7 @@ type (
//
// NOTE: In the future we can consider using VersionedPlugins. These can be used to negotiate
// a compatible version between client and server. If this is set, Handshake.ProtocolVersion is not required.
const ProtocolVersion = 2
const ProtocolVersion = 3

var _ plugin.GRPCPlugin = &Plugin{}

Expand Down Expand Up @@ -133,13 +144,17 @@ func (p *grpcClient) Execute(ctx context.Context, in ExecuteInput) (ExecuteOutpu
IsInteractivitySupported: in.Context.IsInteractivitySupported,
KubeConfig: in.Context.KubeConfig,
Message: &MessageContext{
Text: in.Context.Message.Text,
Url: in.Context.Message.URL,
Text: in.Context.Message.Text,
Url: in.Context.Message.URL,
ParentActivityId: in.Context.Message.ParentActivityID,
User: &UserContext{
Mention: in.Context.Message.User.Mention,
DisplayName: in.Context.Message.User.DisplayName,
},
},
IncomingWebhook: &IncomingWebhookContext{
BaseSourceURL: in.Context.IncomingWebhook.BaseSourceURL,
},
},
}

Expand Down Expand Up @@ -241,6 +256,9 @@ func (p *grpcServer) Execute(ctx context.Context, request *ExecuteRequest) (*Exe
IsInteractivitySupported: request.Context.IsInteractivitySupported,
KubeConfig: request.Context.KubeConfig,
Message: p.toMessageIfPresent(request.Context.Message),
IncomingWebhook: IncomingWebhookDetailsContext{
BaseSourceURL: request.Context.IncomingWebhook.BaseSourceURL,
},
},
})
if err != nil {
Expand Down Expand Up @@ -281,9 +299,10 @@ func (*grpcServer) toMessageIfPresent(msg *MessageContext) Message {
}

return Message{
Text: msg.Text,
URL: msg.Url,
User: user,
Text: msg.Text,
URL: msg.Url,
ParentActivityID: msg.ParentActivityId,
User: user,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/api/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Message struct {
OnlyVisibleForYou bool `json:"onlyVisibleForYou,omitempty" yaml:"onlyVisibleForYou"`
ReplaceOriginal bool `json:"replaceOriginal,omitempty" yaml:"replaceOriginal"`
UserHandle string `json:"userHandle,omitempty" yaml:"userHandle"`

// ParentActivityID represents the originating message that started a thread. If set, message will be sent in that thread instead of the default one.
ParentActivityID string `json:"parentActivityId" yaml:"parentActivityId"`
}

func (msg *Message) IsEmpty() bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/source/grpc_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type (
//
// NOTE: In the future we can consider using VersionedPlugins. These can be used to negotiate
// a compatible version between client and server. If this is set, Handshake.ProtocolVersion is not required.
const ProtocolVersion = 2
const ProtocolVersion = 3

var _ plugin.GRPCPlugin = &Plugin{}

Expand Down
6 changes: 6 additions & 0 deletions pkg/bot/slack_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func (b *SocketSlack) handleMessage(ctx context.Context, event slackMessage) err
SlackState: event.State,
URL: permalink,
Text: event.Text,
ParentActivityID: event.GetTimestamp(),
},
Message: request,
User: execute.UserInput{
Expand Down Expand Up @@ -568,6 +569,11 @@ func (b *SocketSlack) send(ctx context.Context, event slackMessage, in interacti
if resp.Message.UserHandle != "" {
id = resp.Message.UserHandle
}

if resp.Message.ParentActivityID != "" {
options = append(options, slack.MsgOptionTS(resp.Message.ParentActivityID))
}

_, _, err = b.client.PostMessageContext(ctx, id, options...)
if err != nil {
return fmt.Errorf("while posting Slack message: %w", slackError(err, event.Channel))
Expand Down
1 change: 1 addition & 0 deletions pkg/execute/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type Conversation struct {
SlackState *slack.BlockActionStates
URL string
Text string
ParentActivityID string
}

// NewDefaultInput an input for NewDefault
Expand Down
4 changes: 4 additions & 0 deletions pkg/execute/plugin_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (e *PluginExecutor) Execute(ctx context.Context, bindings []string, slackSt
Mention: cmdCtx.User.Mention,
DisplayName: cmdCtx.User.DisplayName,
},
ParentActivityID: cmdCtx.Conversation.ParentActivityID,
},
IncomingWebhook: executor.IncomingWebhookDetailsContext{
BaseSourceURL: e.cfg.Plugins.IncomingWebhook.InClusterBaseURL + "/sources/v1",
},
},
})
Expand Down
26 changes: 10 additions & 16 deletions test/fake/plugin_server.go → pkg/plugin/plugin_server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package fake
package plugin

import (
"fmt"
"github.com/kubeshop/botkube/pkg/plugin"
"log"
"net/http"
"os"
Expand All @@ -16,26 +15,21 @@ import (
const indexFileEndpoint = "/botkube.yaml"

type (
// PluginConfig holds configuration for fake plugin server.
PluginConfig struct {
// StaticPluginServerConfig holds configuration for fake plugin server.
StaticPluginServerConfig struct {
BinariesDirectory string
Server PluginServer
}

// PluginServer holds configuration for HTTP plugin server.
PluginServer struct {
Host string `envconfig:"default=http://host.k3d.internal"`
Port int `envconfig:"default=3000"`
Host string `envconfig:"default=http://host.k3d.internal"`
Port int `envconfig:"default=3000"`
}
)

// NewPluginServer return function to start the fake plugin HTTP server.
func NewPluginServer(cfg PluginConfig) (string, func() error) {
// NewStaticPluginServer return function to start the static plugin HTTP server suitable for local development or e2e tests.
func NewStaticPluginServer(cfg StaticPluginServerConfig) (string, func() error) {
fs := http.FileServer(http.Dir(cfg.BinariesDirectory))
http.Handle("/static/", http.StripPrefix("/static/", fs))

basePath := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
builder := plugin.NewIndexBuilder(loggerx.NewNoop())
basePath := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
builder := NewIndexBuilder(loggerx.NewNoop())

http.HandleFunc(indexFileEndpoint, func(w http.ResponseWriter, _ *http.Request) {
isArchive := os.Getenv("OUTPUT_MODE") == "archive"
Expand All @@ -57,7 +51,7 @@ func NewPluginServer(cfg PluginConfig) (string, func() error) {
}
})

addr := fmt.Sprintf(":%d", cfg.Server.Port)
addr := fmt.Sprintf(":%d", cfg.Port)
log.Printf("Listening on %s...", addr)

server := &http.Server{
Expand Down
8 changes: 7 additions & 1 deletion proto/executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ message ExecuteContext {
bytes slackState = 2;
bytes kubeConfig = 3;
MessageContext message = 4;
IncomingWebhookContext incomingWebhook = 5;
}

message IncomingWebhookContext {
string baseSourceURL = 1;
}

message MessageContext {
string text = 1;
string url = 2;
UserContext user = 3;
string parentActivityId = 3;
UserContext user = 4;
}

message UserContext {
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/bots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/kubeshop/botkube/pkg/pluginx"
"net/http"
"regexp"
"strconv"
Expand All @@ -17,7 +18,6 @@ import (
"botkube.io/botube/test/botkubex"
"botkube.io/botube/test/commplatform"
"botkube.io/botube/test/diff"
"botkube.io/botube/test/fake"
"github.com/MakeNowJust/heredoc"
"github.com/anthhub/forwarder"
"github.com/hasura/go-graphql-client"
Expand Down Expand Up @@ -87,7 +87,7 @@ type Config struct {
Port int `envconfig:"default=2115"`
LocalPort int `envconfig:"default=2115"`
}
Plugins fake.PluginConfig
Plugins pluginx.StaticPluginServerConfig
ConfigMap struct {
Namespace string `envconfig:"default=botkube"`
}
Expand Down Expand Up @@ -215,7 +215,7 @@ func runBotTest(t *testing.T,
var indexEndpoint string
if botDriver.Type() == commplatform.DiscordBot {
t.Log("Starting plugin server...")
endpoint, startServerFn := fake.NewPluginServer(appCfg.Plugins)
endpoint, startServerFn := pluginx.NewStaticPluginServer(appCfg.Plugins)
indexEndpoint = endpoint
go func() {
require.NoError(t, startServerFn())
Expand Down
42 changes: 0 additions & 42 deletions test/helpers/plugin_server.go

This file was deleted.

0 comments on commit 6e5b2e3

Please sign in to comment.