Skip to content

Commit

Permalink
Support reverse backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
dz0ny committed Oct 3, 2023
1 parent bcdafe1 commit 5655672
Show file tree
Hide file tree
Showing 14 changed files with 403 additions and 241 deletions.
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ is-postgres-running:
@(pg_isready -h localhost) || (echo "# ==> Startis postgres by running 'make up'" && exit 2)

.PHONY: pgweb
pgweb:is-postgres-running
@pgweb --url "postgres://test_target@localhost:5432/test_target?sslmode=disable"
pgweb: is-postgres-running
if [ -n "$(filter-out $@,$(MAKECMDGOALS))" ]; then \
pgweb --url "$(filter-out $@,$(MAKECMDGOALS))"; \
else \
pgweb --url "postgres://test_target@localhost:5432/test_target?sslmode=disable"; \
fi;

build:
rm -rf dist
Expand All @@ -24,7 +28,7 @@ lint:
golangci-lint run

dump:
pg_dump --no-acl --schema-only -n public -x -O -f ./dump.sql $(filter-out $@,$(MAKECMDGOALS))
pg_dump --no-acl --schema-only -n public -x -O -c -f ./dump.sql $(filter-out $@,$(MAKECMDGOALS))

restore:
psql -f ./dump.sql "postgres://test_target@localhost:5432/test_target?sslmode=disable"
Expand Down
2 changes: 1 addition & 1 deletion cli/extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (i *arrayExtra) Set(value string) error {

func maybeAll(s string) string {
if s == "all" {
return "1=1"
return subsetter.RuleAll
}
return s
}
1 change: 1 addition & 0 deletions devenv.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
eclint # EditorConfig linter and fixer
gnumake # GNU Make
goreleaser # Go binary release tool
pgweb # PostgreSQL web interface
];

languages.javascript.enable = true;
Expand Down
141 changes: 141 additions & 0 deletions subsetter/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package subsetter

import (
"fmt"
"strings"

"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
)

// copyTableData transfers rows of table from the source database into the
// destination database.
//
// relatedQueries are SQL predicates that get AND-ed together into a WHERE
// clause; when withLimit is set, the selection is capped with LIMIT table.Rows.
// Errors from the read or the write are returned as-is, without wrapping.
func copyTableData(table Table, relatedQueries []string, withLimit bool, source *pgxpool.Pool, destination *pgxpool.Pool) (err error) {
	// Restrict the selection using the predicates supplied by the caller.
	var where string
	if len(relatedQueries) != 0 {
		where = "WHERE " + strings.Join(relatedQueries, " AND ")
	}

	var limit string
	if withLimit {
		limit = fmt.Sprintf("LIMIT %d", table.Rows)
	}

	payload, err := CopyTableToString(table.Name, limit, where, source)
	if err != nil {
		return err
	}
	return CopyStringToTable(table.Name, payload, destination)
}

// relatedQueriesBuilder appends to relatedQueries a predicate of the form
// "<relation.PrimaryColumn> IN (<keys>)", where the keys are the values of
// relation.ForeignColumn currently present in relation.ForeignTable of the
// destination database.
//
// When no keys are found, the foreign table is copied through relationalCopy
// and *depth is incremented; the lookup is repeated only while *depth stays
// below 1, otherwise a "Max depth reached" error is returned. A lookup that
// yields keys resets *depth to 0.
func relatedQueriesBuilder(
	depth *int,
	tables []Table,
	relation Relation,
	table Table,
	source *pgxpool.Pool,
	destination *pgxpool.Pool,
	visitedTables *[]string,
	relatedQueries *[]string,
) (err error) {
	for {
		q := fmt.Sprintf(`SELECT %s FROM %s`, relation.ForeignColumn, relation.ForeignTable)
		log.Debug().Str("query", q).Msgf("Getting keys for %s from target", table.Name)

		primaryKeys, keysErr := GetKeys(q, destination)
		if keysErr != nil {
			log.Error().Err(keysErr).Msgf("Error getting keys for %s", table.Name)
			return keysErr
		}

		if len(primaryKeys) != 0 {
			// Keys are available: reset the retry counter and emit the predicate.
			*depth = 0
			quoted := make([]string, 0, len(primaryKeys))
			for _, key := range primaryKeys {
				quoted = append(quoted, QuoteString(key))
			}
			predicate := fmt.Sprintf(`%s IN (%s)`, relation.PrimaryColumn, strings.Join(quoted, ","))
			*relatedQueries = append(*relatedQueries, predicate)
			return nil
		}

		// Nothing in the destination yet: copy the foreign table first.
		missingTable := TableByName(tables, relation.ForeignTable)
		if err = relationalCopy(depth, tables, missingTable, visitedTables, source, destination); err != nil {
			return errors.Wrapf(err, "Error copying table %s", missingTable.Name)
		}

		// Retry short circuit
		*depth++

		log.Debug().Int("depth", *depth).Msgf("Retrying keys for %s", relation.ForeignTable)
		if *depth >= 1 {
			log.Debug().Str("table", relation.ForeignTable).Str("primary", relation.PrimaryTable).Msgf("No keys found at this time")
			return errors.New("Max depth reached")
		}
	}
}

