Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PostgreSQL support #922

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
11 changes: 6 additions & 5 deletions alby/alby_oauth_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ package alby
import (
"testing"

"github.com/getAlby/hub/config"
"github.com/getAlby/hub/events"
"github.com/getAlby/hub/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tyler-smith/go-bip32"
"github.com/tyler-smith/go-bip39"

"github.com/getAlby/hub/config"
"github.com/getAlby/hub/events"
"github.com/getAlby/hub/tests"
)

func TestExistingEncryptedBackup(t *testing.T) {
defer tests.RemoveTestService()
svc, err := tests.CreateTestService()
require.NoError(t, err)
defer svc.Remove()

mnemonic := "limit reward expect search tissue call visa fit thank cream brave jump"
unlockPassword := "123"
Expand All @@ -40,11 +41,11 @@ func TestExistingEncryptedBackup(t *testing.T) {
}

func TestEncryptedBackup(t *testing.T) {
defer tests.RemoveTestService()
mnemonic := "limit reward expect search tissue call visa fit thank cream brave jump"
unlockPassword := "123"
svc, err := tests.CreateTestServiceWithMnemonic(mnemonic, unlockPassword)
require.NoError(t, err)
defer svc.Remove()

albyOAuthSvc := NewAlbyOAuthService(svc.DB, svc.Cfg, svc.Keys, svc.EventPublisher)
encryptedBackup, err := albyOAuthSvc.createEncryptedChannelBackup(&events.StaticChannelsBackupEvent{
Expand Down
124 changes: 124 additions & 0 deletions cmd/migrate/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"flag"
"fmt"
"log/slog"
"os"

"gorm.io/gorm"

"github.com/getAlby/hub/db"
)

func main() {
var fromDSN, toDSN string

flag.StringVar(&fromDSN, "from", "", "source DSN")
flag.StringVar(&toDSN, "to", "", "destination DSN")

flag.Parse()

if fromDSN == "" || toDSN == "" {
flag.Usage()
slog.Error("missing DSN")
os.Exit(1)
}

stopDB := func(d *gorm.DB) {
if err := db.Stop(d); err != nil {
slog.Error("failed to close database", "error", err)
}
}

slog.Info("opening source DB...")
fromDB, err := db.NewDB(fromDSN, false)
if err != nil {
slog.Error("failed to open source database", "error", err)
os.Exit(1)
}
defer stopDB(fromDB)

slog.Info("opening destination DB...")
toDB, err := db.NewDB(toDSN, false)
if err != nil {
slog.Error("failed to open destination database", "error", err)
os.Exit(1)
}
defer stopDB(toDB)

slog.Info("migrating...")
err = migrateDB(fromDB, toDB)
if err != nil {
slog.Error("failed to migrate database", "error", err)
os.Exit(1)
}

slog.Info("migration complete")
}

func migrateDB(from, to *gorm.DB) error {
tx := to.Begin()
defer tx.Rollback()

if err := tx.Error; err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}

// Table migration order matters: referenced tables must be migrated
// before referencing tables.

slog.Info("migrating apps...")
if err := migrateTable[db.App](from, to); err != nil {
return fmt.Errorf("failed to migrate apps: %w", err)
}

slog.Info("migrating app_permissions...")
if err := migrateTable[db.AppPermission](from, to); err != nil {
return fmt.Errorf("failed to migrate app_permissions: %w", err)
}

slog.Info("migrating request_events...")
if err := migrateTable[db.RequestEvent](from, to); err != nil {
return fmt.Errorf("failed to migrate request_events: %w", err)
}

slog.Info("migrating response_events...")
if err := migrateTable[db.ResponseEvent](from, to); err != nil {
return fmt.Errorf("failed to migrate response_events: %w", err)
}

slog.Info("migrating transactions...")
if err := migrateTable[db.Transaction](from, to); err != nil {
return fmt.Errorf("failed to migrate transactions: %w", err)
}

slog.Info("migrating user_configs...")
if err := migrateTable[db.UserConfig](from, to); err != nil {
return fmt.Errorf("failed to migrate user_configs: %w", err)
}

tx.Commit()
if err := tx.Error; err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

func migrateTable[T any](from, to *gorm.DB) error {
var data []T
if err := from.Find(&data).Error; err != nil {
return fmt.Errorf("failed to fetch data: %w", err)
}

if len(data) == 0 {
return nil
}

if err := to.Create(data).Error; err != nil {
return fmt.Errorf("failed to insert data: %w", err)
}

return nil
}
103 changes: 91 additions & 12 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,81 @@ package db

import (
"fmt"
"strings"

"github.com/getAlby/hub/db/migrations"
"github.com/getAlby/hub/logger"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
gorm_logger "gorm.io/gorm/logger"

"github.com/getAlby/hub/db/migrations"
"github.com/getAlby/hub/logger"
)

var SerializeTransactions = false

type Config struct {
URI string
LogQueries bool
DriverName string
}

func NewDB(uri string, logDBQueries bool) (*gorm.DB, error) {
return NewDBWithConfig(&Config{
URI: uri,
LogQueries: logDBQueries,
DriverName: "",
})
}

config := &gorm.Config{
func NewDBWithConfig(cfg *Config) (*gorm.DB, error) {
gormConfig := &gorm.Config{
TranslateError: true,
}
if logDBQueries {
config.Logger = gorm_logger.Default.LogMode(gorm_logger.Info)
if cfg.LogQueries {
gormConfig.Logger = gorm_logger.Default.LogMode(gorm_logger.Info)
}

var ret *gorm.DB

if IsPostgresURI(cfg.URI) {
pgConfig := postgres.Config{
DriverName: cfg.DriverName,
DSN: cfg.URI,
}
var err error
ret, err = newPostgresDB(pgConfig, gormConfig)
if err != nil {
return nil, err
}
} else {
sqliteURI := cfg.URI
// avoid SQLITE_BUSY errors with _txlock=IMMEDIATE
if !strings.Contains(sqliteURI, "_txlock=") {
sqliteURI = sqliteURI + "?_txlock=IMMEDIATE"
}
sqliteConfig := sqlite.Config{
DriverName: cfg.DriverName,
DSN: sqliteURI,
}
var err error
ret, err = newSqliteDB(sqliteConfig, gormConfig)
if err != nil {
return nil, err
}
}

err := migrations.Migrate(ret)
if err != nil {
logger.Logger.WithError(err).Error("Failed to migrate")
return nil, err
}

// avoid SQLITE_BUSY errors with _txlock=IMMEDIATE
gormDB, err := gorm.Open(sqlite.Open(uri+"?_txlock=IMMEDIATE"), config)
return ret, nil
}

func newSqliteDB(sqliteConfig sqlite.Config, gormConfig *gorm.Config) (*gorm.DB, error) {
gormDB, err := gorm.Open(sqlite.New(sqliteConfig), gormConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -65,9 +121,12 @@ func NewDB(uri string, logDBQueries bool) (*gorm.DB, error) {
return nil, err
}

err = migrations.Migrate(gormDB)
return gormDB, nil
}

func newPostgresDB(pgConfig postgres.Config, gormConfig *gorm.Config) (*gorm.DB, error) {
gormDB, err := gorm.Open(postgres.New(pgConfig), gormConfig)
if err != nil {
logger.Logger.WithError(err).Error("Failed to migrate")
return nil, err
}

Expand All @@ -80,9 +139,13 @@ func Stop(db *gorm.DB) error {
return fmt.Errorf("failed to get database connection: %w", err)
}

err = db.Exec("PRAGMA wal_checkpoint(FULL)", nil).Error
if err != nil {
logger.Logger.WithError(err).Error("Failed to execute wal endpoint")
dbBackend := db.Dialector.Name()

if dbBackend == "sqlite" {
err = db.Exec("PRAGMA wal_checkpoint(FULL)", nil).Error
if err != nil {
logger.Logger.WithError(err).Error("Failed to execute wal endpoint")
}
}

err = sqlDB.Close()
Expand All @@ -91,3 +154,19 @@ func Stop(db *gorm.DB) error {
}
return nil
}

func IsPostgresURI(uri string) bool {
return strings.HasPrefix(uri, "postgresql://")
}

var txSerializer = make(chan struct{}, 1)

func RunTransaction(db *gorm.DB, txFunc func(tx *gorm.DB) error) error {
if SerializeTransactions {
txSerializer <- struct{}{}
defer func() {
<-txSerializer
}()
}
return db.Transaction(txFunc)
}
7 changes: 5 additions & 2 deletions db/migrations/202401191539_initial_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ package migrations

import (
_ "embed"
"text/template"

"github.com/go-gormigrate/gormigrate/v2"
"gorm.io/gorm"
)

//go:embed 202401191539_initial_migration.sql
//go:embed 202401191539_initial_migration.sql.tmpl
var initialMigration string

var initialMigrationTmpl = template.Must(template.New("initial_migration").Parse(initialMigration))

// Initial migration
var _202401191539_initial_migration = &gormigrate.Migration{
ID: "202401191539_initial_migration",
Migrate: func(tx *gorm.DB) error {
return tx.Exec(initialMigration).Error
return exec(tx, initialMigrationTmpl)
},
Rollback: func(tx *gorm.DB) error {
return nil
Expand Down
16 changes: 0 additions & 16 deletions db/migrations/202401191539_initial_migration.sql

This file was deleted.

16 changes: 16 additions & 0 deletions db/migrations/202401191539_initial_migration.sql.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE TABLE "apps" ("id" integer,"name" text,"description" text,"nostr_pubkey" text UNIQUE,"created_at" {{ .Timestamp }},"updated_at" {{ .Timestamp }},PRIMARY KEY ("id"));
CREATE TABLE "app_permissions" ("id" integer,"app_id" integer,"request_method" text,"max_amount" integer,"budget_renewal" text,"expires_at" {{ .Timestamp }},"created_at" {{ .Timestamp }},"updated_at" {{ .Timestamp }},PRIMARY KEY ("id"),CONSTRAINT "fk_app_permissions_app" FOREIGN KEY ("app_id") REFERENCES "apps"("id") ON DELETE CASCADE);
CREATE INDEX "idx_app_permissions_request_method" ON "app_permissions"("request_method");
CREATE INDEX "idx_app_permissions_app_id" ON "app_permissions"("app_id");
CREATE TABLE "request_events" ("id" integer,"app_id" integer null,"nostr_id" text UNIQUE,"content" text,"state" text,"created_at" {{ .Timestamp }},"updated_at" {{ .Timestamp }},PRIMARY KEY ("id"),CONSTRAINT "fk_request_events_app" FOREIGN KEY ("app_id") REFERENCES "apps"("id") ON DELETE CASCADE);
CREATE UNIQUE INDEX "idx_request_events_nostr_id" ON "request_events"("nostr_id");
CREATE INDEX "idx_request_events_app_id" ON "request_events"("app_id");
CREATE TABLE "payments" ("id" integer,"app_id" integer,"request_event_id" integer,"amount" integer,"payment_request" text,"preimage" text,"created_at" {{ .Timestamp }},"updated_at" {{ .Timestamp }},PRIMARY KEY ("id"),CONSTRAINT "fk_payments_app" FOREIGN KEY ("app_id") REFERENCES "apps"("id") ON DELETE CASCADE,CONSTRAINT "fk_payments_request_event" FOREIGN KEY ("request_event_id") REFERENCES "request_events"("id"));
CREATE INDEX "idx_payments_request_event_id" ON "payments"("request_event_id");
CREATE INDEX "idx_payments_app_id" ON "payments"("app_id");
CREATE INDEX idx_payment_sum ON payments (app_id, preimage, created_at);
CREATE INDEX idx_request_events_app_id_and_id ON request_events(app_id, id);
CREATE TABLE "response_events" ("id" integer,"nostr_id" text UNIQUE,"request_id" integer,"content" text,"state" text,"replied_at" {{ .Timestamp }},"created_at" {{ .Timestamp }},"updated_at" {{ .Timestamp }},PRIMARY KEY ("id"),CONSTRAINT "fk_response_events_request_event" FOREIGN KEY ("request_id") REFERENCES "request_events"("id") ON DELETE CASCADE);
CREATE UNIQUE INDEX "idx_response_events_nostr_id" ON "response_events"("nostr_id");
CREATE TABLE "user_configs" ("id" integer, "key" text NOT NULL UNIQUE, "value" text, "encrypted" numeric, "created_at" {{ .Timestamp }},"updated_at" {{ .Timestamp }}, PRIMARY KEY("id"));
CREATE UNIQUE INDEX "idx_user_configs_key" ON "user_configs" ("key");
2 changes: 1 addition & 1 deletion db/migrations/202404021909_nullable_expires_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _202404021909_nullable_expires_at = &gormigrate.Migration{
ID: "202404021909_nullable_expires_at",
Migrate: func(tx *gorm.DB) error {

err := tx.Exec(`update app_permissions set expires_at = NULL where expires_at = "0001-01-01 00:00:00+00:00"`).Error
err := tx.Exec(`update app_permissions set expires_at = NULL where expires_at = '0001-01-01 00:00:00+00:00'`).Error

return err
},
Expand Down
2 changes: 1 addition & 1 deletion db/migrations/202405302121_store_decrypted_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _202405302121_store_decrypted_request = &gormigrate.Migration{
return err
}

err = tx.Exec("CREATE INDEX `idx_request_events_method` ON `request_events`(`method`);").Error
err = tx.Exec("CREATE INDEX \"idx_request_events_method\" ON \"request_events\"(\"method\");").Error
if err != nil {
return err
}
Expand Down
Loading
Loading