From b258357c1a3b3e23aa116b5835d0c91301daf073 Mon Sep 17 00:00:00 2001 From: ChristopherHX Date: Thu, 25 Jul 2024 16:23:14 +0200 Subject: [PATCH] Experimental broker phase 1 (#188) * experimental broker phase 1 * fix bug * fix bug and cancellation * update experiment * Send correct runner status * update dev version to 0.8.x * bump workflow * fix contenttype --- .github/workflows/build.yml | 4 +- actionsrunner/runner.go | 10 +++- main.go | 2 +- protocol/connection.go | 2 + protocol/session.go | 56 ++++++++++++++++++- runnerconfiguration/common.go | 15 ++--- .../compat/actions_runner_compat.go | 3 +- 7 files changed, 73 insertions(+), 19 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9931b80..6e68e68 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,8 +15,8 @@ on: type: boolean env: - RUNNER_DEV_VERSION: "0.7.x" - RUNNER_SERVER_VERSION: "3.11.5" + RUNNER_DEV_VERSION: "0.8.x" + RUNNER_SERVER_VERSION: "3.11.14" jobs: build: runs-on: ubuntu-latest diff --git a/actionsrunner/runner.go b/actionsrunner/runner.go index 13cb4c7..787c556 100644 --- a/actionsrunner/runner.go +++ b/actionsrunner/runner.go @@ -280,10 +280,13 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte continue } } + session.Status = "Online" err := vssConnection.RequestWithContext(xctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{ "poolId": fmt.Sprint(instance.PoolID), }, map[string]string{ - "sessionId": session.TaskAgentSession.SessionID, + "sessionId": session.TaskAgentSession.SessionID, + "runnerVersion": "3.0.0", + "status": session.Status, }, nil, message) //TODO lastMessageId= if err != nil { @@ -330,8 +333,8 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte } } } - if success { - if message != nil && (strings.EqualFold(message.MessageType, "PipelineAgentJobRequest") || strings.EqualFold(message.MessageType, "RunnerJobRequest")) { + if success && message.FetchBrokerIfNeeded(xctx, session) == nil { + if strings.EqualFold(message.MessageType, "PipelineAgentJobRequest") || strings.EqualFold(message.MessageType, "RunnerJobRequest") { cancelJobListening() for message != nil && !firstJobReceived && (strings.EqualFold(message.MessageType, "PipelineAgentJobRequest") || strings.EqualFold(message.MessageType, "RunnerJobRequest")) { if run.Once { @@ -346,6 +349,7 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte }() runJob(runnerenv, &joblock, vssConnection, run, cancel, cancelJob, finishJob, jobExecCtx, jobctx, session, *message, instance) { + session.Status = "Busy" var err error message, err = session.GetNextMessage(jobExecCtx) if !errors.Is(err, context.Canceled) && message != nil { diff --git a/main.go b/main.go index 59c3bd2..ac46db1 100644 --- a/main.go +++ b/main.go @@ -162,7 +162,7 @@ func (run *RunRunner) RunWithContext(listenerctx context.Context, ctx context.Co return 0 } -var version string = "0.6.x-dev" +var version string = "0.8.x-dev" type interactive struct { } diff --git a/protocol/connection.go b/protocol/connection.go index 1c09fe0..6d8dc81 100644 --- a/protocol/connection.go +++ b/protocol/connection.go @@ -312,6 +312,7 @@ func (vssConnection *VssConnection) CreateSession(ctx context.Context) (*AgentMe _ = con.Delete(ctx) return nil, err } + con.Status = "Online" return con, nil } @@ -323,6 +324,7 @@ func (vssConnection *VssConnection) LoadSession(ctx context.Context, session *Ta _ = con.Delete(ctx) return nil, err } + con.Status = "Online" return con, nil } diff --git a/protocol/session.go b/protocol/session.go index 8ef6aba..3f5aef1 100644 --- a/protocol/session.go +++ b/protocol/session.go @@ -6,11 +6,13 @@ import ( "crypto/cipher" "crypto/rand" "crypto/rsa" + "strings" // nolint:gosec "crypto/sha1" "crypto/sha256" "encoding/base64" + "encoding/json" "errors" "fmt" "hash" @@ -26,6 +28,9 @@ type TaskAgentMessage struct { } func (message *TaskAgentMessage) Decrypt(block cipher.Block) ([]byte, error) { + if message.IV == "" { + return []byte(message.Body), nil + } iv, err := base64.StdEncoding.DecodeString(message.IV) if err != nil { return nil, err @@ -58,6 +63,49 @@ func (message *TaskAgentMessage) Decrypt(block cipher.Block) ([]byte, error) { return src[off:validlen], nil } +type BrokerMigration struct { + BrokerBaseUrl string `json:"brokerBaseUrl"` +} + +func (message *TaskAgentMessage) FetchBrokerIfNeeded(xctx context.Context, session *AgentMessageConnection) error { + if strings.EqualFold(message.MessageType, "BrokerMigration") { + vssConnection := session.VssConnection + rjrr := &BrokerMigration{} + raw, err := message.Decrypt(session.Block) + if err != nil { + return err + } + err = json.Unmarshal(raw, rjrr) + if err != nil { + return err + } + for retries := 0; retries < 5; retries++ { + copy := *vssConnection + vssConnection := © + vssConnection.TenantURL = rjrr.BrokerBaseUrl + furl, err := vssConnection.BuildURL("message", map[string]string{}, map[string]string{ + "sessionId": session.TaskAgentSession.SessionID, + "runnerVersion": "3.0.0", + "status": session.Status, + }) + if err != nil { + return err + } + err = vssConnection.RequestWithContext2(xctx, "GET", furl, "", nil, &message) + if err == nil || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + return err + } + select { + case <-xctx.Done(): + return xctx.Err() + case <-time.After(time.Second * 5 * time.Duration(retries+1)): + } + } + return err + } + return nil +} + type TaskAgentSessionKey struct { Encrypted bool Value string @@ -96,6 +144,7 @@ type AgentMessageConnection struct { VssConnection *VssConnection TaskAgentSession *TaskAgentSession Block cipher.Block + Status string } func (session *AgentMessageConnection) Delete(ctx context.Context) error { @@ -116,9 +165,14 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas err := session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{ "poolId": fmt.Sprint(session.VssConnection.PoolID), }, map[string]string{ - "sessionId": session.TaskAgentSession.SessionID, + "sessionId": session.TaskAgentSession.SessionID, + "runnerVersion": "3.0.0", }, nil, message) // TODO lastMessageId= + if err == nil { + err = session.DeleteMessage(ctx, message) + err = errors.Join(err, message.FetchBrokerIfNeeded(ctx, session)) + } if err != nil { if errors.Is(err, context.Canceled) { return nil, err diff --git a/runnerconfiguration/common.go b/runnerconfiguration/common.go index efad109..f653b36 100644 --- a/runnerconfiguration/common.go +++ b/runnerconfiguration/common.go @@ -1,13 +1,11 @@ package runnerconfiguration import ( - "bytes" "context" "crypto/rsa" "crypto/tls" "crypto/x509" "encoding/base64" - "encoding/json" "fmt" "net/http" "net/url" @@ -148,14 +146,6 @@ func gitHubAuth(config *ConfigureRemoveRunner, c *http.Client, runnerEvent strin } registerUrl.Path = path.Join(apiscope, "actions/runner-registration") - buf := new(bytes.Buffer) - req := &protocol.RunnerAddRemove{} - req.URL = config.URL - req.RunnerEvent = runnerEvent - enc := json.NewEncoder(buf) - if err := enc.Encode(req); err != nil { - return nil, err - } finalregisterUrl := registerUrl.String() client := &protocol.VssConnection{ @@ -164,7 +154,10 @@ func gitHubAuth(config *ConfigureRemoveRunner, c *http.Client, runnerEvent strin Client: c, } res := &protocol.GitHubAuthResult{} - err = client.RequestWithContext2(context.Background(), "POST", finalregisterUrl, "", buf, res) + err = client.RequestWithContext2(context.Background(), "POST", finalregisterUrl, "", &protocol.RunnerAddRemove{ + URL: config.URL, + RunnerEvent: runnerEvent, + }, res) if err != nil { return nil, fmt.Errorf("failed to authenticate as Runner Admin: %v", err) diff --git a/runnerconfiguration/compat/actions_runner_compat.go b/runnerconfiguration/compat/actions_runner_compat.go index 2862fc0..f380870 100644 --- a/runnerconfiguration/compat/actions_runner_compat.go +++ b/runnerconfiguration/compat/actions_runner_compat.go @@ -153,8 +153,9 @@ func ToRunnerInstance(fileAccess ConfigFileAccess) (*runnerconfiguration.RunnerI ClientID: credentials.Data.ClientId, }, DisableUpdate: disableUpdate, + Version: "3.0.0", }, - WorkFolder: agent.WorkFolder, + WorkFolder: agent.WorkFolder, RegistrationURL: agent.GitHubUrl, }, nil }