Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Job Spec to its own table to improve performance #3961

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 106 additions & 8 deletions internal/lookoutingesterv2/lookoutdb/insertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,18 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru

start := time.Now()
// Jobs need to be ingested first as other updates may reference these
l.CreateJobs(ctx, instructions.JobsToCreate)
wgJobIngestion := sync.WaitGroup{}
wgJobIngestion.Add(2)
go func() {
defer wgJobIngestion.Done()
l.CreateJobs(ctx, instructions.JobsToCreate)
}()
go func() {
defer wgJobIngestion.Done()
l.CreateJobSpecs(ctx, instructions.JobsToCreate)
}()

wgJobIngestion.Wait()

// Now we can ingest job updates, annotations and new job runs
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -98,6 +109,22 @@ func (l *LookoutDb) CreateJobs(ctx *armadacontext.Context, instructions []*model
log.Infof("Inserted %d jobs in %s", len(instructions), taken)
}

// CreateJobSpecs writes the job spec of every create-job instruction to the
// job_spec table.
//
// A batched insert is tried first. If it returns an error, a warning is logged
// and the specs are inserted one by one instead. Afterwards the elapsed time and
// the number of instructions (not the number of rows actually written) are
// reported to the metrics collector and logged.
//
// An empty instruction list is a no-op: nothing is inserted, recorded or logged.
func (l *LookoutDb) CreateJobSpecs(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) {
	count := len(instructions)
	if count == 0 {
		return
	}

	begin := time.Now()
	if batchErr := l.CreateJobSpecsBatch(ctx, instructions); batchErr != nil {
		log.WithError(batchErr).Warn("Creating job specs via batch failed, will attempt to insert serially (this might be slow).")
		l.CreateJobSpecsScalar(ctx, instructions)
	}
	elapsed := time.Since(begin)

	l.metrics.RecordAvRowChangeTimeByOperation("job_spec", commonmetrics.DBOperationInsert, count, elapsed)
	l.metrics.RecordRowsChange("job_spec", commonmetrics.DBOperationInsert, count)
	log.Infof("Inserted %d job specs in %s", count, elapsed)
}

