Skip to content

Commit

Permalink
Support check constraint backend (GoogleCloudPlatform#962)
Browse files Browse the repository at this point in the history
* Check constraint backend (#9)

Backend Support for Check Constraint

* update api

* fix PR comment

* remove api call while validating constraints

* Fixed db collation regex to remove collation name from the results

* renamed function name to formatCheckConstraints and added check if constraint name is empty

* fixed PR comments

* added test case for the empty check constraint name

* fix: added regular expression to match the exact column

* fix: added regular expression to replace table name

* Added test case for the column rename for check constraint

* 1. Refactored GetConstraint function
2. Fixed infoschema unit tests

* added comment at handling case for check constraints

* reverted white spaces

* reverted white spaces

* nit: doesCheckConstraintNameExist

* added comments for doesCheckConstraintNameExist

* PR and UT fixes

* fix UT

* UT fix

* Removed isCheckConstraintsTablePresent function

* moved regex globally

* Fix UT

* fixed UT

* fixed handling of the constraints

* removed unused function

* added unit tests for incompatible name

* Combined unit tests

* added test case for the renaming column having substring of other column

* added the query changes which return distinct value

---------

Co-authored-by: taherkl <taher.lakdawala@ollion.com>
Co-authored-by: Akash Thawait <aakash@ollion.com>
Co-authored-by: Vivek Yadav <vivek.yadav@ollion.com>
  • Loading branch information
4 people authored Dec 18, 2024
1 parent b60da05 commit 38ee14e
Show file tree
Hide file tree
Showing 26 changed files with 1,002 additions and 330 deletions.
22 changes: 12 additions & 10 deletions common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ const (
AddIndex = "add_index"
EditColumnMaxLength = "edit_column_max_length"
AddShardIdPrimaryKey = "add_shard_id_primary_key"
//bulk migration type
// bulk migration type
BULK_MIGRATION = "bulk"
//dataflow migration type
// dataflow migration type
DATAFLOW_MIGRATION = "dataflow"
//DMS migration type
// DMS migration type
DMS_MIGRATION = "dms"

SESSION_FILE = "sessionFile"

//Default shardId
// Default shardId
DEFAULT_SHARD_ID string = "smt-default"
//Metadata database name
// Metadata database name
METADATA_DB string = "spannermigrationtool_metadata"
//Migration types
// Migration types
MINIMAL_DOWNTIME_MIGRATION = "minimal_downtime"
//Job Resource Types
// Job Resource Types
DATAFLOW_RESOURCE string = "dataflow"
PUBSUB_RESOURCE string = "pubsub"
DLQ_PUBSUB_RESOURCE string = "dlq_pubsub"
Expand All @@ -111,7 +111,7 @@ const (
// Default gcs path of the Dataflow template.
DEFAULT_TEMPLATE_PATH string = "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner"

//FK Actions
// FK Actions
FK_NO_ACTION string = "NO ACTION"
FK_CASCADE string = "CASCADE"
FK_SET_DEFAULT string = "SET DEFAULT"
Expand All @@ -122,8 +122,10 @@ const (
REGULAR_GCS string = "data"
DLQ_GCS string = "dlq"

//VerifyExpresions API
CHECK_EXPRESSION = "CHECK"
// VerifyExpressions API
CHECK_EXPRESSION = "CHECK"
DEFAUT_EXPRESSION = "DEFAULT"

// Regex for matching database collation
DB_COLLATION_REGEX = `(_[a-zA-Z0-9]+\\|\\)`
)
1 change: 1 addition & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const (
SequenceCreated
ForeignKeyActionNotSupported
NumericPKNotSupported
TypeMismatch
)

const (
Expand Down
7 changes: 6 additions & 1 deletion internal/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type Counter struct {
counterMutex sync.Mutex
ObjectId string
ObjectId string
}

var Cntr Counter
Expand Down Expand Up @@ -65,6 +65,11 @@ func GenerateForeignkeyId() string {
func GenerateIndexesId() string {
return GenerateId("i")
}

// GenerateCheckConstrainstId returns the value produced by GenerateId for the
// "cc" argument. It is the check-constraint counterpart of the neighbouring
// Generate*Id helpers (presumably "cc" ends up as the id prefix — confirm
// against GenerateId).
// NOTE(review): the name misspells "Constraints"; it is left unchanged here
// because callers reference it.
func GenerateCheckConstrainstId() string {
return GenerateId("cc")
}

func GenerateRuleId() string {
return GenerateId("r")
}
Expand Down
9 changes: 9 additions & 0 deletions internal/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func ToSpannerIndexName(conv *Conv, srcIndexName string) string {
return getSpannerValidName(conv, srcIndexName)
}

// ToSpannerCheckConstraintName returns the Spanner name to use for the source
// check constraint srcCheckConstraintName, as computed by getSpannerValidName.
//
// Note that the check constraints names in spanner have to be globally unique
// (across the database). But in some source databases, such as MySQL,
// they only have to be unique for a table. Hence we must map each source
// constraint name to a unique spanner constraint name.
func ToSpannerCheckConstraintName(conv *Conv, srcCheckConstraintName string) string {
return getSpannerValidName(conv, srcCheckConstraintName)
}


// conv.UsedNames tracks Spanner names that have been used for table names, foreign key constraints
// and indexes. We use this to ensure we generate unique names when
// we map from source dbs to Spanner since Spanner requires all these names to be
Expand Down
7 changes: 7 additions & 0 deletions internal/reports/report_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ func buildTableReportBody(conv *internal.Conv, tableId string, issues map[string
Description: fmt.Sprintf("UNIQUE constraint on column(s) '%s' replaced with primary key since table '%s' didn't have one. Spanner requires a primary key for every table", strings.Join(uniquePK, ", "), conv.SpSchema[tableId].Name),
}
l = append(l, toAppend)
case internal.TypeMismatch:
	// Report the column whose type mismatch affects a check constraint.
	// The message puts a space between the quoted column name and the word
	// "column", matching the quoting style of the other issue descriptions.
	toAppend := Issue{
		Category:    IssueDB[i].Category,
		Description: fmt.Sprintf("Table '%s': Type mismatch in '%s' column affecting check constraints. Verify data type compatibility with constraint logic", conv.SpSchema[tableId].Name, conv.SpSchema[tableId].ColDefs[colId].Name),
	}
	l = append(l, toAppend)

default:
toAppend := Issue{
Category: IssueDB[i].Category,
Expand Down
38 changes: 23 additions & 15 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,27 @@ import (

// Table represents a database table.
type Table struct {
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
Indexes []Index
Id string
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
CheckConstraints []CheckConstraint
Indexes []Index
Id string
}

// Column represents a database column.
// TODO: add support for foreign keys.
type Column struct {
Name string
Type Type
NotNull bool
Ignored Ignored
Id string
AutoGen ddl.AutoGenCol
Name string
Type Type
NotNull bool
Ignored Ignored
Id string
AutoGen ddl.AutoGenCol
}

// ForeignKey represents a foreign key.
Expand All @@ -76,6 +77,13 @@ type ForeignKey struct {
Id string
}

// CheckConstraint represents a check constraint defined in the schema.
type CheckConstraint struct {
Name string // Name of the constraint.
Expr string // Constraint expression text (e.g. "age > 0").
Id string // Identifier of the constraint (e.g. "cc1").
}

// Key represents a primary key or index key.
type Key struct {
ColId string
Expand Down
23 changes: 12 additions & 11 deletions sources/common/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type InfoSchema interface {
GetColumns(conv *internal.Conv, table SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error)
GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
GetRowCount(table SchemaAndName) (int64, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, map[string][]string, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error)
GetForeignKeys(conv *internal.Conv, table SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
Expand Down Expand Up @@ -186,7 +186,7 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
var t schema.Table
fmt.Println("processing schema for table", table)
tblId := internal.GenerateTableId()
primaryKeys, constraints, err := infoSchema.GetConstraints(conv, table)
primaryKeys, checkConstraints, constraints, err := infoSchema.GetConstraints(conv, table)
if err != nil {
return t, fmt.Errorf("couldn't get constraints for table %s.%s: %s", table.Schema, table.Name, err)
}
Expand Down Expand Up @@ -216,15 +216,16 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
schemaPKeys = append(schemaPKeys, schema.Key{ColId: colNameIdMap[k]})
}
t = schema.Table{
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
Indexes: indexes,
ForeignKeys: foreignKeys}
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
CheckConstraints: checkConstraints,
Indexes: indexes,
ForeignKeys: foreignKeys}
return t, nil
}

Expand Down
38 changes: 27 additions & 11 deletions sources/common/toddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, tod
if srcCol.Ignored.Default {
issues = append(issues, internal.DefaultValue)
}
if srcCol.Ignored.AutoIncrement { //TODO(adibh) - check why this is not there in postgres
if srcCol.Ignored.AutoIncrement { // TODO(adibh) - check why this is not there in postgres
issues = append(issues, internal.AutoIncrement)
}
// Set the not null constraint to false for unsupported source datatypes
Expand Down Expand Up @@ -167,14 +167,16 @@ func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, tod
}
comment := "Spanner schema for source table " + quoteIfNeeded(srcTable.Name)
conv.SpSchema[srcTable.Id] = ddl.CreateTable{
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id}
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
CheckConstraints: cvtCheckConstraint(conv, srcTable.CheckConstraints),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id,
}
return nil
}

Expand Down Expand Up @@ -234,6 +236,20 @@ func cvtForeignKeys(conv *internal.Conv, spTableName string, srcTableId string,
return spKeys
}

// cvtCheckConstraint builds the Spanner check constraints for a table from its
// source check constraints. Id and Expr are carried over unchanged, while each
// name is passed through internal.ToSpannerCheckConstraintName. The result
// keeps the input order and is nil when srcKeys is empty.
func cvtCheckConstraint(conv *internal.Conv, srcKeys []schema.CheckConstraint) []ddl.CheckConstraint {
	var converted []ddl.CheckConstraint
	for i := range srcKeys {
		src := srcKeys[i]
		spName := internal.ToSpannerCheckConstraintName(conv, src.Name)
		converted = append(converted, ddl.CheckConstraint{
			Id:   src.Id,
			Name: spName,
			Expr: src.Expr,
		})
	}
	return converted
}

func CvtForeignKeysHelper(conv *internal.Conv, spTableName string, srcTableId string, srcKey schema.ForeignKey, isRestore bool) (ddl.Foreignkey, error) {
if len(srcKey.ColIds) != len(srcKey.ReferColumnIds) {
conv.Unexpected(fmt.Sprintf("ConvertForeignKeys: ColIds and referColumns don't have the same lengths: len(columns)=%d, len(referColumns)=%d for source tableId: %s, referenced table: %s", len(srcKey.ColIds), len(srcKey.ReferColumnIds), srcTableId, srcKey.ReferTableId))
Expand Down Expand Up @@ -330,8 +346,8 @@ func CvtIndexHelper(conv *internal.Conv, tableId string, srcIndex schema.Index,
isPresent = true
if conv.SpDialect == constants.DIALECT_POSTGRESQL {
if spColDef[v].T.Name == ddl.Numeric {
//index on NUMERIC is not supported in PGSQL Dialect currently.
//Indexes which contains a NUMERIC column in it will need to be skipped.
// index on NUMERIC is not supported in PGSQL Dialect currently.
// Indexes which contains a NUMERIC column in it will need to be skipped.
return ddl.CreateIndex{}
}
}
Expand Down
41 changes: 41 additions & 0 deletions sources/common/toddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,44 @@ func Test_SchemaToSpannerSequenceHelper(t *testing.T) {
assert.Equal(t, expectedConv, conv)
}
}

// Test_cvtCheckContraint verifies that cvtCheckConstraint copies Id and Expr
// unchanged and converts names to Spanner names: "check_1" and "check_2" are
// expected to be kept as-is, while "@invalid_name" is expected to become
// "Ainvalid_name".
func Test_cvtCheckContraint(t *testing.T) {

conv := internal.MakeConv()
srcSchema := []schema.CheckConstraint{
{
Id: "cc1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "cc2",
Name: "check_2",
Expr: "age < 99",
},
{
Id: "cc3",
Name: "@invalid_name", // incompatible name
Expr: "age != 0",
},
}
spSchema := []ddl.CheckConstraint{
{
Id: "cc1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "cc2",
Name: "check_2",
Expr: "age < 99",
},
{
Id: "cc3",
Name: "Ainvalid_name",
Expr: "age != 0",
},
}
result := cvtCheckConstraint(conv, srcSchema)
assert.Equal(t, spSchema, result)
}
6 changes: 3 additions & 3 deletions sources/dynamodb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,20 @@ func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error)
return *result.Table.ItemCount, err
}

func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, constraints map[string][]string, err error) {
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, checkConstraints []schema.CheckConstraint, constraints map[string][]string, err error) {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(table.Name),
}
result, err := isi.DynamoClient.DescribeTable(input)
if err != nil {
return primaryKeys, constraints, fmt.Errorf("failed to make a DescribeTable API call for table %v: %v", table.Name, err)
return primaryKeys, checkConstraints, constraints, fmt.Errorf("failed to make a DescribeTable API call for table %v: %v", table.Name, err)
}

// Primary keys.
for _, i := range result.Table.KeySchema {
primaryKeys = append(primaryKeys, *i.AttributeName)
}
return primaryKeys, constraints, nil
return primaryKeys, checkConstraints, constraints, nil
}

func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) {
Expand Down
4 changes: 2 additions & 2 deletions sources/dynamodb/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func TestInfoSchemaImpl_GetConstraints(t *testing.T) {
dySchema := common.SchemaAndName{Name: "test"}
conv := internal.MakeConv()
isi := InfoSchemaImpl{client, nil, 10}
primaryKeys, constraints, err := isi.GetConstraints(conv, dySchema)
primaryKeys, _, constraints, err := isi.GetConstraints(conv, dySchema)
assert.Nil(t, err)

pKeys := []string{"a", "b"}
Expand Down Expand Up @@ -705,7 +705,7 @@ func TestInfoSchemaImpl_GetColumns(t *testing.T) {
client := &mockDynamoClient{
scanOutputs: scanOutputs,
}
dySchema := common.SchemaAndName{Name: "test", Id: "t1"}
dySchema := common.SchemaAndName{Name: "test", Id: "t1"}

isi := InfoSchemaImpl{client, nil, 10}

Expand Down
Loading

0 comments on commit 38ee14e

Please sign in to comment.