Skip to content

Commit

Permalink
BED-4194: PG Migrator Testing (#582)
Browse files Browse the repository at this point in the history
* added integration tests for db switch endpoints

* testing for 3 api handlers

* added cancellation test

* updated tests, minor refactor of migrator

* update variable names

* refactor kinds check

* added integration test header

* bump ci

* added step to drop pg graph schema before migrating, added documentation

* fixed docs typo

* fixed docs formatting

* updated docs

* bump ci

* switched pg testing db to correct version
  • Loading branch information
maffkipp authored Jan 10, 2025
1 parent 4b671da commit 9e77c99
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 39 deletions.
20 changes: 20 additions & 0 deletions cmd/api/src/api/tools/PG_MIGRATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## Migrating Graph Data from Neo4j to Postgres

### Endpoints
| Endpoint | HTTP Request | Usage | Expected Response |
| --- | --- | --- | --- |
| `/pg-migration/status/` | `GET` | Returns a status indicating whether the migrator is currently running. | **Status:** `200 OK`<br><br><pre>{<br>&nbsp;&nbsp;"state": "idle" \| "migrating" \| "canceling"<br>}</pre> |
| `/pg-migration/start/` | `PUT` | Kicks off the migration process from neo4j to postgres. | **Status:** `202 Accepted` |
| `/pg-migration/cancel/` | `PUT` | Cancels the currently running migration. | **Status:** `202 Accepted` |
| `/graph-db/switch/pg/` | `PUT` | Switches the current graph database driver to postgres. | **Status:** `200 OK` |
| `/graph-db/switch/neo4j/` | `PUT` | Switches the current graph database driver to neo4j. | **Status:** `200 OK` |

### Running a Migration
1. Before running a migration, use the `/pg-migration/status/` endpoint to confirm that the migration status is currently "idle". The migration will run in the same direction regardless of the currently selected graph driver.
2. Start the migration process using the `/pg-migration/start/` endpoint. Since the migration occurs asynchronously, you will want to monitor the API logs to see information regarding the currently running migration.
- When the migration starts, there should be a log with the message `"Dispatching live migration from Neo4j to PostgreSQL"`
- Upon completion, you should see the message `"Migration to PostgreSQL completed successfully"`
- Any errors that occur during the migration process will also surface here
- You can also poll the `/pg-migration/status/` endpoint and wait for an `"idle"` status to indicate the migration has completed
- An in-progress migration can be cancelled with the `/pg-migration/cancel/` endpoint and run again at any time
3. Once you are ready to switch over to the postgres graph driver, you can use the `/graph-db/switch/pg/` endpoint.
80 changes: 41 additions & 39 deletions cmd/api/src/api/tools/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
// MigratorState is the string-typed state of the PGMigrator state machine. Its value is
// what MigrationStatus emits under the "state" key.
type MigratorState string

// Known migrator states: no migration running, a migration in flight, and a migration
// that has been asked to cancel.
const (
stateIdle MigratorState = "idle"
stateMigrating MigratorState = "migrating"
stateCanceling MigratorState = "canceling"
StateIdle MigratorState = "idle"
StateMigrating MigratorState = "migrating"
StateCanceling MigratorState = "canceling"
)

func migrateTypes(ctx context.Context, neoDB, pgDB graph.Database) error {
Expand Down Expand Up @@ -187,21 +187,21 @@ func migrateEdges(ctx context.Context, neoDB, pgDB graph.Database, nodeIDMapping
// PGMigrator carries the dependencies and mutable state used by the Neo4j-to-PostgreSQL
// migration and graph-driver switch handlers defined on it.
type PGMigrator struct {
graphSchema graph.Schema
graphDBSwitch *graph.DatabaseSwitch
serverCtx context.Context
ServerCtx context.Context
// migrationCancelFunc is assigned when a migration is started; it is the cancel function
// of the migration context derived from the server context.
migrationCancelFunc func()
state MigratorState
State MigratorState
lock *sync.Mutex
cfg config.Configuration
Cfg config.Configuration
}

// NewPGMigrator returns a PGMigrator wired to the given server context, configuration,
// graph schema and database switch. The returned migrator starts in the idle state and
// owns a freshly allocated mutex.
func NewPGMigrator(serverCtx context.Context, cfg config.Configuration, graphSchema graph.Schema, graphDBSwitch *graph.DatabaseSwitch) *PGMigrator {
return &PGMigrator{
graphSchema: graphSchema,
graphDBSwitch: graphDBSwitch,
serverCtx: serverCtx,
state: stateIdle,
ServerCtx: serverCtx,
State: StateIdle,
lock: &sync.Mutex{},
cfg: cfg,
Cfg: cfg,
}
}

Expand All @@ -212,31 +212,28 @@ func (s *PGMigrator) advanceState(next MigratorState, validTransitions ...Migrat
isValid := false

for _, validTransition := range validTransitions {
if s.state == validTransition {
if s.State == validTransition {
isValid = true
break
}
}

if !isValid {
return fmt.Errorf("migrator state is %s but expected one of: %v", s.state, validTransitions)
return fmt.Errorf("migrator state is %s but expected one of: %v", s.State, validTransitions)
}

s.state = next
s.State = next
return nil
}

func (s *PGMigrator) SwitchPostgreSQL(response http.ResponseWriter, request *http.Request) {
if pgDB, err := dawgs.Open(s.serverCtx, pg.DriverName, dawgs.Config{
GraphQueryMemoryLimit: size.Gibibyte,
DriverCfg: s.cfg.Database.PostgreSQLConnectionString(),
}); err != nil {
if pgDB, err := s.OpenPostgresGraphConnection(); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": fmt.Errorf("failed connecting to PostgreSQL: %w", err),
}, http.StatusInternalServerError, response)
} else if err := pgDB.AssertSchema(request.Context(), s.graphSchema); err != nil {
log.Errorf("Unable to assert graph schema in PostgreSQL: %v", err)
} else if err := SetGraphDriver(request.Context(), s.cfg, pg.DriverName); err != nil {
} else if err := SetGraphDriver(request.Context(), s.Cfg, pg.DriverName); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": fmt.Errorf("failed updating graph database driver preferences: %w", err),
}, http.StatusInternalServerError, response)
Expand All @@ -249,14 +246,11 @@ func (s *PGMigrator) SwitchPostgreSQL(response http.ResponseWriter, request *htt
}

func (s *PGMigrator) SwitchNeo4j(response http.ResponseWriter, request *http.Request) {
if neo4jDB, err := dawgs.Open(s.serverCtx, neo4j.DriverName, dawgs.Config{
GraphQueryMemoryLimit: size.Gibibyte,
DriverCfg: s.cfg.Neo4J.Neo4jConnectionString(),
}); err != nil {
if neo4jDB, err := s.OpenNeo4jGraphConnection(); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": fmt.Errorf("failed connecting to Neo4j: %w", err),
}, http.StatusInternalServerError, response)
} else if err := SetGraphDriver(request.Context(), s.cfg, neo4j.DriverName); err != nil {
} else if err := SetGraphDriver(request.Context(), s.Cfg, neo4j.DriverName); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": fmt.Errorf("failed updating graph database driver preferences: %w", err),
}, http.StatusInternalServerError, response)
Expand All @@ -268,23 +262,17 @@ func (s *PGMigrator) SwitchNeo4j(response http.ResponseWriter, request *http.Req
}
}

