Skip to content

Commit

Permalink
Check constraint backend (#9)
Browse files Browse the repository at this point in the history
Backend Support for Check Constraint
  • Loading branch information
taherkl authored and akashthawaitcc committed Dec 26, 2024
1 parent 82652c8 commit 5aceba3
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 55 deletions.
1 change: 1 addition & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ const (
DefaultValueError
TypeMismatch
DefaultValueError
TypeMismatch
)

const (
Expand Down
1 change: 0 additions & 1 deletion internal/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func ToSpannerCheckConstraintName(conv *Conv, srcCheckConstraintName string) str
return getSpannerValidName(conv, srcCheckConstraintName)
}


// conv.UsedNames tracks Spanner names that have been used for table names, foreign key constraints
// and indexes. We use this to ensure we generate unique names when
// we map from source dbs to Spanner since Spanner requires all these names to be
Expand Down
7 changes: 7 additions & 0 deletions internal/reports/report_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ func buildTableReportBody(conv *internal.Conv, tableId string, issues map[string
Description: fmt.Sprintf("%s for table '%s' column '%s'", IssueDB[i].Brief, conv.SpSchema[tableId].Name, spColName),
}
l = append(l, toAppend)
case internal.TypeMismatch:
toAppend := Issue{
Category: IssueDB[i].Category,
Description: fmt.Sprintf("Table '%s': Type mismatch in '%s'column affecting check constraints. Verify data type compatibility with constraint logic", conv.SpSchema[tableId].Name, conv.SpSchema[tableId].ColDefs[colId].Name),
}
l = append(l, toAppend)

default:
toAppend := Issue{
Category: IssueDB[i].Category,
Expand Down
31 changes: 31 additions & 0 deletions sources/common/toddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,3 +685,34 @@ func TestSpannerSchemaApplyExpressions(t *testing.T) {
})
}
}

func Test_cvtCheckContraint(t *testing.T) {

conv := internal.MakeConv()
srcSchema := []schema.CheckConstraint{
{
Id: "ck1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "ck1",
Name: "check_2",
Expr: "age < 99",
},
}
spSchema := []ddl.CheckConstraint{
{
Id: "ck1",
Name: "check_1",
Expr: "age > 0",
},
{
Id: "ck1",
Name: "check_2",
Expr: "age < 99",
},
}
result := cvtCheckConstraint(conv, srcSchema)
assert.Equal(t, spSchema, result)
}
2 changes: 1 addition & 1 deletion sources/mysql/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAnd
if colDefault.Valid {
defaultVal.Value = ddl.Expression{
ExpressionId: internal.GenerateExpressionId(),
Statement: common.SanitizeDefaultValue(colDefault.String, dataType, colExtra.String == constants.DEFAULT_GENERATED),
Statement: common.SanitizeDefaultValue(colDefault.String, dataType, colExtra.String == constants.DEFAULT_GENERATED),
}
}

