Skip to content

Commit

Permalink
Merge branch 'master' into theAntiYeti/schedulerMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
theAntiYeti authored Aug 16, 2023
2 parents 1a00553 + b65c08d commit b4ce3bd
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 26 deletions.
14 changes: 3 additions & 11 deletions internal/lookoutv2/repository/getjobrunerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,13 @@ func NewSqlGetJobRunErrorRepository(db *pgxpool.Pool, decompressor compress.Deco

func (r *SqlGetJobRunErrorRepository) GetJobRunError(ctx context.Context, runId string) (string, error) {
var rawBytes []byte
err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{
IsoLevel: pgx.RepeatableRead,
AccessMode: pgx.ReadOnly,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
err := tx.QueryRow(ctx, "SELECT error FROM job_run WHERE run_id = $1 AND error IS NOT NULL", runId).Scan(&rawBytes)
err := r.db.QueryRow(ctx, "SELECT error FROM job_run WHERE run_id = $1 AND error IS NOT NULL", runId).Scan(&rawBytes)
if err != nil {
if err == pgx.ErrNoRows {
return errors.Errorf("no error found for run with id %s", runId)
return "", errors.Errorf("no error found for run with id %s", runId)
}
return err
})
if err != nil {
return "", err
}

decompressed, err := r.decompressor.Decompress(rawBytes)
if err != nil {
log.WithError(err).Error("failed to decompress")
Expand Down
14 changes: 3 additions & 11 deletions internal/lookoutv2/repository/getjobspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,13 @@ func NewSqlGetJobSpecRepository(db *pgxpool.Pool, decompressor compress.Decompre

func (r *SqlGetJobSpecRepository) GetJobSpec(ctx context.Context, jobId string) (*api.Job, error) {
var rawBytes []byte
err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{
IsoLevel: pgx.RepeatableRead,
AccessMode: pgx.ReadOnly,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
err := tx.QueryRow(ctx, "SELECT job_spec FROM job WHERE job_id = $1", jobId).Scan(&rawBytes)
err := r.db.QueryRow(ctx, "SELECT job_spec FROM job WHERE job_id = $1", jobId).Scan(&rawBytes)
if err != nil {
if err == pgx.ErrNoRows {
return errors.Errorf("job with id %s not found", jobId)
return nil, errors.Errorf("job with id %s not found", jobId)
}
return err
})
if err != nil {
return nil, err
}

decompressed, err := r.decompressor.Decompress(rawBytes)
if err != nil {
log.WithError(err).Error("failed to decompress")
Expand Down
6 changes: 2 additions & 4 deletions third_party/airflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ python3.8 -m pip install armada-airflow

From the top level of the repo, you should run `make airflow-operator`. This will generate proto/grpc files in the jobservice folder.

Airflow with the Armada operator can be run alongside the other Armada services via the localdev docker-compose
environment. It is manually started in this way:
Airflow with the Armada operator can be run alongside the other Armada services via the docker-compose environment. It is manually started in this way:

```
cd localdev
docker-compose up -d airflow
mage airflow start
```

Airflow's web UI will then be accessible at http://localhost:8081 (login with admin/admin).
Expand Down

0 comments on commit b4ce3bd

Please sign in to comment.