Skip to content

Commit

Permalink
Experimental broker phase 1 (#188)
Browse files Browse the repository at this point in the history
* experimental broker phase 1

* fix bug

* fix bug and cancellation

* update experiment

* Send correct runner status

* update dev version to 0.8.x

* bump workflow

* fix contenttype
  • Loading branch information
ChristopherHX authored Jul 25, 2024
1 parent 37a7677 commit b258357
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ on:
type: boolean

env:
RUNNER_DEV_VERSION: "0.7.x"
RUNNER_SERVER_VERSION: "3.11.5"
RUNNER_DEV_VERSION: "0.8.x"
RUNNER_SERVER_VERSION: "3.11.14"
jobs:
build:
runs-on: ubuntu-latest
Expand Down
10 changes: 7 additions & 3 deletions actionsrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,13 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte
continue
}
}
session.Status = "Online"
err := vssConnection.RequestWithContext(xctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{
"poolId": fmt.Sprint(instance.PoolID),
}, map[string]string{
"sessionId": session.TaskAgentSession.SessionID,
"sessionId": session.TaskAgentSession.SessionID,
"runnerVersion": "3.0.0",
"status": session.Status,
}, nil, message)
//TODO lastMessageId=
if err != nil {
Expand Down Expand Up @@ -330,8 +333,8 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte
}
}
}
if success {
if message != nil && (strings.EqualFold(message.MessageType, "PipelineAgentJobRequest") || strings.EqualFold(message.MessageType, "RunnerJobRequest")) {
if success && message.FetchBrokerIfNeeded(xctx, session) == nil {
if strings.EqualFold(message.MessageType, "PipelineAgentJobRequest") || strings.EqualFold(message.MessageType, "RunnerJobRequest") {
cancelJobListening()
for message != nil && !firstJobReceived && (strings.EqualFold(message.MessageType, "PipelineAgentJobRequest") || strings.EqualFold(message.MessageType, "RunnerJobRequest")) {
if run.Once {
Expand All @@ -346,6 +349,7 @@ func (run *RunRunner) Run(runnerenv RunnerEnvironment, listenerctx context.Conte
}()
runJob(runnerenv, &joblock, vssConnection, run, cancel, cancelJob, finishJob, jobExecCtx, jobctx, session, *message, instance)
{
session.Status = "Busy"
var err error
message, err = session.GetNextMessage(jobExecCtx)
if !errors.Is(err, context.Canceled) && message != nil {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (run *RunRunner) RunWithContext(listenerctx context.Context, ctx context.Co
return 0
}

var version string = "0.6.x-dev"
var version string = "0.8.x-dev"

type interactive struct {
}
Expand Down
2 changes: 2 additions & 0 deletions protocol/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ func (vssConnection *VssConnection) CreateSession(ctx context.Context) (*AgentMe
_ = con.Delete(ctx)
return nil, err
}
con.Status = "Online"
return con, nil
}

Expand All @@ -323,6 +324,7 @@ func (vssConnection *VssConnection) LoadSession(ctx context.Context, session *Ta
_ = con.Delete(ctx)
return nil, err
}
con.Status = "Online"
return con, nil
}

Expand Down
56 changes: 55 additions & 1 deletion protocol/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"crypto/cipher"
"crypto/rand"
"crypto/rsa"
"strings"

// nolint:gosec
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"hash"
Expand All @@ -26,6 +28,9 @@ type TaskAgentMessage struct {
}

func (message *TaskAgentMessage) Decrypt(block cipher.Block) ([]byte, error) {
if message.IV == "" {
return []byte(message.Body), nil
}
iv, err := base64.StdEncoding.DecodeString(message.IV)
if err != nil {
return nil, err
Expand Down Expand Up @@ -58,6 +63,49 @@ func (message *TaskAgentMessage) Decrypt(block cipher.Block) ([]byte, error) {
return src[off:validlen], nil
}

type BrokerMigration struct {
BrokerBaseUrl string `json:"brokerBaseUrl"`
}

func (message *TaskAgentMessage) FetchBrokerIfNeeded(xctx context.Context, session *AgentMessageConnection) error {
if strings.EqualFold(message.MessageType, "BrokerMigration") {
vssConnection := session.VssConnection
rjrr := &BrokerMigration{}
raw, err := message.Decrypt(session.Block)
if err != nil {
return err
}
err = json.Unmarshal(raw, rjrr)
if err != nil {
return err
}
for retries := 0; retries < 5; retries++ {
copy := *vssConnection
vssConnection := &copy
vssConnection.TenantURL = rjrr.BrokerBaseUrl
furl, err := vssConnection.BuildURL("message", map[string]string{}, map[string]string{
"sessionId": session.TaskAgentSession.SessionID,
"runnerVersion": "3.0.0",
"status": session.Status,
})
if err != nil {
return err
}
err = vssConnection.RequestWithContext2(xctx, "GET", furl, "", nil, &message)
if err == nil || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
return err
}
select {
case <-xctx.Done():
return xctx.Err()
case <-time.After(time.Second * 5 * time.Duration(retries+1)):
}
}
return err
}
return nil
}

type TaskAgentSessionKey struct {
Encrypted bool
Value string
Expand Down Expand Up @@ -96,6 +144,7 @@ type AgentMessageConnection struct {
VssConnection *VssConnection
TaskAgentSession *TaskAgentSession
Block cipher.Block
Status string
}

func (session *AgentMessageConnection) Delete(ctx context.Context) error {
Expand All @@ -116,9 +165,14 @@ func (session *AgentMessageConnection) GetNextMessage(ctx context.Context) (*Tas
err := session.VssConnection.RequestWithContext(ctx, "c3a054f6-7a8a-49c0-944e-3a8e5d7adfd7", "5.1-preview", "GET", map[string]string{
"poolId": fmt.Sprint(session.VssConnection.PoolID),
}, map[string]string{
"sessionId": session.TaskAgentSession.SessionID,
"sessionId": session.TaskAgentSession.SessionID,
"runnerVersion": "3.0.0",
}, nil, message)
// TODO lastMessageId=
if err == nil {
err = session.DeleteMessage(ctx, message)
err = errors.Join(err, message.FetchBrokerIfNeeded(ctx, session))
}
if err != nil {
if errors.Is(err, context.Canceled) {
return nil, err
Expand Down
15 changes: 4 additions & 11 deletions runnerconfiguration/common.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package runnerconfiguration

import (
"bytes"
"context"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -148,14 +146,6 @@ func gitHubAuth(config *ConfigureRemoveRunner, c *http.Client, runnerEvent strin
}
registerUrl.Path = path.Join(apiscope, "actions/runner-registration")

buf := new(bytes.Buffer)
req := &protocol.RunnerAddRemove{}
req.URL = config.URL
req.RunnerEvent = runnerEvent
enc := json.NewEncoder(buf)
if err := enc.Encode(req); err != nil {
return nil, err
}
finalregisterUrl := registerUrl.String()

client := &protocol.VssConnection{
Expand All @@ -164,7 +154,10 @@ func gitHubAuth(config *ConfigureRemoveRunner, c *http.Client, runnerEvent strin
Client: c,
}
res := &protocol.GitHubAuthResult{}
err = client.RequestWithContext2(context.Background(), "POST", finalregisterUrl, "", buf, res)
err = client.RequestWithContext2(context.Background(), "POST", finalregisterUrl, "", &protocol.RunnerAddRemove{
URL: config.URL,
RunnerEvent: runnerEvent,
}, res)

if err != nil {
return nil, fmt.Errorf("failed to authenticate as Runner Admin: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion runnerconfiguration/compat/actions_runner_compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ func ToRunnerInstance(fileAccess ConfigFileAccess) (*runnerconfiguration.RunnerI
ClientID: credentials.Data.ClientId,
},
DisableUpdate: disableUpdate,
Version: "3.0.0",
},
WorkFolder: agent.WorkFolder,
WorkFolder: agent.WorkFolder,
RegistrationURL: agent.GitHubUrl,
}, nil
}
Expand Down

0 comments on commit b258357

Please sign in to comment.