Expand Down
116 changes: 116 additions & 0 deletions sources/mysql/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package mysql
import (
"database/sql"
"database/sql/driver"
"fmt"
"regexp"
"testing"

Expand Down Expand Up @@ -680,3 +681,118 @@ func TestGetConstraints_CheckConstraintsTableAbsent(t *testing.T) {
_, _, _, err := isi.GetConstraints(conv, common.SchemaAndName{Schema: "your_schema", Name: "your_table"})
assert.Error(t, err)
}

func TestGetConstraints(t *testing.T) {

case1 := []mockSpec{
{
query: `SELECT COUNT\(\*\)
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'INFORMATION_SCHEMA'
AND TABLE_NAME = 'CHECK_CONSTRAINTS';
`,
cols: []string{"COUNT"},
rows: [][]driver.Value{{1}},
},
{
query: `(?i)SELECT\s+COALESCE\(k.COLUMN_NAME,\s*''\)\s+AS\s+COLUMN_NAME,\s+t\.CONSTRAINT_NAME,\s+t\.CONSTRAINT_TYPE,\s+COALESCE\(c.CHECK_CLAUSE,\s*''\)\s+AS\s+CHECK_CLAUSE\s+FROM\s+INFORMATION_SCHEMA\.TABLE_CONSTRAINTS\s+AS\s+t\s+LEFT\s+JOIN\s+INFORMATION_SCHEMA\.KEY_COLUMN_USAGE\s+AS\s+k\s+ON\s+t\.CONSTRAINT_NAME\s*=\s*k\.CONSTRAINT_NAME\s+AND\s+t\.CONSTRAINT_SCHEMA\s*=\s*k\.CONSTRAINT_SCHEMA\s+AND\s+t\.TABLE_NAME\s*=\s*k\.TABLE_NAME\s+LEFT\s+JOIN\s+INFORMATION_SCHEMA\.CHECK_CONSTRAINTS\s+AS\s+c\s+ON\s+t\.CONSTRAINT_NAME\s*=\s*c\.CONSTRAINT_NAME\s+WHERE\s+t\.TABLE_SCHEMA\s*=\s*\?\s+AND\s+t\.TABLE_NAME\s*=\s*\?\s*ORDER\s+BY\s+k\.ORDINAL_POSITION`,
args: []driver.Value{"test_schema", "test_table"},
cols: []string{"COLUMN_NAME", "CONSTRAINT_NAME", "CONSTRAINT_TYPE", "CHECK_CLAUSE"},
rows: [][]driver.Value{{"id", "PRIMARY", "PRIMARY KEY", ""}, {"", "chk_test", "CHECK", "amount > 0"}},
},
}

case2 := []mockSpec{
{
query: `SELECT COUNT\(\*\)
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'INFORMATION_SCHEMA'
AND TABLE_NAME = 'CHECK_CONSTRAINTS';
`,
cols: []string{"COUNT"},
rows: [][]driver.Value{{0}},
},
{
query: `(?i)SELECT\s+k\.COLUMN_NAME,\s+t\.CONSTRAINT_TYPE\s+FROM\s+INFORMATION_SCHEMA\.TABLE_CONSTRAINTS\s+AS\s+t\s+INNER\s+JOIN\s+INFORMATION_SCHEMA\.KEY_COLUMN_USAGE\s+AS\s+k\s+ON\s+t\.CONSTRAINT_NAME\s*=\s*k\.CONSTRAINT_NAME\s+AND\s+t\.CONSTRAINT_SCHEMA\s*=\s*k\.CONSTRAINT_SCHEMA\s+AND\s+t\.TABLE_NAME\s*=\s*k\.TABLE_NAME\s+WHERE\s+k\.TABLE_SCHEMA\s*=\s*\?\s+AND\s+k\.TABLE_NAME\s*=\s*\?\s*ORDER\s+BY\s+k\.ORDINAL_POSITION;`,
args: []driver.Value{"test_schema", "test_table"},
cols: []string{"COLUMN_NAME", "CONSTRAINT_NAME", "CONSTRAINT_TYPE", "CHECK_CLAUSE"},
rows: [][]driver.Value{{"id", "PRIMARY", "PRIMARY KEY", ""}},
},
}

cases := []struct {
db []mockSpec
tableExists bool
}{
{
db: case1,
tableExists: true,
},
{
db: case2,
tableExists: false,
},
}

for _, tc := range cases {
if tc.tableExists {
db := mkMockDB(t, tc.db)

defer db.Close()

isi := InfoSchemaImpl{Db: db}

table := common.SchemaAndName{
Schema: "test_schema",
Name: "test_table",
}

conv := new(internal.Conv)

primaryKeys, checkKeys, constraints, err := isi.GetConstraints(conv, table)
if err != nil {
t.Fatalf("expected no error, but got %v", err)
}

expectedPrimaryKeys := []string{"id"}
if fmt.Sprintf("%v", primaryKeys) != fmt.Sprintf("%v", expectedPrimaryKeys) {
t.Errorf("expected %v, got %v for primary keys", expectedPrimaryKeys, primaryKeys)
}

expectedCheckKeys := []schema.CheckConstraint{
{Name: "chk_test", Expr: "amount > 0", Id: "ck1"},
}

assert.Equal(t, expectedCheckKeys, checkKeys)
assert.Equal(t, expectedPrimaryKeys, primaryKeys)
assert.Empty(t, constraints)
} else {
db := mkMockDB(t, tc.db)

defer db.Close()

isi := InfoSchemaImpl{Db: db}

table := common.SchemaAndName{
Schema: "test_schema",
Name: "test_table",
}

conv := new(internal.Conv)

primaryKeys, checkKeys, constraints, err := isi.GetConstraints(conv, table)
if err != nil {
t.Fatalf("expected no error, but got %v", err)
}

expectedPrimaryKeys := []string{"id"}
if fmt.Sprintf("%v", primaryKeys) != fmt.Sprintf("%v", expectedPrimaryKeys) {
t.Errorf("expected %v, got %v for primary keys", expectedPrimaryKeys, primaryKeys)
}

assert.Equal(t, expectedPrimaryKeys, primaryKeys)
assert.Empty(t, checkKeys)
assert.Empty(t, constraints)
}
}
}
85 changes: 85 additions & 0 deletions webv2/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,91 @@ func UpdateCheckConstraint(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(convm)
}

// UpdateCheckConstraint processes the request to update spanner table check constraints, ensuring session and schema validity, and responds with the updated conversion metadata.
func UpdateCheckConstraint(w http.ResponseWriter, r *http.Request) {
tableId := r.FormValue("table")
reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
}
sessionState := session.GetSessionState()
if sessionState.Conv == nil || sessionState.Driver == "" {
http.Error(w, fmt.Sprintf("Schema is not converted or Driver is not configured properly. Please retry converting the database to Spanner."), http.StatusNotFound)
return
}
sessionState.Conv.ConvLock.Lock()
defer sessionState.Conv.ConvLock.Unlock()

newCKs := []ddl.CheckConstraint{}
if err = json.Unmarshal(reqBody, &newCKs); err != nil {
http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest)
return
}

