Release v0.0.73 (#118)
jakubno authored Mar 31, 2024
2 parents 1feaffd + 94ccdb4 commit 8109336
Showing 25 changed files with 562 additions and 400 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.0.72
0.0.73
28 changes: 27 additions & 1 deletion packages/api/internal/nomad/client.go
@@ -2,6 +2,7 @@ package nomad

import (
"context"
"fmt"
"log"
"os"
"time"
@@ -13,6 +14,8 @@ import (
"github.com/hashicorp/nomad/api"
)

const streamRetryTime = 10 * time.Millisecond

var (
nomadAddress = os.Getenv("NOMAD_ADDRESS")
nomadToken = os.Getenv("NOMAD_TOKEN")
@@ -43,7 +46,30 @@ func InitNomadClient(logger *zap.SugaredLogger) *NomadClient {
cancel: cancel,
}

go n.ListenToJobs(ctx)
index, err := n.GetStartingIndex(ctx)
if err != nil {
log.Fatal(err)
}

go func() {
for {
select {
case <-ctx.Done():
return
default:
listenErr := n.ListenToJobs(ctx, index)
if listenErr != nil {
fmt.Fprintf(os.Stderr, "Error listening to Nomad jobs\n> %v\n", listenErr)

time.Sleep(streamRetryTime)

continue
}

return
}
}
}()

return n
}
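
The hunk above swaps the one-shot `go n.ListenToJobs(ctx)` for a supervising goroutine that fetches a starting index and restarts the listener whenever the event stream fails, pausing `streamRetryTime` between attempts. Below is a minimal, self-contained sketch of that retry pattern; `superviseListener`, `retryDelay`, and the fake `listen` callback are illustrative stand-ins, not the project's actual code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"
)

// retryDelay mirrors the streamRetryTime constant added in the diff (10ms there).
const retryDelay = 10 * time.Millisecond

// superviseListener keeps restarting listen until it returns nil or the
// context is canceled. listen is expected to block while the stream is
// healthy and to return an error when the stream breaks.
func superviseListener(ctx context.Context, listen func(context.Context) error) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			if err := listen(ctx); err != nil {
				fmt.Fprintf(os.Stderr, "listener failed, retrying: %v\n", err)
				time.Sleep(retryDelay)

				continue
			}

			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	attempts := 0
	superviseListener(ctx, func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("stream closed unexpectedly")
		}

		<-ctx.Done() // healthy stream: block until shutdown

		return nil
	})
}
```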
107 changes: 57 additions & 50 deletions packages/api/internal/nomad/jobs.go
@@ -3,8 +3,7 @@ package nomad
import (
"context"
"fmt"
"strings"
"time"
"os"

"github.com/e2b-dev/infra/packages/api/internal/utils"

@@ -21,13 +20,11 @@ const (
jobRunningStatus = "running"

defaultTaskName = "start"

jobCheckInterval = 100 * time.Millisecond
)

type jobSubscriber struct {
subscribers *utils.Map[*jobSubscriber]
wait chan api.AllocationListStub
wait chan api.Allocation
jobID string
taskState string
taskName string
@@ -37,69 +34,79 @@ func (s *jobSubscriber) close() {
s.subscribers.Remove(s.jobID)
}

func (n *NomadClient) newSubscriber(jobID, taskState, taskName string) *jobSubscriber {
sub := &jobSubscriber{
jobID: jobID,
// We add arbitrary buffer to the channel to avoid blocking the Nomad ListenToJobs goroutine
wait: make(chan api.AllocationListStub, 10),
taskState: taskState,
subscribers: n.subscribers,
taskName: taskName,
func (n *NomadClient) GetStartingIndex(ctx context.Context) (uint64, error) {
_, meta, err := n.client.Jobs().List(nil)
if err != nil {
return 0, fmt.Errorf("failed to get Nomad jobs: %w", err)
}

n.subscribers.Insert(jobID, sub)
if meta.LastIndex == 0 {
return 0, nil
}

return sub
return meta.LastIndex - 1, nil
}

func (n *NomadClient) ListenToJobs(ctx context.Context) {
ticker := time.NewTicker(jobCheckInterval)
defer ticker.Stop()
func (n *NomadClient) ListenToJobs(ctx context.Context, index uint64) error {
topics := map[api.Topic][]string{
api.TopicAllocation: {"*"},
}

streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()

eventCh, err := n.client.EventStream().Stream(streamCtx, topics, index, &api.QueryOptions{
Filter: fmt.Sprintf("JobID contains \"%s\"", instanceJobNameWithSlash),
AllowStale: true,
Prefix: instanceJobName,
})
if err != nil {
return fmt.Errorf("failed to get Nomad event stream: %w", err)
}

for {
select {
// A loop with a ticker works differently than a loop with sleep.
// The ticker will tick every 100ms, but if the loop takes more than 100ms to run, the ticker will tick again immediately.
case <-ticker.C:
subscribers := n.subscribers.Items()

if len(subscribers) == 0 {
continue
case <-ctx.Done():
return nil
case event := <-eventCh:
if event.Err != nil {
return fmt.Errorf("error from event stream: %w", event.Err)
}

filterParts := make([]string, len(subscribers))

var i int

for jobID := range subscribers {
filterParts[i] = jobID
i++
if event.IsHeartbeat() {
continue
}

filterString := strings.Join(filterParts, "|")
for _, e := range event.Events {
alloc, allocErr := e.Allocation()
if allocErr != nil {
errMsg := fmt.Errorf("cannot retrieve allocations for '%s' job: %w", alloc.JobID, allocErr)
fmt.Fprint(os.Stderr, errMsg.Error())

allocs, _, err := n.client.Allocations().List(&api.QueryOptions{
Filter: fmt.Sprintf("JobID matches \"%s\"", filterString),
})
if err != nil {
n.logger.Errorf("Error getting jobs: %v", err)
continue
}

return
n.processAlloc(alloc)
}
}
}
}

for _, alloc := range allocs {
n.processAllocs(alloc)
}
func (n *NomadClient) newSubscriber(jobID, taskState, taskName string) *jobSubscriber {
sub := &jobSubscriber{
jobID: jobID,
wait: make(chan api.Allocation),
taskState: taskState,
subscribers: n.subscribers,
taskName: taskName,
}

case <-ctx.Done():
fmt.Println("Context canceled, stopping ListenToJobs")
n.subscribers.Insert(jobID, sub)

return
}
}
return sub
}

func (n *NomadClient) processAllocs(alloc *api.AllocationListStub) {
func (n *NomadClient) processAlloc(alloc *api.Allocation) {
sub, ok := n.subscribers.Get(alloc.JobID)

if !ok {
@@ -116,7 +123,7 @@ func (n *NomadClient) processAllocs(alloc *api.AllocationListStub) {
select {
case sub.wait <- *alloc:
default:
n.logger.Errorf("channel for job %s is full", alloc.JobID)
n.logger.Warnf("channel for job %s is full", alloc.JobID)
}
}

@@ -135,7 +142,7 @@ func (n *NomadClient) processAllocs(alloc *api.AllocationListStub) {
select {
case sub.wait <- *alloc:
default:
n.logger.Errorf("channel for job %s is full", alloc.JobID)
n.logger.Warnf("channel for job %s is full", alloc.JobID)
}
}
}
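
This file replaces the 100 ms polling loop over `Allocations().List` with Nomad's blocking event stream, resuming from the index returned by `GetStartingIndex`. The sketch below shows one way to consume allocation events with the same `hashicorp/nomad/api` calls; the `streamAllocations` helper and the `handle` callback are assumptions for illustration, and the job-ID filter used by the real code is omitted.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

// streamAllocations subscribes to allocation events starting at index and
// invokes handle for every allocation carried by the stream.
func streamAllocations(ctx context.Context, client *api.Client, index uint64, handle func(*api.Allocation)) error {
	topics := map[api.Topic][]string{
		api.TopicAllocation: {"*"}, // all allocation events
	}

	eventCh, err := client.EventStream().Stream(ctx, topics, index, &api.QueryOptions{AllowStale: true})
	if err != nil {
		return fmt.Errorf("failed to open event stream: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case events := <-eventCh:
			if events == nil {
				return nil // stream channel closed
			}
			if events.Err != nil {
				return fmt.Errorf("event stream error: %w", events.Err)
			}
			if events.IsHeartbeat() {
				continue // keep-alive frame, no payload
			}

			for _, e := range events.Events {
				alloc, allocErr := e.Allocation()
				if allocErr != nil || alloc == nil {
					continue
				}

				handle(alloc)
			}
		}
	}
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	err = streamAllocations(context.Background(), client, 0, func(a *api.Allocation) {
		fmt.Println("allocation update:", a.JobID, a.ClientStatus)
	})
	if err != nil {
		log.Fatal(err)
	}
}
```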
6 changes: 3 additions & 3 deletions packages/cluster/client/main.tf
@@ -1,12 +1,12 @@
resource "google_compute_health_check" "nomad_check" {
name = "${var.cluster_name}-nomad-client-check"
check_interval_sec = 5
timeout_sec = 5
check_interval_sec = 15
timeout_sec = 10
healthy_threshold = 2
unhealthy_threshold = 10 # 50 seconds

http_health_check {
request_path = "/v1/status/peers"
request_path = "/v1/agent/health"
port = "4646"
}
}
5 changes: 0 additions & 5 deletions packages/cluster/network/main.tf
@@ -228,11 +228,6 @@ resource "google_compute_url_map" "orch_map" {
}

### IPv4 block ###
resource "google_compute_target_http_proxy" "default" {
name = "${var.prefix}http-proxy"
url_map = google_compute_url_map.orch_map.self_link
}

resource "google_compute_target_https_proxy" "default" {
name = "${var.prefix}https-proxy"
url_map = google_compute_url_map.orch_map.self_link
2 changes: 1 addition & 1 deletion packages/cluster/server/main.tf
@@ -9,7 +9,7 @@ resource "google_compute_health_check" "nomad_check" {
unhealthy_threshold = 10 # 50 seconds

http_health_check {
request_path = "/v1/status/peers"
request_path = "/v1/agent/health"
port = "4646"
}
}
7 changes: 1 addition & 6 deletions packages/env-build-task-driver/internal/handle.go
@@ -68,10 +68,5 @@ func (h *extraTaskHandle) Run(ctx context.Context, tracer trace.Tracer) error {

func (h *extraTaskHandle) Stats(ctx context.Context, statsChannel chan *drivers.TaskResourceUsage, interval time.Duration) {
defer close(statsChannel)
for {
select {
case <-ctx.Done():
return
}
}
<-ctx.Done()
}
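
The Stats change above is behavior preserving: a `for` loop around a `select` whose only case is `<-ctx.Done()` blocks exactly like a bare channel receive, so the shorter form is the idiomatic one. A tiny sketch of the two equivalent shapes, with no assumptions beyond the standard library:

```go
package demo

import "context"

// waitVerbose is the shape removed by the diff; the single-case select inside
// an otherwise empty loop only ever waits for cancellation.
func waitVerbose(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		}
	}
}

// waitSimple is the equivalent form the diff adopts.
func waitSimple(ctx context.Context) {
	<-ctx.Done()
}
```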
76 changes: 47 additions & 29 deletions packages/env-build-task-driver/internal/task.go
@@ -66,10 +66,17 @@ type (
}
)

func (de *DriverExtra) StartTask(cfg *drivers.TaskConfig,
driverCtx context.Context, tracer trace.Tracer, tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]], logger hclog.Logger,
func (de *DriverExtra) StartTask(
driverCtx context.Context,
tracer trace.Tracer,
logger hclog.Logger,
cfg *drivers.TaskConfig,
tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]],
) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
ctx, span := tracer.Start(driverCtx, "start-task-validation", trace.WithAttributes(
bgContext, bgCancel := context.WithCancel(driverCtx)
defer bgCancel()

ctx, span := tracer.Start(bgContext, "start-task-validation", trace.WithAttributes(
attribute.String("alloc.id", cfg.AllocID),
))
defer span.End()
@@ -183,7 +190,6 @@ func (de *DriverExtra) StartTask(cfg *drivers.TaskConfig,

go func() {
defer cancel()
h.Cancel = cancel

buildContext, childBuildSpan := tracer.Start(
trace.ContextWithSpanContext(cancellableBuildContext, childSpan.SpanContext()),
Expand All @@ -207,37 +213,34 @@ func (de *DriverExtra) StartTask(cfg *drivers.TaskConfig,
return handle, nil, nil
}

func (de *DriverExtra) WaitTask(ctx, driverCtx context.Context, _ trace.Tracer, tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]], _ hclog.Logger, taskID string) (<-chan *drivers.ExitResult, error) {
func (de *DriverExtra) WaitTask(
ctx,
driverCtx context.Context,
_ trace.Tracer,
_ hclog.Logger,
tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]],
taskID string,
) (<-chan *drivers.ExitResult, error) {
handle, ok := tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}

ch := make(chan *drivers.ExitResult)
go handleWait(ctx, driverCtx, handle, ch)
go driver.HandleWait(ctx, driverCtx, handle, ch)

return ch, nil
}

func handleWait(ctx, driverCtx context.Context, handle *driver.TaskHandle[*extraTaskHandle], ch chan *drivers.ExitResult) {
defer close(ch)

for {
select {
case <-ctx.Done():
return
case <-driverCtx.Done():
return
case <-handle.Ctx.Done():
s := handle.TaskStatus()
if s.State == drivers.TaskStateExited {
ch <- handle.ExitResult
}
}
}
}

func (de *DriverExtra) StopTask(_ context.Context, _ trace.Tracer, tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]], _ hclog.Logger, taskID string, timeout time.Duration, signal string) error {
func (de *DriverExtra) StopTask(
_ context.Context,
_ trace.Tracer,
_ hclog.Logger,
tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]],
taskID string,
timeout time.Duration,
signal string,
) error {
handle, ok := tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
@@ -248,7 +251,14 @@ func (de *DriverExtra) StopTask(_ context.Context, _ trace.Tracer, tasks *driver
return nil
}

func (de *DriverExtra) DestroyTask(_ context.Context, _ trace.Tracer, tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]], _ hclog.Logger, taskID string, force bool) error {
func (de *DriverExtra) DestroyTask(
_ context.Context,
_ trace.Tracer,
_ hclog.Logger,
tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]],
taskID string,
force bool,
) error {
handle, ok := tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
@@ -264,16 +274,24 @@ func (de *DriverExtra) DestroyTask(_ context.Context, _ trace.Tracer, tasks *dri
return nil
}

func (de *DriverExtra) TaskStats(ctx context.Context, driverCtx context.Context, tracer trace.Tracer, tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]], taskID string, interval time.Duration) (<-chan *structs.TaskResourceUsage, error) {
h, ok := tasks.Get(taskID)
func (de *DriverExtra) TaskStats(
ctx,
driverCtx context.Context,
tracer trace.Tracer,
_ hclog.Logger,
tasks *driver.TaskStore[*driver.TaskHandle[*extraTaskHandle]],
taskID string,
interval time.Duration,
) (<-chan *structs.TaskResourceUsage, error) {
_, ok := tasks.Get(taskID)
if !ok {
telemetry.ReportCriticalError(ctx, drivers.ErrTaskNotFound)

return nil, drivers.ErrTaskNotFound
}

statsChannel := make(chan *drivers.TaskResourceUsage)
go h.Extra.Stats(ctx, statsChannel, interval)
go driver.Stats(ctx, driverCtx, statsChannel, interval)

return statsChannel, nil
}
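
Besides reordering parameters, this file drops the local `handleWait` and `Stats` helpers in favor of shared `driver.HandleWait` and `driver.Stats` functions. The sketch below reconstructs what a generic wait helper of that shape could look like, based only on the removed code above; the stub types and the single-shot `select` are assumptions, and the real helper in the internal `driver` package may differ.

```go
package demo

import "context"

// Minimal stand-ins for the driver types referenced by the diff.
type ExitResult struct{ ExitCode int }

type TaskState string

const TaskStateExited TaskState = "exited"

type TaskStatus struct{ State TaskState }

type TaskHandle[T any] struct {
	Ctx        context.Context
	ExitResult *ExitResult
	status     TaskStatus
}

func (h *TaskHandle[T]) TaskStatus() TaskStatus { return h.status }

// HandleWait forwards the task's exit result once the task context finishes,
// unless either caller context is canceled first, then closes the channel.
func HandleWait[T any](ctx, driverCtx context.Context, handle *TaskHandle[T], ch chan *ExitResult) {
	defer close(ch)

	select {
	case <-ctx.Done():
	case <-driverCtx.Done():
	case <-handle.Ctx.Done():
		if handle.TaskStatus().State == TaskStateExited {
			ch <- handle.ExitResult
		}
	}
}
```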
(remaining changed files not loaded)
