From 9e166fae542dfb5ae916d267a35cffed52d1494a Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Wed, 14 Jul 2021 11:19:54 -0700 Subject: [PATCH 1/2] add SlaRestartInstances api --- jobUpdate.go | 5 +++++ realis.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/jobUpdate.go b/jobUpdate.go index 3ef2cb4..5ccbd1a 100644 --- a/jobUpdate.go +++ b/jobUpdate.go @@ -284,3 +284,8 @@ func (j *JobUpdate) PartitionPolicy(reschedule bool, delay int64) *JobUpdate { }) return j } + +func (j *JobUpdate) SlaPolicy(sla *aurora.SlaPolicy) *JobUpdate { + j.task.task.SlaPolicy = sla + return j +} diff --git a/realis.go b/realis.go index a7c536e..bc9171a 100644 --- a/realis.go +++ b/realis.go @@ -522,6 +522,50 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error { return nil } +// Restarts specific instances specified with slaAware. +// If slaPolicy is nil, uses the existing slaPolicy of taskConfig. +func (c *Client) SlaRestartInstances(slaPolicy *aurora.SlaPolicy, jobKey *aurora.JobKey, instances ...int32) error { + c.logger.DebugPrintf("SlaRestartInstances Thrift Payload: %v %+v %v\n", slaPolicy, jobKey, instances) + + if len(instances) == 0 { + c.logger.DebugPrintf("it is not necessary to restart 0 instances") + return nil + } + + jobSummary, err := c.GetJobSummary(jobKey.Role) + if err != nil { + return err + } + var jobConfig *aurora.JobConfiguration + for _, s := range jobSummary.Summaries { + if s.Job.Key.Environment == jobKey.Environment && s.Job.Key.Name == jobKey.Name { + jobConfig = s.Job + } + } + if jobConfig == nil { + return fmt.Errorf("failed to find %v", jobKey) + } + + // create job update request + jobUpdate := JobUpdateFromConfig(jobConfig.TaskConfig) + jobUpdate. + SlaAware(true). + InstanceCount(jobConfig.InstanceCount) + if slaPolicy != nil { + jobUpdate.SlaPolicy(slaPolicy) + } + for _, v := range instances { + jobUpdate.AddInstanceRange(v, v) + } + + msg := fmt.Sprintf("SlaRestartInstances %v-%v via StartJobUpdate", jobKey, instances) + if _, err := c.StartJobUpdate(jobUpdate, ""); err != nil { + return errors.Wrap(err, msg) + } + + return nil +} + // Restarts all active tasks under a job configuration. func (c *Client) RestartJob(key aurora.JobKey) error { From c985d9170fcce9e997fb254a300577e29fe987fd Mon Sep 17 00:00:00 2001 From: lenhattan86 Date: Wed, 6 Oct 2021 16:15:51 -0700 Subject: [PATCH 2/2] add unit test --- realis.go | 18 ++++++++------- realis_e2e_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/realis.go b/realis.go index bc9171a..5d9b4fa 100644 --- a/realis.go +++ b/realis.go @@ -524,17 +524,19 @@ func (c *Client) RestartInstances(key aurora.JobKey, instances ...int32) error { // Restarts specific instances specified with slaAware. // If slaPolicy is nil, uses the existing slaPolicy of taskConfig. -func (c *Client) SlaRestartInstances(slaPolicy *aurora.SlaPolicy, jobKey *aurora.JobKey, instances ...int32) error { +func (c *Client) SlaRestartInstances(jobKey aurora.JobKey, + slaPolicy *aurora.SlaPolicy, + instances ...int32) (*aurora.StartJobUpdateResult_, error) { c.logger.DebugPrintf("SlaRestartInstances Thrift Payload: %v %+v %v\n", slaPolicy, jobKey, instances) if len(instances) == 0 { c.logger.DebugPrintf("it is not necessary to restart 0 instances") - return nil + return nil, nil } jobSummary, err := c.GetJobSummary(jobKey.Role) if err != nil { - return err + return nil, err } var jobConfig *aurora.JobConfiguration for _, s := range jobSummary.Summaries { @@ -543,7 +545,7 @@ func (c *Client) SlaRestartInstances(slaPolicy *aurora.SlaPolicy, jobKey *aurora } } if jobConfig == nil { - return fmt.Errorf("failed to find %v", jobKey) + return nil, fmt.Errorf("failed to find %v", jobKey) } // create job update request @@ -559,11 +561,11 @@ func (c *Client) SlaRestartInstances(slaPolicy *aurora.SlaPolicy, jobKey *aurora } msg := fmt.Sprintf("SlaRestartInstances %v-%v via StartJobUpdate", jobKey, instances) - if _, err := c.StartJobUpdate(jobUpdate, ""); err != nil { - return errors.Wrap(err, msg) + if result, err := c.StartJobUpdate(jobUpdate, fmt.Sprintf("restart instances %v", instances)); err != nil { + return nil, errors.Wrap(err, msg) + } else { + return result, nil } - - return nil } // Restarts all active tasks under a job configuration. diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 884a485..a004cdc 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -871,3 +871,59 @@ func TestRealisClient_GetJobSummary(t *testing.T) { assert.NoError(t, err) } + +func TestRealisClient_SlaRestartInstances(t *testing.T) { + // Create a single job + role := "vagrant" + env := "prod" + name := "slaRestartInstances" + + job := realis.NewJob(). + Environment(env). + Role(role). + Name(name). + ThermosExecutor(thermosExec). + CPU(.01). + RAM(4). + Disk(10). + Tier("preferred"). + InstanceCount(3). + IsService(true) + + // Needed to populate the task config correctly + assert.NoError(t, job.BuildThermosPayload()) + + var cpu = 3.5 + var ram int64 = 20480 + var disk int64 = 10240 + err := r.SetQuota(role, &cpu, &ram, &disk) + assert.NoError(t, err) + + err = r.CreateJob(job) + assert.NoError(t, err) + + // waiting until all instances running + success, err := r.MonitorScheduleStatus(job.JobKey(), + job.GetInstanceCount(), + []aurora.ScheduleStatus{aurora.ScheduleStatus_RUNNING}, + 1*time.Second, + 150*time.Second) + assert.True(t, success) + assert.NoError(t, err) + + slaPolicy := &aurora.SlaPolicy{ + PercentageSlaPolicy: &aurora.PercentageSlaPolicy{ + Percentage: 50, + DurationSecs: 0, + }, + } + + t.Run("TestRealisClient_SlaRestartInstances", func(t *testing.T) { + result, err := r.SlaRestartInstances(job.JobKey(), slaPolicy, 0) + assert.NoError(t, err) + assert.NotNil(t, result) + + assert.NoError(t, r.AbortJobUpdate(*result.GetKey(), "abort update to kill the job")) + assert.NoError(t, r.KillJob(job.JobKey())) + }) +}