sp := sessionState.Conv.SpSchema[tableId]
sp.CheckConstraints = newCKs
sessionState.Conv.SpSchema[tableId] = sp
session.UpdateSessionFile()

convm := session.ConvWithMetadata{
SessionMetadata: sessionState.SessionMetadata,
Conv: *sessionState.Conv,
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(convm)
}

func doesNameExist(spcks []ddl.CheckConstraint, targetName string) bool {
for _, spck := range spcks {
if strings.Contains(spck.Expr, targetName) {
return true
}
}
return false
}

// ValidateCheckConstraint verifies if the type of a database column has been altered and add an error if a change is detected.
func ValidateCheckConstraint(w http.ResponseWriter, r *http.Request) {
sessionState := session.GetSessionState()
if sessionState.Conv == nil || sessionState.Driver == "" {
http.Error(w, fmt.Sprintf("Schema is not converted or Driver is not configured properly. Please retry converting the database to Spanner."), http.StatusNotFound)
return
}
sessionState.Conv.ConvLock.Lock()
defer sessionState.Conv.ConvLock.Unlock()

sp := sessionState.Conv.SpSchema
srcschema := sessionState.Conv.SrcSchema
flag := true
var schemaIssue []internal.SchemaIssue

for _, src := range srcschema {
for _, col := range sp[src.Id].ColDefs {
if len(sp[src.Id].CheckConstraints) > 0 {
spType := col.T.Name
srcType := srcschema[src.Id].ColDefs[col.Id].Type
actualType := mysqlDefaultTypeMap[srcType.Name]
if actualType.Name != spType {
columnName := sp[src.Id].ColDefs[col.Id].Name
spcks := sp[src.Id].CheckConstraints
if doesNameExist(spcks, columnName) {
flag = false
schemaIssue = sessionState.Conv.SchemaIssues[src.Id].ColumnLevelIssues[col.Id]
if !utilities.IsSchemaIssuePresent(schemaIssue, internal.TypeMismatch) {
schemaIssue = append(schemaIssue, internal.TypeMismatch)
}
sessionState.Conv.SchemaIssues[src.Id].ColumnLevelIssues[col.Id] = schemaIssue
break
}
}
}
}
}

w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(flag)
}

// renameForeignKeys checks the new names for spanner name validity, ensures the new names are already not used by existing tables
// secondary indexes or foreign key constraints. If above checks passed then foreignKey renaming reflected in the schema else appropriate
// error thrown.
Expand Down
Loading

0 comments on commit 5aceba3

Please sign in to comment.