diff --git a/pkg/drivers/mysql/mysql.go b/pkg/drivers/mysql/mysql.go index f9745630..f1ce870b 100644 --- a/pkg/drivers/mysql/mysql.go +++ b/pkg/drivers/mysql/mysql.go @@ -48,6 +48,9 @@ var ( } schemaMigrations = []string{ `ALTER TABLE kine MODIFY COLUMN id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL UNIQUE, MODIFY COLUMN create_revision BIGINT UNSIGNED, MODIFY COLUMN prev_revision BIGINT UNSIGNED`, + // Creating an empty migration to ensure that postgresql and mysql migrations match up + // with each other for a give value of KINE_SCHEMA_MIGRATION env var + ``, } createDB = "CREATE DATABASE IF NOT EXISTS " ) @@ -148,6 +151,9 @@ func setup(db *sql.DB) error { if i >= int(schemaVersion) { break } + if stmt == "" { + continue + } logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt)) if _, err := db.Exec(stmt); err != nil { if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 { diff --git a/pkg/drivers/pgsql/pgsql.go b/pkg/drivers/pgsql/pgsql.go index 7928d3b3..e339e846 100644 --- a/pkg/drivers/pgsql/pgsql.go +++ b/pkg/drivers/pgsql/pgsql.go @@ -3,6 +3,7 @@ package pgsql import ( "context" "database/sql" + "fmt" "net/url" "os" "regexp" @@ -32,7 +33,7 @@ var ( `CREATE TABLE IF NOT EXISTS kine ( id BIGSERIAL PRIMARY KEY, - name VARCHAR(630), + name text COLLATE "C", created INTEGER, deleted INTEGER, create_revision BIGINT, @@ -41,14 +42,19 @@ var ( value bytea, old_value bytea );`, + `CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`, `CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`, `CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`, `CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`, `CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`, + `CREATE INDEX IF NOT EXISTS kine_list_query_index on kine(name, id DESC, deleted)`, } schemaMigrations = []string{ `ALTER TABLE kine ALTER COLUMN id SET DATA TYPE BIGINT, ALTER COLUMN create_revision SET DATA TYPE BIGINT, ALTER COLUMN prev_revision SET DATA TYPE BIGINT; ALTER SEQUENCE kine_id_seq AS BIGINT`, + // It is important to set the collation to "C" to ensure that LIKE and COMPARISON + // queries use the index. + `ALTER TABLE kine ALTER COLUMN name SET DATA TYPE TEXT COLLATE "C" USING name::TEXT COLLATE "C"`, } createDB = "CREATE DATABASE " ) @@ -67,6 +73,41 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo if err != nil { return nil, err } + listSQL := ` + SELECT + (SELECT MAX(rkv.id) AS id FROM kine AS rkv), + (SELECT MAX(crkv.prev_revision) AS prev_revision FROM kine AS crkv WHERE crkv.name = 'compact_rev_key'), + maxkv.* + FROM ( + SELECT DISTINCT ON (name) + kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value + FROM + kine AS kv + WHERE + kv.name LIKE ? + %s + ORDER BY kv.name, theid DESC + ) AS maxkv + WHERE + maxkv.deleted = 0 OR ? + ORDER BY maxkv.name, maxkv.theid DESC + ` + + countSQL := ` + SELECT + (SELECT MAX(rkv.id) AS id FROM kine AS rkv), + COUNT(c.theid) + FROM ( + SELECT DISTINCT ON (name) + kv.id AS theid, kv.deleted + FROM kine AS kv + WHERE + kv.name LIKE ? + %s + ORDER BY kv.name, theid DESC + ) AS c + WHERE c.deleted = 0 OR ? + ` dialect.GetSizeSQL = `SELECT pg_total_relation_size('kine')` dialect.CompactSQL = ` DELETE FROM kine AS kv @@ -85,6 +126,11 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo kd.id <= $2 ) AS ks WHERE kv.id = ks.id` + dialect.GetCurrentSQL = q(fmt.Sprintf(listSQL, "AND kv.name > ?")) + dialect.ListRevisionStartSQL = q(fmt.Sprintf(listSQL, "AND kv.id <= ?")) + dialect.GetRevisionAfterSQL = q(fmt.Sprintf(listSQL, "AND kv.name > ? AND kv.id <= ?")) + dialect.CountCurrentSQL = q(fmt.Sprintf(countSQL, "AND kv.name > ?")) + dialect.CountRevisionSQL = q(fmt.Sprintf(countSQL, "AND kv.name > ? AND kv.id <= ?")) dialect.FillRetryDuration = time.Millisecond + 5 dialect.InsertRetry = func(err error) bool { if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation && err.ConstraintName == "kine_pkey" { @@ -118,9 +164,20 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo func setup(db *sql.DB) error { logrus.Infof("Configuring database table schema and indexes, this may take a moment...") + var version string + collationSupported := true + if err := db.QueryRow("select version()").Scan(&version); err == nil && strings.Contains(strings.ToLower(version), "cockroachdb") { + // CockroadDB does not seem to support "C" as a collation + // It looks like it's using golang.org/x/text/language and ends up calling something like v, err := language.Parse("C") + // which parses it as a BCP47 language tag instead of a collation. + collationSupported = false + } for _, stmt := range schema { logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) + if !collationSupported { + stmt = strings.ReplaceAll(stmt, ` COLLATE "C"`, "") + } if _, err := db.Exec(stmt); err != nil { return err } @@ -134,6 +191,12 @@ func setup(db *sql.DB) error { if i >= int(schemaVersion) { break } + if !collationSupported { + stmt = strings.ReplaceAll(stmt, ` COLLATE "C"`, "") + } + if stmt == "" { + continue + } logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt)) if _, err := db.Exec(stmt); err != nil { return err