func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model.UpdateJobInstruction) {
if len(instructions) == 0 {
return
Expand Down Expand Up @@ -185,7 +212,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
state smallint,
last_transition_time timestamp,
last_transition_time_seconds bigint,
job_spec bytea,
priority_class varchar(63),
annotations jsonb
) ON COMMIT DROP;`, tmpTable))
Expand Down Expand Up @@ -213,7 +239,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
"state",
"last_transition_time",
"last_transition_time_seconds",
"job_spec",
"priority_class",
"annotations",
},
Expand All @@ -233,7 +258,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
instructions[i].State,
instructions[i].LastTransitionTime,
instructions[i].LastTransitionTimeSeconds,
instructions[i].JobProto,
instructions[i].PriorityClass,
instructions[i].Annotations,
}, nil
Expand Down Expand Up @@ -261,7 +285,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
state,
last_transition_time,
last_transition_time_seconds,
job_spec,
priority_class,
annotations
) SELECT * from %s
Expand Down Expand Up @@ -294,11 +317,10 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
state,
last_transition_time,
last_transition_time_seconds,
job_spec,
priority_class,
annotations
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT DO NOTHING`
for _, i := range instructions {
err := l.withDatabaseRetryInsert(func() error {
Expand All @@ -317,7 +339,6 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
i.State,
i.LastTransitionTime,
i.LastTransitionTimeSeconds,
i.JobProto,
i.PriorityClass,
i.Annotations,
)
Expand Down Expand Up @@ -446,6 +467,83 @@ func (l *LookoutDb) UpdateJobsScalar(ctx *armadacontext.Context, instructions []
}
}

// CreateJobSpecsBatch inserts the job specs of all instructions as one batch,
// executed through l.withDatabaseRetryInsert.
//
// The batch consists of three steps that are handed to batchInsert:
//  1. create a temporary staging table (declared ON COMMIT DROP),
//  2. bulk-load one (job_id, job_spec) row per instruction into it via tx.CopyFrom,
//  3. copy the staged rows into job_spec with ON CONFLICT DO NOTHING.
//
// A DB error metric is recorded when step 1 or step 3 fails. The returned error
// is whatever l.withDatabaseRetryInsert returns for the batch.
func (l *LookoutDb) CreateJobSpecsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) error {
	const stagingTable = "job_spec_create_tmp"

	// The SQL text does not depend on the attempt, so it is rendered once up front.
	createStagingSQL := fmt.Sprintf(`
CREATE TEMPORARY TABLE %s (
job_id varchar(32),
job_spec bytea
) ON COMMIT DROP;`, stagingTable)
	mergeStagedSQL := fmt.Sprintf(`
INSERT INTO job_spec (
job_id,
job_spec
) SELECT * from %s
ON CONFLICT DO NOTHING`, stagingTable)

	// rowAt yields the staging row for the idx-th instruction.
	rowAt := func(idx int) ([]interface{}, error) {
		instruction := instructions[idx]
		return []interface{}{instruction.JobId, instruction.JobProto}, nil
	}

	createStaging := func(tx pgx.Tx) error {
		if _, err := tx.Exec(ctx, createStagingSQL); err != nil {
			l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
			return err
		}
		return nil
	}

	loadStaging := func(tx pgx.Tx) error {
		_, err := tx.CopyFrom(
			ctx,
			pgx.Identifier{stagingTable},
			[]string{"job_id", "job_spec"},
			pgx.CopyFromSlice(len(instructions), rowAt),
		)
		return err
	}

	mergeStaged := func(tx pgx.Tx) error {
		if _, err := tx.Exec(ctx, mergeStagedSQL); err != nil {
			l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
			return err
		}
		return nil
	}

	return l.withDatabaseRetryInsert(func() error {
		return batchInsert(ctx, l.db, createStaging, loadStaging, mergeStaged)
	})
}

// CreateJobSpecsScalar inserts job specs into job_spec one row at a time.
//
// Each insert goes through l.withDatabaseRetryInsert and uses
// ON CONFLICT DO NOTHING. A DB error metric is recorded for every failed Exec.
// If an instruction still fails after the retry wrapper returns, a warning is
// logged and processing continues with the next instruction; no error is
// returned to the caller.
func (l *LookoutDb) CreateJobSpecsScalar(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) {
	const insertSQL = `INSERT INTO job_spec (
job_id,
job_spec
)
VALUES ($1, $2)
ON CONFLICT DO NOTHING`

	for idx := range instructions {
		instruction := instructions[idx]

		insertOne := func() error {
			if _, err := l.db.Exec(ctx, insertSQL, instruction.JobId, instruction.JobProto); err != nil {
				l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
				return err
			}
			return nil
		}

		if err := l.withDatabaseRetryInsert(insertOne); err != nil {
			log.WithError(err).Warnf("Create job spec for job %s, jobset %s failed", instruction.JobId, instruction.JobSet)
		}
	}
}

func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobRunInstruction) error {
return l.withDatabaseRetryInsert(func() error {
tmpTable := "job_run_create_tmp"
Expand Down
30 changes: 26 additions & 4 deletions internal/lookoutingesterv2/lookoutdb/insertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type JobRow struct {
Annotations map[string]string
}

// JobSpecRow is a test-side view of one row of the job_spec table.
type JobSpecRow struct {
// JobId holds the value of the job_id column.
JobId string
// JobProto holds the raw bytes of the job_spec column.
JobProto []byte
}

type JobRunRow struct {
RunId string
JobId string
Expand Down Expand Up @@ -147,7 +152,7 @@ var expectedJobAfterSubmit = JobRow{
State: lookout.JobQueuedOrdinal,
LastTransitionTime: baseTime,
LastTransitionTimeSeconds: baseTime.Unix(),
JobProto: []byte(jobProto),
JobProto: []byte(nil),
Duplicate: false,
PriorityClass: priorityClass,
Annotations: annotations,
Expand All @@ -167,7 +172,7 @@ var expectedJobAfterUpdate = JobRow{
State: lookout.JobFailedOrdinal,
LastTransitionTime: updateTime,
LastTransitionTimeSeconds: updateTime.Unix(),
JobProto: []byte(jobProto),
JobProto: []byte(nil),
Duplicate: false,
PriorityClass: priorityClass,
Annotations: annotations,
Expand Down Expand Up @@ -841,10 +846,10 @@ func TestStoreNullValue(t *testing.T) {
err := ldb.Store(armadacontext.Background(), instructions)
assert.NoError(t, err)

job := getJob(t, ldb.db, jobIdString)
jobSpec := getJobSpec(t, ldb.db, jobIdString)
jobRun := getJobRun(t, ldb.db, runIdString)

assert.Equal(t, jobProto, job.JobProto)
assert.Equal(t, jobProto, jobSpec.JobProto)
assert.Equal(t, errorMsg, jobRun.Error)
assert.Equal(t, debugMsg, jobRun.Debug)
return nil
Expand Down Expand Up @@ -988,6 +993,23 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow {
return job
}

// getJobSpec loads the job_spec row for jobId and returns it as a JobSpecRow.
//
// A query or scan error is reported on t through assert.Nil, which does not stop
// the test; in that case the returned row holds whatever Scan left in it.
func getJobSpec(t *testing.T, db *pgxpool.Pool, jobId string) JobSpecRow {
	const query = `SELECT
job_id,
job_spec
FROM job_spec WHERE job_id = $1`

	var row JobSpecRow
	err := db.
		QueryRow(armadacontext.Background(), query, jobId).
		Scan(&row.JobId, &row.JobProto)
	assert.Nil(t, err)
	return row
}

func getJobRun(t *testing.T, db *pgxpool.Pool, runId string) JobRunRow {
run := JobRunRow{}
r := db.QueryRow(
Expand Down
1 change: 1 addition & 0 deletions internal/lookoutv2/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func deleteBatch(ctx *armadacontext.Context, tx pgx.Tx, batchLimit int) (int, er
}
_, err = tx.Exec(ctx, `
DELETE FROM job WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_spec WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_run WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_ids_to_delete WHERE job_id in (SELECT job_id from batch);
TRUNCATE TABLE batch;`)
Expand Down
1 change: 1 addition & 0 deletions internal/lookoutv2/pruner/pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func TestPruneDb(t *testing.T) {

queriedJobIdsPerTable := []map[string]bool{
selectStringSet(t, db, "SELECT job_id FROM job"),
selectStringSet(t, db, "SELECT job_id FROM job_spec"),
selectStringSet(t, db, "SELECT DISTINCT job_id FROM job_run"),
}
for _, queriedJobs := range queriedJobIdsPerTable {
Expand Down
11 changes: 9 additions & 2 deletions internal/lookoutv2/repository/getjobspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ func NewSqlGetJobSpecRepository(db *pgxpool.Pool, decompressor compress.Decompre

func (r *SqlGetJobSpecRepository) GetJobSpec(ctx *armadacontext.Context, jobId string) (*api.Job, error) {
var rawBytes []byte
err := r.db.QueryRow(ctx, "SELECT job_spec FROM job WHERE job_id = $1", jobId).Scan(&rawBytes)

err := r.db.QueryRow(
ctx, `
SELECT
COALESCE(job_spec.job_spec, job.job_spec)
FROM job LEFT JOIN job_spec
ON job.job_id = job_spec.job_id
WHERE job.job_id = $1`, jobId).Scan(&rawBytes)
if err != nil {
if err == pgx.ErrNoRows {
return nil, errors.Errorf("job with id %s not found", jobId)
return nil, errors.Errorf("job_spec with job id %s not found", jobId)
}
return nil, err
}
Expand Down
79 changes: 79 additions & 0 deletions internal/lookoutv2/repository/getjobspec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repository
import (
"testing"

"github.com/gogo/protobuf/proto"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -50,6 +51,84 @@ func TestGetJobSpec(t *testing.T) {
assert.NoError(t, err)
}

// TestMIGRATEDGetJobSpec checks that GetJobSpec returns an equivalent job whether
// the spec was written through the ingester store (lookoutdb) or sits in the
// job_spec column of a row inserted directly into the job table.
//
// Phase 1 simulates a full job lifecycle through the store and reads the spec
// back. Phase 2 re-serializes that result, inserts it by hand into job.job_spec,
// reads it back through the same repository and compares both results.
//
// NOTE(review): each lookout.WithLookoutDb call presumably runs against its own
// fresh database; otherwise the ON CONFLICT DO NOTHING insert in phase 2 would be
// skipped for the already existing jobId — confirm.
func TestMIGRATEDGetJobSpec(t *testing.T) {
// Phase 1: job spec written via the ingester store.
var migratedResult *api.Job
err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{})
store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10)

// Only the database side effects of the simulation are needed here, so the
// built job is discarded.
_ = NewJobSimulator(converter, store).
Submit(queue, jobSet, owner, namespace, baseTime, &JobOptions{
JobId: jobId,
Priority: priority,
PriorityClass: "other-default",
Cpu: cpu,
Memory: memory,
EphemeralStorage: ephemeralStorage,
Gpu: gpu,
Annotations: map[string]string{
"step_path": "/1/2/3",
"hello": "world",
},
}).
Pending(runId, cluster, baseTime).
Running(runId, node, baseTime).
RunSucceeded(runId, baseTime).
Succeeded(baseTime).
Build().
ApiJob()

repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{})
// err is declared separately so the assignment below writes to the outer
// migratedResult instead of shadowing it with :=.
var err error
migratedResult, err = repo.GetJobSpec(armadacontext.TODO(), jobId)
assert.NoError(t, err)
return nil
})
assert.NoError(t, err)

// Phase 2: the same spec stored directly in the job table's job_spec column.
var result *api.Job
err = lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
// Re-serialize the job obtained in phase 1 to use it as the stored spec.
bytes, err := proto.Marshal(migratedResult)
assert.NoError(t, err)

// Insert the row by hand, including job_spec, bypassing the ingester store.
_, err = db.Exec(armadacontext.Background(),
`INSERT INTO job (
job_id, queue, owner, namespace, jobset,
cpu,
memory,
ephemeral_storage,
gpu,
priority,
submitted,
state,
last_transition_time,
last_transition_time_seconds,
job_spec,
priority_class,
annotations
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
ON CONFLICT DO NOTHING`,
jobId, queue, owner, namespace, jobSet,
int64(15), int64(48*1024*1024*1024), int64(100*1024*1024*1024), 8,
priority, baseTime, 1, baseTime, baseTime.Unix(), bytes, "other-default",
map[string]string{
"step_path": "/1/2/3",
"hello": "world",
})
assert.NoError(t, err)

repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{})
result, err = repo.GetJobSpec(armadacontext.TODO(), jobId)
assert.NoError(t, err)

return nil
})
assert.NoError(t, err)

// Both storage layouts must yield equivalent jobs.
assertApiJobsEquivalent(t, migratedResult, result)
}

func TestGetJobSpecError(t *testing.T) {
err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Dedicated table for serialized job specs, keyed by job id, so the spec bytes
-- live outside the job table.
CREATE TABLE IF NOT EXISTS job_spec (
job_id varchar(32) NOT NULL PRIMARY KEY,
job_spec bytea NOT NULL
);
-- EXTERNAL storage keeps the value out of line and disables PostgreSQL's own
-- compression for this column.
-- NOTE(review): presumably chosen because the spec bytes are already compressed
-- by the application before insertion — confirm.
ALTER TABLE job_spec ALTER COLUMN job_spec SET STORAGE EXTERNAL;
-- Allow rows in job without a spec; the column itself is kept, so existing rows
-- retain their stored job_spec values.
ALTER TABLE job ALTER COLUMN job_spec DROP NOT NULL;