diff --git a/internal/lookoutingesterv2/lookoutdb/insertion.go b/internal/lookoutingesterv2/lookoutdb/insertion.go index 481c888bf45..f65bf774180 100644 --- a/internal/lookoutingesterv2/lookoutdb/insertion.go +++ b/internal/lookoutingesterv2/lookoutdb/insertion.go @@ -52,7 +52,18 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru start := time.Now() // Jobs need to be ingested first as other updates may reference these - l.CreateJobs(ctx, instructions.JobsToCreate) + wgJobIngestion := sync.WaitGroup{} + wgJobIngestion.Add(2) + go func() { + defer wgJobIngestion.Done() + l.CreateJobs(ctx, instructions.JobsToCreate) + }() + go func() { + defer wgJobIngestion.Done() + l.CreateJobSpecs(ctx, instructions.JobsToCreate) + }() + + wgJobIngestion.Wait() // Now we can job updates, annotations and new job runs wg := sync.WaitGroup{} @@ -98,6 +109,22 @@ func (l *LookoutDb) CreateJobs(ctx *armadacontext.Context, instructions []*model log.Infof("Inserted %d jobs in %s", len(instructions), taken) } +func (l *LookoutDb) CreateJobSpecs(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) { + if len(instructions) == 0 { + return + } + start := time.Now() + err := l.CreateJobSpecsBatch(ctx, instructions) + if err != nil { + log.WithError(err).Warn("Creating job specs via batch failed, will attempt to insert serially (this might be slow).") + l.CreateJobSpecsScalar(ctx, instructions) + } + taken := time.Since(start) + l.metrics.RecordAvRowChangeTimeByOperation("job_spec", commonmetrics.DBOperationInsert, len(instructions), taken) + l.metrics.RecordRowsChange("job_spec", commonmetrics.DBOperationInsert, len(instructions)) + log.Infof("Inserted %d job specs in %s", len(instructions), taken) +} + func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model.UpdateJobInstruction) { if len(instructions) == 0 { return @@ -185,7 +212,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* state smallint, last_transition_time timestamp, last_transition_time_seconds bigint, - job_spec bytea, priority_class varchar(63), annotations jsonb ) ON COMMIT DROP;`, tmpTable)) @@ -213,7 +239,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* "state", "last_transition_time", "last_transition_time_seconds", - "job_spec", "priority_class", "annotations", }, @@ -233,7 +258,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* instructions[i].State, instructions[i].LastTransitionTime, instructions[i].LastTransitionTimeSeconds, - instructions[i].JobProto, instructions[i].PriorityClass, instructions[i].Annotations, }, nil @@ -261,7 +285,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* state, last_transition_time, last_transition_time_seconds, - job_spec, priority_class, annotations ) SELECT * from %s @@ -294,11 +317,10 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions [] state, last_transition_time, last_transition_time_seconds, - job_spec, priority_class, annotations ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT DO NOTHING` for _, i := range instructions { err := l.withDatabaseRetryInsert(func() error { @@ -317,7 +339,6 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions [] i.State, i.LastTransitionTime, i.LastTransitionTimeSeconds, - i.JobProto, i.PriorityClass, i.Annotations, ) @@ -446,6 +467,83 @@ func (l *LookoutDb) UpdateJobsScalar(ctx *armadacontext.Context, instructions [] } } +func (l *LookoutDb) CreateJobSpecsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) error { + return l.withDatabaseRetryInsert(func() error { + tmpTable := "job_spec_create_tmp" + + createTmp := func(tx pgx.Tx) error { + _, err := tx.Exec(ctx, fmt.Sprintf(` + CREATE TEMPORARY TABLE %s ( + job_id varchar(32), + job_spec bytea + ) ON COMMIT DROP;`, tmpTable)) + if err != nil { + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) + } + return err + } + + insertTmp := func(tx pgx.Tx) error { + _, err := tx.CopyFrom(ctx, + pgx.Identifier{tmpTable}, + []string{ + "job_id", + "job_spec", + }, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + return []interface{}{ + instructions[i].JobId, + instructions[i].JobProto, + }, nil + }), + ) + return err + } + + copyToDest := func(tx pgx.Tx) error { + _, err := tx.Exec( + ctx, + fmt.Sprintf(` + INSERT INTO job_spec ( + job_id, + job_spec + ) SELECT * from %s + ON CONFLICT DO NOTHING`, tmpTable), + ) + if err != nil { + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) + } + return err + } + + return batchInsert(ctx, l.db, createTmp, insertTmp, copyToDest) + }) +} + +func (l *LookoutDb) CreateJobSpecsScalar(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) { + sqlStatement := `INSERT INTO job_spec ( + job_id, + job_spec + ) + VALUES ($1, $2) + ON CONFLICT DO NOTHING` + for _, i := range instructions { + err := l.withDatabaseRetryInsert(func() error { + _, err := l.db.Exec(ctx, sqlStatement, + i.JobId, + i.JobProto, + ) + if err != nil { + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) + } + return err + }) + if err != nil { + log.WithError(err).Warnf("Create job spec for job %s, jobset %s failed", i.JobId, i.JobSet) + } + } +} + func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobRunInstruction) error { return l.withDatabaseRetryInsert(func() error { tmpTable := "job_run_create_tmp" diff --git a/internal/lookoutingesterv2/lookoutdb/insertion_test.go b/internal/lookoutingesterv2/lookoutdb/insertion_test.go index 0806c841c83..5f20ad259cf 100644 --- a/internal/lookoutingesterv2/lookoutdb/insertion_test.go +++ b/internal/lookoutingesterv2/lookoutdb/insertion_test.go @@ -79,6 +79,11 @@ type JobRow struct { Annotations map[string]string } +type JobSpecRow struct { + JobId string + JobProto []byte +} + type JobRunRow struct { RunId string JobId string @@ -147,7 +152,7 @@ var expectedJobAfterSubmit = JobRow{ State: lookout.JobQueuedOrdinal, LastTransitionTime: baseTime, LastTransitionTimeSeconds: baseTime.Unix(), - JobProto: []byte(jobProto), + JobProto: []byte(nil), Duplicate: false, PriorityClass: priorityClass, Annotations: annotations, @@ -167,7 +172,7 @@ var expectedJobAfterUpdate = JobRow{ State: lookout.JobFailedOrdinal, LastTransitionTime: updateTime, LastTransitionTimeSeconds: updateTime.Unix(), - JobProto: []byte(jobProto), + JobProto: []byte(nil), Duplicate: false, PriorityClass: priorityClass, Annotations: annotations, @@ -841,10 +846,10 @@ func TestStoreNullValue(t *testing.T) { err := ldb.Store(armadacontext.Background(), instructions) assert.NoError(t, err) - job := getJob(t, ldb.db, jobIdString) + jobSpec := getJobSpec(t, ldb.db, jobIdString) jobRun := getJobRun(t, ldb.db, runIdString) - assert.Equal(t, jobProto, job.JobProto) + assert.Equal(t, jobProto, jobSpec.JobProto) assert.Equal(t, errorMsg, jobRun.Error) assert.Equal(t, debugMsg, jobRun.Debug) return nil @@ -988,6 +993,23 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow { return job } +func getJobSpec(t *testing.T, db *pgxpool.Pool, jobId string) JobSpecRow { + jobSpec := JobSpecRow{} + r := db.QueryRow( + armadacontext.Background(), + `SELECT + job_id, + job_spec + FROM job_spec WHERE job_id = $1`, + jobId) + err := r.Scan( + &jobSpec.JobId, + &jobSpec.JobProto, + ) + assert.Nil(t, err) + return jobSpec +} + func getJobRun(t *testing.T, db *pgxpool.Pool, runId string) JobRunRow { run := JobRunRow{} r := db.QueryRow( diff --git a/internal/lookoutv2/pruner/pruner.go b/internal/lookoutv2/pruner/pruner.go index 6ece3dd3962..2b87e90d5b6 100644 --- a/internal/lookoutv2/pruner/pruner.go +++ b/internal/lookoutv2/pruner/pruner.go @@ -132,6 +132,7 @@ func deleteBatch(ctx *armadacontext.Context, tx pgx.Tx, batchLimit int) (int, er } _, err = tx.Exec(ctx, ` DELETE FROM job WHERE job_id in (SELECT job_id from batch); + DELETE FROM job_spec WHERE job_id in (SELECT job_id from batch); DELETE FROM job_run WHERE job_id in (SELECT job_id from batch); DELETE FROM job_ids_to_delete WHERE job_id in (SELECT job_id from batch); TRUNCATE TABLE batch;`) diff --git a/internal/lookoutv2/pruner/pruner_test.go b/internal/lookoutv2/pruner/pruner_test.go index 3fae15ac9b2..bf34c998b2b 100644 --- a/internal/lookoutv2/pruner/pruner_test.go +++ b/internal/lookoutv2/pruner/pruner_test.go @@ -145,6 +145,7 @@ func TestPruneDb(t *testing.T) { queriedJobIdsPerTable := []map[string]bool{ selectStringSet(t, db, "SELECT job_id FROM job"), + selectStringSet(t, db, "SELECT job_id FROM job_spec"), selectStringSet(t, db, "SELECT DISTINCT job_id FROM job_run"), } for _, queriedJobs := range queriedJobIdsPerTable { diff --git a/internal/lookoutv2/repository/getjobspec.go b/internal/lookoutv2/repository/getjobspec.go index 55799249f35..8e51fe89871 100644 --- a/internal/lookoutv2/repository/getjobspec.go +++ b/internal/lookoutv2/repository/getjobspec.go @@ -30,10 +30,17 @@ func NewSqlGetJobSpecRepository(db *pgxpool.Pool, decompressor compress.Decompre func (r *SqlGetJobSpecRepository) GetJobSpec(ctx *armadacontext.Context, jobId string) (*api.Job, error) { var rawBytes []byte - err := r.db.QueryRow(ctx, "SELECT job_spec FROM job WHERE job_id = $1", jobId).Scan(&rawBytes) + + err := r.db.QueryRow( + ctx, ` + SELECT + COALESCE(job_spec.job_spec, job.job_spec) + FROM job LEFT JOIN job_spec + ON job.job_id = job_spec.job_id + WHERE job.job_id = $1`, jobId).Scan(&rawBytes) if err != nil { if err == pgx.ErrNoRows { - return nil, errors.Errorf("job with id %s not found", jobId) + return nil, errors.Errorf("job_spec with job id %s not found", jobId) } return nil, err } diff --git a/internal/lookoutv2/repository/getjobspec_test.go b/internal/lookoutv2/repository/getjobspec_test.go index 62e619dbf41..7eeb439905b 100644 --- a/internal/lookoutv2/repository/getjobspec_test.go +++ b/internal/lookoutv2/repository/getjobspec_test.go @@ -3,6 +3,7 @@ package repository import ( "testing" + "github.com/gogo/protobuf/proto" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" @@ -50,6 +51,84 @@ func TestGetJobSpec(t *testing.T) { assert.NoError(t, err) } +func TestMIGRATEDGetJobSpec(t *testing.T) { + var migratedResult *api.Job + err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) + store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) + + _ = NewJobSimulator(converter, store). + Submit(queue, jobSet, owner, namespace, baseTime, &JobOptions{ + JobId: jobId, + Priority: priority, + PriorityClass: "other-default", + Cpu: cpu, + Memory: memory, + EphemeralStorage: ephemeralStorage, + Gpu: gpu, + Annotations: map[string]string{ + "step_path": "/1/2/3", + "hello": "world", + }, + }). + Pending(runId, cluster, baseTime). + Running(runId, node, baseTime). + RunSucceeded(runId, baseTime). + Succeeded(baseTime). + Build(). + ApiJob() + + repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{}) + var err error + migratedResult, err = repo.GetJobSpec(armadacontext.TODO(), jobId) + assert.NoError(t, err) + return nil + }) + assert.NoError(t, err) + + var result *api.Job + err = lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + bytes, err := proto.Marshal(migratedResult) + assert.NoError(t, err) + + _, err = db.Exec(armadacontext.Background(), + `INSERT INTO job ( + job_id, queue, owner, namespace, jobset, + cpu, + memory, + ephemeral_storage, + gpu, + priority, + submitted, + state, + last_transition_time, + last_transition_time_seconds, + job_spec, + priority_class, + annotations + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) + ON CONFLICT DO NOTHING`, + jobId, queue, owner, namespace, jobSet, + int64(15), int64(48*1024*1024*1024), int64(100*1024*1024*1024), 8, + priority, baseTime, 1, baseTime, baseTime.Unix(), bytes, "other-default", + map[string]string{ + "step_path": "/1/2/3", + "hello": "world", + }) + assert.NoError(t, err) + + repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{}) + result, err = repo.GetJobSpec(armadacontext.TODO(), jobId) + assert.NoError(t, err) + + return nil + }) + assert.NoError(t, err) + + assertApiJobsEquivalent(t, migratedResult, result) +} + func TestGetJobSpecError(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{}) diff --git a/internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql b/internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql new file mode 100644 index 00000000000..36f3fef6fd7 --- /dev/null +++ b/internal/lookoutv2/schema/migrations/013_add_job_spec_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS job_spec ( + job_id varchar(32) NOT NULL PRIMARY KEY, + job_spec bytea NOT NULL + ); +ALTER TABLE job_spec ALTER COLUMN job_spec SET STORAGE EXTERNAL; +ALTER TABLE job ALTER COLUMN job_spec DROP NOT NULL;