func (s *PGMigrator) startMigration() error {
if err := s.advanceState(stateMigrating, stateIdle); err != nil {
func (s *PGMigrator) StartMigration() error {
if err := s.advanceState(StateMigrating, StateIdle); err != nil {
return fmt.Errorf("database migration state error: %w", err)
} else if neo4jDB, err := dawgs.Open(s.serverCtx, neo4j.DriverName, dawgs.Config{
GraphQueryMemoryLimit: size.Gibibyte,
DriverCfg: s.cfg.Neo4J.Neo4jConnectionString(),
}); err != nil {
} else if neo4jDB, err := s.OpenNeo4jGraphConnection(); err != nil {
return fmt.Errorf("failed connecting to Neo4j: %w", err)
} else if pgDB, err := dawgs.Open(s.serverCtx, pg.DriverName, dawgs.Config{
GraphQueryMemoryLimit: size.Gibibyte,
DriverCfg: s.cfg.Database.PostgreSQLConnectionString(),
}); err != nil {
} else if pgDB, err := s.OpenPostgresGraphConnection(); err != nil {
return fmt.Errorf("failed connecting to PostgreSQL: %w", err)
} else {
log.Infof("Dispatching live migration from Neo4j to PostgreSQL")

migrationCtx, migrationCancelFunc := context.WithCancel(s.serverCtx)
migrationCtx, migrationCancelFunc := context.WithCancel(s.ServerCtx)
s.migrationCancelFunc = migrationCancelFunc

go func(ctx context.Context) {
Expand All @@ -304,7 +292,7 @@ func (s *PGMigrator) startMigration() error {
log.Infof("Migration to PostgreSQL completed successfully")
}

if err := s.advanceState(stateIdle, stateMigrating, stateCanceling); err != nil {
if err := s.advanceState(StateIdle, StateMigrating, StateCanceling); err != nil {
log.Errorf("Database migration state management error: %v", err)
}
}(migrationCtx)
Expand All @@ -314,7 +302,7 @@ func (s *PGMigrator) startMigration() error {
}

func (s *PGMigrator) MigrationStart(response http.ResponseWriter, request *http.Request) {
if err := s.startMigration(); err != nil {
if err := s.StartMigration(); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": err.Error(),
}, http.StatusInternalServerError, response)
Expand All @@ -323,8 +311,8 @@ func (s *PGMigrator) MigrationStart(response http.ResponseWriter, request *http.
}
}

func (s *PGMigrator) cancelMigration() error {
if err := s.advanceState(stateCanceling, stateMigrating); err != nil {
func (s *PGMigrator) CancelMigration() error {
if err := s.advanceState(StateCanceling, StateMigrating); err != nil {
return err
}

Expand All @@ -334,7 +322,7 @@ func (s *PGMigrator) cancelMigration() error {
}

func (s *PGMigrator) MigrationCancel(response http.ResponseWriter, request *http.Request) {
if err := s.cancelMigration(); err != nil {
if err := s.CancelMigration(); err != nil {
api.WriteJSONResponse(request.Context(), map[string]any{
"error": err.Error(),
}, http.StatusInternalServerError, response)
Expand All @@ -345,6 +333,20 @@ func (s *PGMigrator) MigrationCancel(response http.ResponseWriter, request *http

// MigrationStatus is an HTTP handler that writes the migrator's current state as a JSON
// object under the "state" key with status 200 OK.
func (s *PGMigrator) MigrationStatus(response http.ResponseWriter, request *http.Request) {
api.WriteJSONResponse(request.Context(), map[string]any{
"state": s.state,
"state": s.State,
}, http.StatusOK, response)
}

// OpenPostgresGraphConnection opens a graph database handle with the PostgreSQL driver.
// It passes s.ServerCtx to dawgs.Open, takes the connection string from s.Cfg.Database,
// and sets the graph query memory limit to size.Gibibyte.
func (s *PGMigrator) OpenPostgresGraphConnection() (graph.Database, error) {
	pgConfig := dawgs.Config{
		GraphQueryMemoryLimit: size.Gibibyte,
		DriverCfg:             s.Cfg.Database.PostgreSQLConnectionString(),
	}

	return dawgs.Open(s.ServerCtx, pg.DriverName, pgConfig)
}

// OpenNeo4jGraphConnection opens a graph database handle with the Neo4j driver.
// It passes s.ServerCtx to dawgs.Open, takes the connection string from s.Cfg.Neo4J,
// and sets the graph query memory limit to size.Gibibyte.
func (s *PGMigrator) OpenNeo4jGraphConnection() (graph.Database, error) {
	neo4jConfig := dawgs.Config{
		GraphQueryMemoryLimit: size.Gibibyte,
		DriverCfg:             s.Cfg.Neo4J.Neo4jConnectionString(),
	}

	return dawgs.Open(s.ServerCtx, neo4j.DriverName, neo4jConfig)
}
Loading

0 comments on commit 9e77c99

Please sign in to comment.