// relationalCopy copies table and the tables listed by TableGraph for it from
// source into destination, in the order TableGraph returns them.
//
// Tables already present in visitedTables are skipped, and every table is
// recorded there before it is processed, so nested calls never handle the same
// table twice. For each relation of a table, relatedQueriesBuilder contributes
// a predicate that restricts the copy to rows whose keys already exist in
// destination.
//
// When a copy fails with a foreign key violation (SQLSTATE 23503), the
// not-yet-visited tables in the failing table's graph are copied by a nested
// call and the copy is attempted once more. Any other failure, a failure of
// the nested call, or a failure of the second attempt is returned wrapped with
// the name of the table being copied.
func relationalCopy(
	depth *int,
	tables []Table,
	table Table,
	visitedTables *[]string,
	source *pgxpool.Pool,
	destination *pgxpool.Pool,
) error {
	log.Debug().Str("table", table.Name).Msg("Preparing")

	relatedTables, err := TableGraph(table.Name, table.Relations)
	if err != nil {
		return errors.Wrapf(err, "Error sorting tables from graph")
	}
	log.Debug().Strs("tables", relatedTables).Msgf("Order of copy")

	for _, tableName := range relatedTables {
		if lo.Contains(*visitedTables, tableName) {
			continue
		}

		relatedTable := TableByName(tables, tableName)
		*visitedTables = append(*visitedTables, relatedTable.Name)

		// Selection query for this table: use the keys that are already in
		// the destination for all related tables.
		relatedQueries := []string{}
		for _, relation := range relatedTable.Relations {
			if err := relatedQueriesBuilder(depth, tables, relation, relatedTable, source, destination, visitedTables, &relatedQueries); err != nil {
				return err
			}
		}

		if len(relatedQueries) > 0 {
			log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Transferring with relationalCopy")
		}

		copyErr := copyTableData(relatedTable, relatedQueries, false, source, destination)

		// errors.As also matches a PgError that was wrapped somewhere below.
		var pgErr *pgconn.PgError
		if copyErr != nil && errors.As(copyErr, &pgErr) && pgErr.Code == "23503" { // foreign key violation
			if depErr := relationalCopy(depth, tables, relatedTable, visitedTables, source, destination); depErr != nil {
				return errors.Wrapf(depErr, "Error copying table %s", relatedTable.Name)
			}
			// The nested call skips relatedTable itself (it is already marked
			// as visited), so its rows still have to be copied here.
			copyErr = copyTableData(relatedTable, relatedQueries, false, source, destination)
		}
		if copyErr != nil {
			return errors.Wrapf(copyErr, "Error copying table %s", relatedTable.Name)
		}
	}

	return nil
}
19 changes: 19 additions & 0 deletions subsetter/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,22 @@ func TableGraph(primary string, relations []Relation) (l []string, err error) {
slices.Reverse(l)
return
}

// RequiredTableGraph builds a graph from relations and returns the reversed
// result of a topological sort started at primary.
//
// Every relation that is not self related contributes one edge from its
// foreign table to its primary table. The first error reported while adding
// an edge or while sorting is returned.
func RequiredTableGraph(primary string, relations []Relation) (l []string, err error) {
	g := topsort.NewGraph()

	for _, rel := range relations {
		if rel.IsSelfRelated() {
			continue
		}
		if err = g.AddEdge(rel.ForeignTable, rel.PrimaryTable); err != nil {
			return l, err
		}
	}

	if l, err = g.TopSort(primary); err != nil {
		return l, err
	}

	slices.Reverse(l)
	return l, nil
}
17 changes: 6 additions & 11 deletions subsetter/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@ import (
"fmt"
"math"
"strconv"

"github.com/samber/lo"
)

