Skip to content

Commit

Permalink
Support read only source database
Browse files Browse the repository at this point in the history
Read only database does not expose detailed schema so we have to build it from ground up.
  • Loading branch information
dz0ny committed Aug 14, 2023
1 parent fd52cfa commit 8a7b13f
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 99 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.pgsql/data
bin
dist/
*.sql
.nix-profile*
9 changes: 7 additions & 2 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
before:
hooks:
- go mod tidy
universal_binaries:
- replace: true

builds:
- env:
- CGO_ENABLED=0
goos:
- linux
- windows
- darwin
main: ./cli

goarch:
- amd64
- arm64
archives:
- format: tar.gz
strip_parent_binary_folder: true
# this name template makes the OS and Arch compatible with the results of uname.
name_template: >-
{{ .ProjectName }}_
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ is-postgres-running:

.PHONY: pgweb
pgweb:is-postgres-running
@pgweb --url "postgres://test_source@localhost:5432/test_source?sslmode=disable"
@pgweb --url "postgres://test_target@localhost:5432/test_target?sslmode=disable"

build:
rm -rf dist
Expand Down
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[![lint](https://github.com/teamniteo/pg-subsetter/actions/workflows/lint.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/lint.yml) [![build](https://github.com/teamniteo/pg-subsetter/actions/workflows/go.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/go.yml) [![vuln](https://github.com/teamniteo/pg-subsetter/actions/workflows/vuln.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/vuln.yml)


`pg-subsetter` is a tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly, it does not copy the SCHEMA. This means that your target database has to have schema populated in some other way.
`pg-subsetter` is a tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly, it does not copy the SCHEMA.


### Database Fraction Synchronization
`pg-subsetter` allows you to select and sync a specific subset of your database. Whether it's a fraction of a table or a particular dataset, you can have it replicated in another database without synchronizing the entire DB.
Expand All @@ -17,6 +18,8 @@ Utilizing the native PostgreSQL COPY command, `pg-subsetter` performs data trans
### Stateless Operation
`pg-subsetter` is built to be stateless, meaning it does not maintain any internal state between runs. This ensures that each synchronization process is independent, enhancing reliability and making it easier to manage and scale.

### Sync required rows
`pg-subsetter` can be isntructed to copy certain rows in specific tables, the command can be used multipel times to sync more data.

## Usage

Expand All @@ -27,21 +30,33 @@ Usage of subsetter:
-f float
Fraction of rows to copy (default 0.05)
-force value
Query to copy required tables (users: id = 1)
Query to copy required tables 'table: whois query', can be used multiple times
-src string
Source database DSN
```


### Example

Copy a fraction of the database and force certain rows to be also copied over.

Prepare schema in target databse:

```bash
pg_dump --schema-only -n public -f schemadump.sql "postgres://test_source@localhost:5432/test_source?sslmode=disable"
psql -f schemadump.sql "postgres://test_target@localhost:5432/test_target?sslmode=disable"
```

Copy a fraction of the database and force certain rows to be also copied over:

```
pg-subsetter \
-src "postgres://test_source@localhost:5432/test_source?sslmode=disable" \
-dst "postgres://test_target@localhost:5432/test_target?sslmode=disable" \
-f 0.05
-f 0.5
-force "user: id=1"
-force "group: id=1"
-force "domains: domain_name ilike '%.si'"
```

# Installing
Expand Down
8 changes: 4 additions & 4 deletions cli/force.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
"niteo.co/subsetter/subsetter"
)

type arrayForce []subsetter.Force
type arrayExtra []subsetter.Rule

func (i *arrayForce) String() string {
func (i *arrayExtra) String() string {
return fmt.Sprintf("%v", *i)
}

func (i *arrayForce) Set(value string) error {
func (i *arrayExtra) Set(value string) error {
q := strings.SplitAfter(strings.TrimSpace(value), ":")

*i = append(*i, subsetter.Force{
*i = append(*i, subsetter.Rule{
Table: q[0],
Where: q[1],
})
Expand Down
26 changes: 19 additions & 7 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,31 @@ import (
"niteo.co/subsetter/subsetter"
)

var (
version = "dev"
commit = "none"
date = "unknown"
)

var src = flag.String("src", "", "Source database DSN")
var dst = flag.String("dst", "", "Destination database DSN")
var fraction = flag.Float64("f", 0.05, "Fraction of rows to copy")
var verbose = flag.Bool("verbose", true, "Show more information during sync")
var forceSync arrayForce
var ver = flag.Bool("v", false, "Release information")
var extraInclude arrayExtra

func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack

flag.Var(&forceSync, "force", "Query to copy required tables (users: id = 1)")
flag.Var(&extraInclude, "force", "Query to copy required tables 'users: id = 1', can be used multiple times")
flag.Parse()

if *ver {
log.Info().Str("version", version).Str("commit", commit).Str("date", date).Msg("Version")
os.Exit(0)
}

if *src == "" || *dst == "" {
log.Fatal().Msg("Source and destination DSNs are required")
}
Expand All @@ -31,20 +43,20 @@ func main() {
log.Fatal().Msg("Fraction must be between 0 and 1")
}

if len(forceSync) > 0 {
log.Info().Str("forced", forceSync.String()).Msg("Forcing sync for tables")
if len(extraInclude) > 0 {
log.Info().Str("include", extraInclude.String()).Msg("Forcing sync for tables")
}

s, err := subsetter.NewSync(*src, *dst, *fraction, forceSync, *verbose)
s, err := subsetter.NewSync(*src, *dst, *fraction, extraInclude, *verbose)
if err != nil {
log.Fatal().Stack().Err(err).Msg("Failed to configure sync")
log.Fatal().Err(err).Msg("Failed to configure sync")
}

defer s.Close()

err = s.Sync()
if err != nil {
log.Fatal().Stack().Err(err).Msg("Failed to sync")
log.Fatal().Err(err).Msg("Failed to sync")
}

}
8 changes: 4 additions & 4 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
allowed-users = [ "@wheel" "@staff" ]; # allow compiling on every device/machine
};
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/release-23.05";
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
flake-parts.url = "github:hercules-ci/flake-parts";
};
outputs = inputs@{ self, nixpkgs, flake-parts, ... }:
Expand Down Expand Up @@ -35,7 +35,7 @@
go
goreleaser
golangci-lint
postgresql
postgresql_15
process-compose
nixpkgs-fmt
pgweb
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ module niteo.co/subsetter

go 1.20

require github.com/rs/zerolog v1.30.0
require (
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.30.0
)

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sync v0.1.0 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1
github.com/jackc/pgx/v5 v5.4.3
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
Expand Down
40 changes: 40 additions & 0 deletions subsetter/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"context"
"fmt"
"strings"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/samber/lo"
)

type Table struct {
Expand All @@ -14,6 +16,16 @@ type Table struct {
Relations []Relation
}

func (t *Table) RelationNames() (names string) {
rel := lo.Map(t.Relations, func(r Relation, _ int) string {
return r.PrimaryTable + ">" + r.PrimaryColumn
})
if len(rel) > 0 {
return strings.Join(rel, ", ")
}
return "none"
}

func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
q := `SELECT
relname,
Expand Down Expand Up @@ -46,17 +58,34 @@ func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
return
}

func GetKeys(q string, conn *pgxpool.Pool) (ids []string, err error) {
rows, err := conn.Query(context.Background(), q)
for rows.Next() {
var id string

if err := rows.Scan(&id); err == nil {
ids = append(ids, id)
}

}
rows.Close()

return
}

func CopyQueryToString(query string, conn *pgxpool.Pool) (result string, err error) {
q := fmt.Sprintf(`copy (%s) to stdout`, query)
var buff bytes.Buffer
c, err := conn.Acquire(context.Background())
if err != nil {
return
}
defer c.Release()
if _, err = c.Conn().PgConn().CopyTo(context.Background(), &buff, q); err != nil {
return
}
result = buff.String()

return
}

Expand All @@ -73,9 +102,20 @@ func CopyStringToTable(table string, data string, conn *pgxpool.Pool) (err error
if err != nil {
return
}
defer c.Release()

if _, err = c.Conn().PgConn().CopyFrom(context.Background(), &buff, q); err != nil {
return
}

return
}

func CountRows(s string, conn *pgxpool.Pool) (count int, err error) {
q := "SELECT count(*) FROM " + s
err = conn.QueryRow(context.Background(), q).Scan(&count)
if err != nil {
return
}
return
}
13 changes: 1 addition & 12 deletions subsetter/query_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package subsetter

import (
"context"
"strings"
"testing"

Expand Down Expand Up @@ -89,21 +88,11 @@ func TestCopyStringToTable(t *testing.T) {
t.Errorf("CopyStringToTable() error = %v, wantErr %v", err, tt.wantErr)
return
}
gotInserted := insertedRows(tt.table, tt.conn)
gotInserted, _ := CountRows(tt.table, tt.conn)
if tt.wantResult != gotInserted {
t.Errorf("CopyStringToTable() = %v, want %v", tt.wantResult, tt.wantResult)
}

})
}
}

func insertedRows(s string, conn *pgxpool.Pool) int {
q := "SELECT count(*) FROM " + s
var count int
err := conn.QueryRow(context.Background(), q).Scan(&count)
if err != nil {
panic(err)
}
return count
}
Loading

0 comments on commit 8a7b13f

Please sign in to comment.