Skip to content

Commit

Permalink
Support check constraint backend (GoogleCloudPlatform#962)
Browse files Browse the repository at this point in the history
* Check constraint backend (#9)

Backend Support for Check Constraint

* update api

* fix PR comment

* remove api call to while validating constraints

* Fixed db collation regex to remove collation name from the results

* renamed function name to formatCheckConstraints and added check if constraint name is empty

* fixed PR comments

* added test case for the empty check constraint name

* fix: added regular exprression to match the exact column

* fix: added regular expression to replace table name

* Added test case for the column rename for check constraint

* 1. Refactored GetConstraint function
2. Fixed inforschema unit tests

* added comment at handling case for check constraints

* reverted white spaces

* reverted white spaces

* nit: doesCheckConstraintNameExist

* added comments for doesCheckConstraintNameExist

* PR and UT fixes

* fix UT

* UT fix

* Removed isCheckConstraintsTablePresent function

* moved regex globally

* Fix UT

* fixed UT

* fixed handling of the constraints

* removed unused function

* added unit tests for incompatable name

* Combined unit tests

* added test case for the renaming column having substring of other column

* added the query changes which return distinct value

---------

Co-authored-by: taherkl <taher.lakdawala@ollion.com>
Co-authored-by: Akash Thawait <aakash@ollion.com>
Co-authored-by: Vivek Yadav <vivek.yadav@ollion.com>
  • Loading branch information
4 people committed Dec 19, 2024
1 parent e6d5857 commit 604d12f
Show file tree
Hide file tree
Showing 22 changed files with 880 additions and 330 deletions.
22 changes: 12 additions & 10 deletions common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ const (
AddIndex = "add_index"
EditColumnMaxLength = "edit_column_max_length"
AddShardIdPrimaryKey = "add_shard_id_primary_key"
//bulk migration type
// bulk migration type
BULK_MIGRATION = "bulk"
//dataflow migration type
// dataflow migration type
DATAFLOW_MIGRATION = "dataflow"
//DMS migration type
// DMS migration type
DMS_MIGRATION = "dms"

SESSION_FILE = "sessionFile"

//Default shardId
// Default shardId
DEFAULT_SHARD_ID string = "smt-default"
//Metadata database name
// Metadata database name
METADATA_DB string = "spannermigrationtool_metadata"
//Migration types
// Migration types
MINIMAL_DOWNTIME_MIGRATION = "minimal_downtime"
//Job Resource Types
// Job Resource Types
DATAFLOW_RESOURCE string = "dataflow"
PUBSUB_RESOURCE string = "pubsub"
DLQ_PUBSUB_RESOURCE string = "dlq_pubsub"
Expand All @@ -111,7 +111,7 @@ const (
// Default gcs path of the Dataflow template.
DEFAULT_TEMPLATE_PATH string = "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner"

//FK Actions
// FK Actions
FK_NO_ACTION string = "NO ACTION"
FK_CASCADE string = "CASCADE"
FK_SET_DEFAULT string = "SET DEFAULT"
Expand All @@ -122,8 +122,10 @@ const (
REGULAR_GCS string = "data"
DLQ_GCS string = "dlq"

//VerifyExpresions API
CHECK_EXPRESSION = "CHECK"
// VerifyExpresions API
CHECK_EXPRESSION = "CHECK"
DEFAUT_EXPRESSION = "DEFAULT"

// Regex for matching database collation
DB_COLLATION_REGEX = `(_[a-zA-Z0-9]+\\|\\)`
)
2 changes: 1 addition & 1 deletion internal/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type Counter struct {
counterMutex sync.Mutex
ObjectId string
ObjectId string
}

var Cntr Counter
Expand Down
31 changes: 16 additions & 15 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,27 @@ import (

// Table represents a database table.
type Table struct {
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
Indexes []Index
Id string
Name string
Schema string
ColIds []string // List of column Ids (for predictable iteration order e.g. printing).
ColDefs map[string]Column // Details of columns.
ColNameIdMap map[string]string `json:"-"` // Computed every time just after conv is generated or after any column renaming
PrimaryKeys []Key
ForeignKeys []ForeignKey
CheckConstraints []CheckConstraint
Indexes []Index
Id string
}

// Column represents a database column.
// TODO: add support for foreign keys.
type Column struct {
Name string
Type Type
NotNull bool
Ignored Ignored
Id string
AutoGen ddl.AutoGenCol
Name string
Type Type
NotNull bool
Ignored Ignored
Id string
AutoGen ddl.AutoGenCol
}

// ForeignKey represents a foreign key.
Expand Down
23 changes: 12 additions & 11 deletions sources/common/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type InfoSchema interface {
GetColumns(conv *internal.Conv, table SchemaAndName, constraints map[string][]string, primaryKeys []string) (map[string]schema.Column, []string, error)
GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
GetRowCount(table SchemaAndName) (int64, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, map[string][]string, error)
GetConstraints(conv *internal.Conv, table SchemaAndName) ([]string, []schema.CheckConstraint, map[string][]string, error)
GetForeignKeys(conv *internal.Conv, table SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
Expand Down Expand Up @@ -186,7 +186,7 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
var t schema.Table
fmt.Println("processing schema for table", table)
tblId := internal.GenerateTableId()
primaryKeys, constraints, err := infoSchema.GetConstraints(conv, table)
primaryKeys, checkConstraints, constraints, err := infoSchema.GetConstraints(conv, table)
if err != nil {
return t, fmt.Errorf("couldn't get constraints for table %s.%s: %s", table.Schema, table.Name, err)
}
Expand Down Expand Up @@ -216,15 +216,16 @@ func (is *InfoSchemaImpl) processTable(conv *internal.Conv, table SchemaAndName,
schemaPKeys = append(schemaPKeys, schema.Key{ColId: colNameIdMap[k]})
}
t = schema.Table{
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
Indexes: indexes,
ForeignKeys: foreignKeys}
Id: tblId,
Name: name,
Schema: table.Schema,
ColIds: colIds,
ColNameIdMap: colNameIdMap,
ColDefs: colDefs,
PrimaryKeys: schemaPKeys,
CheckConstraints: checkConstraints,
Indexes: indexes,
ForeignKeys: foreignKeys}
return t, nil
}

Expand Down
24 changes: 13 additions & 11 deletions sources/common/toddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, tod
if srcCol.Ignored.Default {
issues = append(issues, internal.DefaultValue)
}
if srcCol.Ignored.AutoIncrement { //TODO(adibh) - check why this is not there in postgres
if srcCol.Ignored.AutoIncrement { // TODO(adibh) - check why this is not there in postgres
issues = append(issues, internal.AutoIncrement)
}
// Set the not null constraint to false for unsupported source datatypes
Expand Down Expand Up @@ -167,14 +167,16 @@ func (ss *SchemaToSpannerImpl) SchemaToSpannerDDLHelper(conv *internal.Conv, tod
}
comment := "Spanner schema for source table " + quoteIfNeeded(srcTable.Name)
conv.SpSchema[srcTable.Id] = ddl.CreateTable{
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id}
Name: spTableName,
ColIds: spColIds,
ColDefs: spColDef,
PrimaryKeys: cvtPrimaryKeys(srcTable.PrimaryKeys),
ForeignKeys: cvtForeignKeys(conv, spTableName, srcTable.Id, srcTable.ForeignKeys, isRestore),
CheckConstraints: cvtCheckConstraint(conv, srcTable.CheckConstraints),
Indexes: cvtIndexes(conv, srcTable.Id, srcTable.Indexes, spColIds, spColDef),
Comment: comment,
Id: srcTable.Id,
}
return nil
}

Expand Down Expand Up @@ -344,8 +346,8 @@ func CvtIndexHelper(conv *internal.Conv, tableId string, srcIndex schema.Index,
isPresent = true
if conv.SpDialect == constants.DIALECT_POSTGRESQL {
if spColDef[v].T.Name == ddl.Numeric {
//index on NUMERIC is not supported in PGSQL Dialect currently.
//Indexes which contains a NUMERIC column in it will need to be skipped.
// index on NUMERIC is not supported in PGSQL Dialect currently.
// Indexes which contains a NUMERIC column in it will need to be skipped.
return ddl.CreateIndex{}
}
}
Expand Down
6 changes: 3 additions & 3 deletions sources/dynamodb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,20 @@ func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error)
return *result.Table.ItemCount, err
}

func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, constraints map[string][]string, err error) {
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, checkConstraints []schema.CheckConstraint, constraints map[string][]string, err error) {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(table.Name),
}
result, err := isi.DynamoClient.DescribeTable(input)
if err != nil {
return primaryKeys, constraints, fmt.Errorf("failed to make a DescribeTable API call for table %v: %v", table.Name, err)
return primaryKeys, checkConstraints, constraints, fmt.Errorf("failed to make a DescribeTable API call for table %v: %v", table.Name, err)
}

// Primary keys.
for _, i := range result.Table.KeySchema {
primaryKeys = append(primaryKeys, *i.AttributeName)
}
return primaryKeys, constraints, nil
return primaryKeys, checkConstraints, constraints, nil
}

func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error) {
Expand Down
4 changes: 2 additions & 2 deletions sources/dynamodb/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func TestInfoSchemaImpl_GetConstraints(t *testing.T) {
dySchema := common.SchemaAndName{Name: "test"}
conv := internal.MakeConv()
isi := InfoSchemaImpl{client, nil, 10}
primaryKeys, constraints, err := isi.GetConstraints(conv, dySchema)
primaryKeys, _, constraints, err := isi.GetConstraints(conv, dySchema)
assert.Nil(t, err)

pKeys := []string{"a", "b"}
Expand Down Expand Up @@ -705,7 +705,7 @@ func TestInfoSchemaImpl_GetColumns(t *testing.T) {
client := &mockDynamoClient{
scanOutputs: scanOutputs,
}
dySchema := common.SchemaAndName{Name: "test", Id: "t1"}
dySchema := common.SchemaAndName{Name: "test", Id: "t1"}

isi := InfoSchemaImpl{client, nil, 10}

Expand Down
Loading

0 comments on commit 604d12f

Please sign in to comment.