Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 committed Jul 1, 2024
1 parent 06880c8 commit ec8cbfb
Show file tree
Hide file tree
Showing 12 changed files with 396 additions and 613 deletions.
2 changes: 1 addition & 1 deletion internal/executor/service/job_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (r *JobRequester) createLeaseRequest() (*LeaseRequest, error) {
}

// Returns the RunIds of all managed pods that haven't been assigned to a node
func (r *JobRequester) getUnassignedRunIds(capacityReport *utilisation.ClusterAvailableCapacityReport) ([]armadaevents.Uuid, error) {
func (r *JobRequester) getUnassignedRunIds(capacityReport *utilisation.ClusterAvailableCapacityReport) ([]*armadaevents.Uuid, error) {
allAssignedRunIds := []string{}
allJobRunIds := []string{}

Expand Down
4 changes: 2 additions & 2 deletions internal/executor/service/job_requester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestRequestJobsRuns_ConstructsCorrectLeaseRequest(t *testing.T) {
AvailableResource: *capacityReport.AvailableCapacity,
Nodes: []*executorapi.NodeInfo{&capacityReport.Nodes[0]},
// Should add any ids in the state but not in the capacity report into unassigned job run ids
UnassignedJobRunIds: []armadaevents.Uuid{},
UnassignedJobRunIds: []*armadaevents.Uuid{},
MaxJobsToLease: uint32(defaultMaxLeasedJobs),
},
},
Expand All @@ -86,7 +86,7 @@ func TestRequestJobsRuns_ConstructsCorrectLeaseRequest(t *testing.T) {
AvailableResource: *capacityReport.AvailableCapacity,
Nodes: []*executorapi.NodeInfo{&capacityReport.Nodes[0]},
// Should add any ids in the state but not in the capacity report into unassigned job run ids
UnassignedJobRunIds: []armadaevents.Uuid{*armadaevents.ProtoUuidFromUuid(leasedRunId)},
UnassignedJobRunIds: []*armadaevents.Uuid{armadaevents.ProtoUuidFromUuid(leasedRunId)},
MaxJobsToLease: 0,
},
},
Expand Down
6 changes: 3 additions & 3 deletions internal/executor/service/lease_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type LeaseRequest struct {
AvailableResource armadaresource.ComputeResources
Nodes []*executorapi.NodeInfo
UnassignedJobRunIds []armadaevents.Uuid
UnassignedJobRunIds []*armadaevents.Uuid
MaxJobsToLease uint32
}

Expand Down Expand Up @@ -60,8 +60,8 @@ func (requester *JobLeaseRequester) LeaseJobRuns(ctx *armadacontext.Context, req
leaseRequest := &executorapi.LeaseRequest{
ExecutorId: requester.clusterIdentity.GetClusterId(),
Pool: requester.clusterIdentity.GetClusterPool(),
MinimumJobSize: requester.minimumJobSize,
Resources: request.AvailableResource,
MinimumJobSize: requester.minimumJobSize.ToProtoMap(),
Resources: request.AvailableResource.ToProtoMap(),
Nodes: request.Nodes,
UnassignedJobRunIds: request.UnassignedJobRunIds,
MaxJobsToLease: request.MaxJobsToLease,
Expand Down
6 changes: 3 additions & 3 deletions internal/executor/service/lease_requester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ func TestLeaseJobRuns_Send(t *testing.T) {
RunIdsByState: map[string]api.JobState{"id1": api.JobState_RUNNING},
},
},
UnassignedJobRunIds: []armadaevents.Uuid{*id1},
UnassignedJobRunIds: []*armadaevents.Uuid{id1},
MaxJobsToLease: uint32(5),
}

expectedRequest := &executorapi.LeaseRequest{
ExecutorId: defaultClusterIdentity.GetClusterId(),
Pool: defaultClusterIdentity.GetClusterPool(),
Resources: leaseRequest.AvailableResource,
MinimumJobSize: defaultMinimumJobSize,
Resources: leaseRequest.AvailableResource.ToProtoMap(),
MinimumJobSize: defaultMinimumJobSize.ToProtoMap(),
Nodes: leaseRequest.Nodes,
UnassignedJobRunIds: leaseRequest.UnassignedJobRunIds,
MaxJobsToLease: leaseRequest.MaxJobsToLease,
Expand Down
6 changes: 3 additions & 3 deletions internal/executor/util/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"github.com/armadaproject/armada/pkg/armadaevents"
)

