Skip to content

Commit

Permalink
unify the two api postgres connections into one
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 committed Jun 5, 2024
1 parent 0785656 commit 8448edb
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 26 deletions.
9 changes: 0 additions & 9 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,4 @@ postgres:
dbname: lookout
sslmode: disable
queryapi:
enabled: false
maxQueryItems: 500
postgres:
connection:
host: postgres
port: 5432
user: postgres
password: psw
dbname: lookout
sslmode: disable
2 changes: 0 additions & 2 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,5 @@ type PostgresConfig struct {
}

type QueryApiConfig struct {
Enabled bool
Postgres PostgresConfig
MaxQueryItems int
}
21 changes: 6 additions & 15 deletions internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/google/uuid"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/extra/redisprometheus/v9"
Expand Down Expand Up @@ -78,14 +77,14 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
return nil
})

// Create database connection. This is used for the query api and also to store queues
// In a subsequent pr we will move deduplication here too and move the config out of the `queryapi` namespace
queryDb, err := database.OpenPgxPool(config.QueryApi.Postgres)
// Create database connection. This is used for the query api, queues and for job deduplication
dbPool, err := database.OpenPgxPool(config.Postgres)
if err != nil {
return errors.WithMessage(err, "error creating QueryApi postgres pool")
return errors.WithMessage(err, "error creating postgres pool")
}
defer dbPool.Close()
queryapiServer := queryapi.New(
queryDb,
dbPool,
config.QueryApi.MaxQueryItems,
func() compress.Decompressor { return compress.NewZlibDecompressor() })
api.RegisterJobsServer(grpcServer, queryapiServer)
Expand All @@ -99,7 +98,7 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
prometheus.MustRegister(
redisprometheus.NewCollector("armada", "events_redis", eventDb))

queueRepository := queue.NewPostgresQueueRepository(queryDb)
queueRepository := queue.NewPostgresQueueRepository(dbPool)
queueCache := queue.NewCachedQueueRepository(queueRepository, config.QueueCacheRefreshPeriod)
services = append(services, func() error {
return queueCache.Run(ctx)
Expand All @@ -114,14 +113,6 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
),
)

// If pool settings are provided, open a connection pool to be shared by all services.
var dbPool *pgxpool.Pool
dbPool, err = database.OpenPgxPool(config.Postgres)
if err != nil {
return err
}
defer dbPool.Close()

serverId := uuid.New()
var pulsarClient pulsar.Client
// API endpoints that generate Pulsar messages.
Expand Down

0 comments on commit 8448edb

Please sign in to comment.