Skip to content

Commit

Permalink
use milisecond precision timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Aug 12, 2023
1 parent 3364908 commit 74cc2bd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
43 changes: 25 additions & 18 deletions internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{})

type JobDb struct {
jobsById map[string]*Job
jobsByRunId map[uuid.UUID]string
jobsById *immutable.Map[string, *Job]
jobsByRunId *immutable.Map[uuid.UUID, string]
jobsByQueue map[string]immutable.SortedSet[*Job]
copyMutex sync.Mutex
writerMutex sync.Mutex
}

func NewJobDb() *JobDb {
return &JobDb{
jobsById: map[string]*Job{},
jobsByRunId: map[uuid.UUID]string{},
jobsById: immutable.NewMap[string, *Job](nil),
jobsByRunId: immutable.NewMap[uuid.UUID, string](&UUIDHasher{}),

Check failure on line 25 in internal/scheduler/jobdb/jobdb.go

View workflow job for this annotation

GitHub Actions / airflow-integration-tests (1.20)

undefined: UUIDHasher

Check failure on line 25 in internal/scheduler/jobdb/jobdb.go

View workflow job for this annotation

GitHub Actions / python-client-integration-tests (1.20)

undefined: UUIDHasher
jobsByQueue: map[string]immutable.SortedSet[*Job]{},
copyMutex: sync.Mutex{},
}
Expand All @@ -34,16 +34,16 @@ func (jobDb *JobDb) Upsert(txn *Txn, jobs []*Job) error {
return err
}
for _, job := range jobs {
existingJob := txn.jobsById[job.id]
if existingJob != nil {
existingJob, ok := txn.jobsById.Get(job.id)
if !ok {
existingQueue, ok := txn.jobsByQueue[existingJob.queue]
if ok {
txn.jobsByQueue[existingJob.queue] = existingQueue.Delete(existingJob)
}
}
txn.jobsById[job.id] = job
txn.jobsById = txn.jobsById.Set(job.id, job)
for _, run := range job.runsById {
txn.jobsByRunId[run.id] = job.id
txn.jobsByRunId = txn.jobsByRunId.Set(run.id, job.id)
}
if job.Queued() {
newQueue, ok := txn.jobsByQueue[job.queue]
Expand All @@ -61,13 +61,14 @@ func (jobDb *JobDb) Upsert(txn *Txn, jobs []*Job) error {
// GetById returns the job with the given Id or nil if no such job exists
// The Job returned by this function *must not* be subsequently modified
func (jobDb *JobDb) GetById(txn *Txn, id string) *Job {
return txn.jobsById[id]
j, _ := txn.jobsById.Get(id)
return j
}

// GetByRunId returns the job with the given run id or nil if no such job exists
// The Job returned by this function *must not* be subsequently modified
func (jobDb *JobDb) GetByRunId(txn *Txn, runId uuid.UUID) *Job {
jobId := txn.jobsByRunId[runId]
jobId, _ := txn.jobsByRunId.Get(runId)
return jobDb.GetById(txn, jobId)
}

Expand All @@ -93,7 +94,13 @@ func (jobDb *JobDb) QueuedJobs(txn *Txn, queue string) *immutable.SortedSetItera
// GetAll returns all jobs in the database.
// The Jobs returned by this function *must not* be subsequently modified
func (jobDb *JobDb) GetAll(txn *Txn) []*Job {
return maps.Values(txn.jobsById)
allJobs := make([]*Job, 0, txn.jobsById.Len())
iter := txn.jobsById.Iterator()
for !iter.Done() {
_, job, _ := iter.Next()
allJobs = append(allJobs, job)
}
return allJobs
}

// BatchDelete removes the jobs with the given ids from the database. Any ids that are not in the database will be
Expand All @@ -103,11 +110,11 @@ func (jobDb *JobDb) BatchDelete(txn *Txn, ids []string) error {
return err
}
for _, id := range ids {
job, present := txn.jobsById[id]
job, present := txn.jobsById.Get(id)
if present {
delete(txn.jobsById, id)
txn.jobsById = txn.jobsById.Delete(id)
for _, run := range job.runsById {
delete(txn.jobsByRunId, run.id)
txn.jobsByRunId = txn.jobsByRunId.Delete(run.id)
}
queue, ok := txn.jobsByQueue[job.queue]
if ok {
Expand Down Expand Up @@ -153,8 +160,8 @@ func (jobDb *JobDb) WriteTxn() *Txn {
defer jobDb.copyMutex.Unlock()
return &Txn{
readOnly: false,
jobsById: maps.Clone(jobDb.jobsById),
jobsByRunId: maps.Clone(jobDb.jobsByRunId),
jobsById: jobDb.jobsById,
jobsByRunId: jobDb.jobsByRunId,
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
active: true,
jobDb: jobDb,
Expand All @@ -167,8 +174,8 @@ func (jobDb *JobDb) WriteTxn() *Txn {
// until the transaction is committed.
type Txn struct {
readOnly bool
jobsById map[string]*Job
jobsByRunId map[uuid.UUID]string
jobsById *immutable.Map[string, *Job]
jobsByRunId *immutable.Map[uuid.UUID, string]
jobsByQueue map[string]immutable.SortedSet[*Job]
jobDb *JobDb
active bool
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/jobdb/jobdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestJobDb_TestTransactions(t *testing.T) {
txn3 := jobDb.ReadTxn()
assert.NotNil(t, jobDb.GetById(txn3, job.id))

assert.Error(t, jobDb.Upsert(txn1, []*Job{job})) // should be error as you can't insert after commmiting
assert.Error(t, jobDb.Upsert(txn1, []*Job{job})) // should be error as you can't insert after committing
}

func TestJobDb_TestBatchDelete(t *testing.T) {
Expand Down

0 comments on commit 74cc2bd

Please sign in to comment.