func StringUuidsToUuids(uuidStrings []string) ([]armadaevents.Uuid, error) {
result := make([]armadaevents.Uuid, 0, len(uuidStrings))
func StringUuidsToUuids(uuidStrings []string) ([]*armadaevents.Uuid, error) {
result := make([]*armadaevents.Uuid, 0, len(uuidStrings))
for _, uuidString := range uuidStrings {
uuid, err := armadaevents.ProtoUuidFromUuidString(uuidString)
if err != nil {
return nil, fmt.Errorf("failed to convert uuid string %s to uuid because %s", uuidString, err)
}
result = append(result, *uuid)
result = append(result, uuid)
}
return result, nil
}
8 changes: 4 additions & 4 deletions internal/executor/util/uuid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ func TestStringUuidsToUuids(t *testing.T) {
tests := []struct {
name string
uuidStrings []string
want []armadaevents.Uuid
want []*armadaevents.Uuid
wantErr bool
}{
{"invalid uuid", []string{"1", "2", "3"}, []armadaevents.Uuid{}, true},
{"valid uuid", []string{"52a3cfa6-8ce1-42b1-97cf-74f1b63f21b9"}, []armadaevents.Uuid{{5954831446549021361, 10939090601399755193}}, false},
{"invalid uuid", []string{"1", "2", "3"}, []*armadaevents.Uuid{}, true},
{"valid uuid", []string{"52a3cfa6-8ce1-42b1-97cf-74f1b63f21b9"}, []*armadaevents.Uuid{{5954831446549021361, 10939090601399755193}}, false},
{
"valid uuid2",
[]string{"52a3cfa6-8ce1-42b1-97cf-74f1b63f21b9", "59567531-2a42-4b5b-9aba-b3d400c35b4c"},
[]armadaevents.Uuid{
[]*armadaevents.Uuid{
{5954831446549021361, 10939090601399755193},
{6437461571395537755, 11149421550636325708},
},
Expand Down
26 changes: 15 additions & 11 deletions internal/executor/utilisation/cluster_utilisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package utilisation
import (
"fmt"

armadaslices "github.com/armadaproject/armada/internal/common/slices"

"github.com/armadaproject/armada/pkg/executorapi"

"github.com/pkg/errors"
Expand Down Expand Up @@ -112,24 +114,26 @@ func (cls *ClusterUtilisationService) GetAvailableClusterCapacity() (*ClusterAva
usageByQueue := cls.getPodUtilisationByQueue(runningNodePodsArmada)
resourceUsageByQueue := make(map[string]*executorapi.ComputeResource)
for queueName, resourceUsage := range usageByQueue {
resourceUsageByQueue[queueName] = &executorapi.ComputeResource{Resources: resourceUsage}
resourceUsageByQueue[queueName] = executorapi.ComputeResourceFromProtoResources(resourceUsage)
}

nodeAllocatedResources := make(map[int32]executorapi.ComputeResource)
nodeAllocatedResources := make(map[int32]*executorapi.ComputeResource)
for p, rl := range allocatedByPriority {
nodeAllocatedResources[p] = executorapi.ComputeResource{Resources: rl.Resources}
nodeAllocatedResources[p] = executorapi.ComputeResourceFromProtoResources(rl.Resources)
}
nodeNonArmadaAllocatedResources := make(map[int32]executorapi.ComputeResource)
nodeNonArmadaAllocatedResources := make(map[int32]*executorapi.ComputeResource)
for p, rl := range allocatedByPriorityNonArmada {
nodeNonArmadaAllocatedResources[p] = executorapi.ComputeResource{Resources: rl.Resources}
nodeNonArmadaAllocatedResources[p] = executorapi.ComputeResourceFromProtoResources(rl.Resources)
}
nodes = append(nodes, executorapi.NodeInfo{
Name: node.Name,
Labels: cls.filterTrackedLabels(node.Labels),
Taints: node.Spec.Taints,
AllocatableResources: allocatable,
AvailableResources: available,
TotalResources: allocatable,
Name: node.Name,
Labels: cls.filterTrackedLabels(node.Labels),
Taints: armadaslices.Map(node.Spec.Taints, func(t v1.Taint) *v1.Taint {
return &t
}),
AllocatableResources: allocatable.ToProtoMap(),
AvailableResources: available.ToProtoMap(),
TotalResources: allocatable.ToProtoMap(),
AllocatedResources: nodeAllocatedResources,
RunIdsByState: runIdsByNode[node.Name],
NonArmadaAllocatedResources: nodeNonArmadaAllocatedResources,
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,10 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
Id: req.ExecutorId,
Pool: req.Pool,
Nodes: nodes,
MinimumJobSize: schedulerobjects.ResourceList{Resources: req.MinimumJobSize},
MinimumJobSize: executorapi.ResourceListFromProtoResources(req.MinimumJobSize),
LastUpdateTime: now,
UnassignedJobRuns: slices.Map(req.UnassignedJobRunIds, func(jobId armadaevents.Uuid) string {
return strings.ToLower(armadaevents.UuidFromProtoUuid(&jobId).String())
UnassignedJobRuns: slices.Map(req.UnassignedJobRunIds, func(jobId *armadaevents.Uuid) string {
return strings.ToLower(armadaevents.UuidFromProtoUuid(jobId).String())
}),
}
}
Expand All @@ -356,7 +356,7 @@ func runIdsFromLeaseRequest(req *executorapi.LeaseRequest) ([]uuid.UUID, error)
}
}
for _, runId := range req.UnassignedJobRunIds {
runIds = append(runIds, armadaevents.UuidFromProtoUuid(&runId))
runIds = append(runIds, armadaevents.UuidFromProtoUuid(runId))
}
return runIds, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
NodeType: "node-type-1",
},
},
UnassignedJobRunIds: []armadaevents.Uuid{*armadaevents.ProtoUuidFromUuid(runId3)},
UnassignedJobRunIds: []*armadaevents.Uuid{armadaevents.ProtoUuidFromUuid(runId3)},
MaxJobsToLease: uint32(maxJobsPerCall),
}
defaultExpectedExecutor := &schedulerobjects.Executor{
Expand Down
Loading

0 comments on commit ec8cbfb

Please sign in to comment.