// GetTargetSet returns a subset of tables with the number of rows scaled by the fraction.
func GetTargetSet(fraction float64, tables []Table) []Table {
var subset []Table

for _, table := range tables {
subset = append(subset, Table{
Name: table.Name,
Rows: int(math.Pow(10, math.Log10(float64(table.Rows))*fraction)),
Relations: table.Relations,
})
}

return subset
return lo.Map(tables, func(table Table, i int) Table {
table.Rows = int(math.Pow(10, math.Log10(float64(table.Rows))*fraction))
return table
})
}

func QuoteString(s string) string {
Expand Down
8 changes: 6 additions & 2 deletions subsetter/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ func TestGetTargetSet(t *testing.T) {
tables []Table
want []Table
}{
{"simple", 0.5, []Table{{"simple", 1000, []Relation{}}}, []Table{{"simple", 31, []Relation{}}}},
{"simple", 0.5, []Table{{"simple", 10, []Relation{}}}, []Table{{"simple", 3, []Relation{}}}},
{"simple", 0.5,
[]Table{{"simple", 1000, []Relation{}, []Relation{}}},
[]Table{{"simple", 31, []Relation{}, []Relation{}}}},
{"simple", 0.5,
[]Table{{"simple", 10, []Relation{}, []Relation{}}},
[]Table{{"simple", 3, []Relation{}, []Relation{}}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
32 changes: 19 additions & 13 deletions subsetter/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
)

type Table struct {
Name string
Rows int
Relations []Relation
Name string
Rows int
Relations []Relation
RequiredBy []Relation
}

// RelationNames returns a list of relation names in human readable format.
Expand All @@ -38,13 +39,12 @@ func (t *Table) IsSelfRelated() bool {

// TableByName returns the table in tables whose Name equals name.
func TableByName(tables []Table, name string) Table {
return lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == name
})[0]
return lo.FindOrElse(tables, Table{}, func(t Table) bool {
return t.Name == name
})
}

// GetTablesWithRows returns a list of tables with the number of rows in each table.
// Warning: reltuples, which is used to determine size, is an estimate of the number of rows in the table and can be zero for small tables.
func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
q := `SELECT
relname,
Expand Down Expand Up @@ -79,10 +79,10 @@ func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
}

// Get relations
table.Relations, err = GetRelations(table.Name, conn)
if err != nil {
return nil, err
}
table.Relations = GetRelations(table.Name, conn)

// Get reverse relations
table.RequiredBy = GetRequiredBy(table.Name, conn)

tables = append(tables, table)
}
Expand Down Expand Up @@ -153,13 +153,19 @@ func CopyQueryToString(query string, conn *pgxpool.Pool) (result string, err err

// CopyTableToString copies a table to a string.
func CopyTableToString(table string, limit string, where string, conn *pgxpool.Pool) (result string, err error) {
q := fmt.Sprintf(`SELECT * FROM %s %s order by random() %s`, table, where, limit)
log.Debug().Msgf("Query: %s", q)
maybeOrder := ""
if lo.IsNotEmpty(where) {
maybeOrder = "order by random()"
}

q := fmt.Sprintf(`SELECT * FROM %s %s %s %s`, table, where, maybeOrder, limit)
log.Debug().Msgf("CopyTableToString query: %s", q)
return CopyQueryToString(q, conn)
}

// CopyStringToTable copies a string to a table.
func CopyStringToTable(table string, data string, conn *pgxpool.Pool) (err error) {
log.Debug().Msgf("CopyStringToTable query: %s", table)
q := fmt.Sprintf(`copy %s from stdin`, table)
var buff bytes.Buffer
buff.WriteString(data)
Expand Down
6 changes: 5 additions & 1 deletion subsetter/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ func TestGetTablesWithRows(t *testing.T) {
wantTables []Table
wantErr bool
}{
{"With tables", conn, []Table{{"simple", 0, []Relation{}}, {"relation", 0, []Relation{}}}, false},
{"With tables", conn,
[]Table{
{"simple", 0, []Relation{}, []Relation{}},
{"relation", 0, []Relation{}, []Relation{}},
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit 5655672

Please sign in to comment.