Skip to content

Commit

Permalink
Merge pull request #20 from GoogleCloudPlatform/master
Browse files Browse the repository at this point in the history
Sync master
  • Loading branch information
taherkl authored Dec 20, 2024
2 parents c37bae9 + 734593f commit 4606e74
Show file tree
Hide file tree
Showing 41 changed files with 984 additions and 84 deletions.
21 changes: 14 additions & 7 deletions .github/workflows/integration-tests-against-emulator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,28 @@ jobs:
- run: mysql -v -P 3306 --protocol=tcp -u root -proot < test_data/mysql_foreignkeyaction_dump.test.out

# init sql server with test_data
# since we use ubuntu-latest container, we should ensure that the path matches the latest from https://packages.microsoft.com/config/ubuntu/
# while its possible to infer the latest from the path in the run script, it will make the run section more complex and hard to maintian.
- name: Install sqlcmd required for loading .sql files
run: |
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update
sudo apt-get install mssql-tools unixodbc-dev
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bash_profile
- run: sqlcmd -?
- run: sqlcmd -U sa -P ${MSSQL_SA_PASSWORD} -i test_data/sqlserver.test.out
curl https://packages.microsoft.com/config/ubuntu/24.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18
sudo apt-get install mssql-tools18 unixodbc-dev
set -x
ls /opt/mssql-tools18/bin/
set +x
echo 'export PATH="$PATH:/opt/mssql-tools18/bin"' >> ~/.bashrc
- run: /opt/mssql-tools18/bin/sqlcmd -C -?
- run: /opt/mssql-tools18/bin/sqlcmd -U sa -P ${MSSQL_SA_PASSWORD} -i test_data/sqlserver.test.out -C

# sqlplus set up init oracle db.
- name: Install sqlplus required for loading .sql files
run: |
sudo apt-get update
sudo apt-get install -y libaio1 rpm2cpio cpio
sudo apt-get install -y libaio1t64 rpm2cpio cpio
sudo ln -s /usr/lib/x86_64-linux-gnu/libaio.so.1t64 /usr/lib/libaio.so.1
curl -O https://download.oracle.com/otn_software/linux/instantclient/2340000/oracle-instantclient-basic-23.4.0.24.05-1.el9.x86_64.rpm
curl -O https://download.oracle.com/otn_software/linux/instantclient/2340000/oracle-instantclient-sqlplus-23.4.0.24.05-1.el9.x86_64.rpm
rpm2cpio oracle-instantclient-basic-23.4.0.24.05-1.el9.x86_64.rpm | sudo cpio -idmv
Expand Down
5 changes: 3 additions & 2 deletions common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const (
DLQ_GCS string = "dlq"

//VerifyExpresions API
CHECK_EXPRESSION = "CHECK"
CHECK_EXPRESSION = "CHECK"
DEFAUT_EXPRESSION = "DEFAULT"

DEFAULT_GENERATED = "DEFAULT_GENERATED"
TEMP_DB = "smt-staging-db"
)
6 changes: 6 additions & 0 deletions conversion/conversion_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type DataFromSourceImpl struct{}
func (sads *SchemaFromSourceImpl) schemaFromDatabase(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) {
conv := internal.MakeConv()
conv.SpDialect = targetProfile.Conn.Sp.Dialect
conv.SpProjectId = targetProfile.Conn.Sp.Project
conv.SpInstanceId = targetProfile.Conn.Sp.Instance
conv.Source = sourceProfile.Driver
//handle fetching schema differently for sharded migrations, we only connect to the primary shard to
//fetch the schema. We reuse the SourceProfileConnection object for this purpose.
var infoSchema common.InfoSchema
Expand Down Expand Up @@ -159,6 +162,9 @@ func (sads *DataFromSourceImpl) dataFromCSV(ctx context.Context, sourceProfile p
return nil, fmt.Errorf("dbName is mandatory in target-profile for csv source")
}
conv.SpDialect = targetProfile.Conn.Sp.Dialect
conv.SpProjectId = targetProfile.Conn.Sp.Project
conv.SpInstanceId = targetProfile.Conn.Sp.Instance
conv.Source = sourceProfile.Driver
dialect, err := targetProfile.FetchTargetDialect(ctx)
if err != nil {
return nil, fmt.Errorf("could not fetch dialect: %v", err)
Expand Down
108 changes: 105 additions & 3 deletions expressions_api/expression_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"

spannerclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/client"
spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/task"
Expand All @@ -18,22 +19,50 @@ const THREAD_POOL = 500
type ExpressionVerificationAccessor interface {
//Batch API which parallelizes expression verification calls
VerifyExpressions(ctx context.Context, verifyExpressionsInput internal.VerifyExpressionsInput) internal.VerifyExpressionsOutput
RefreshSpannerClient(ctx context.Context, project string, instance string) error
}

type ExpressionVerificationAccessorImpl struct {
SpannerAccessor *spanneraccessor.SpannerAccessorImpl
}

func NewExpressionVerificationAccessorImpl(ctx context.Context, project string, instance string) (*ExpressionVerificationAccessorImpl, error) {
spannerAccessor, err := spanneraccessor.NewSpannerAccessorClientImplWithSpannerClient(ctx, fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, "smt-staging-db"))
if err != nil {
return nil, err
var spannerAccessor *spanneraccessor.SpannerAccessorImpl
var err error
if project != "" && instance != "" {
spannerAccessor, err = spanneraccessor.NewSpannerAccessorClientImplWithSpannerClient(ctx, fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, constants.TEMP_DB))
if err != nil {
return nil, err
}
} else {
spannerAccessor, err = spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return nil, err
}
}
return &ExpressionVerificationAccessorImpl{
SpannerAccessor: spannerAccessor,
}, nil
}

// APIs to verify and process Spanner DLL features such as Default Values, Check Constraints
type DDLVerifier interface {
VerifySpannerDDL(conv *internal.Conv, expressionDetails []internal.ExpressionDetail) (internal.VerifyExpressionsOutput, error)
GetSourceExpressionDetails(conv *internal.Conv, tableIds []string) []internal.ExpressionDetail
GetSpannerExpressionDetails(conv *internal.Conv, tableIds []string) []internal.ExpressionDetail
RefreshSpannerClient(ctx context.Context, project string, instance string) error
}
type DDLVerifierImpl struct {
Expressions ExpressionVerificationAccessor
}

func NewDDLVerifierImpl(ctx context.Context, project string, instance string) (*DDLVerifierImpl, error) {
expVerifier, err := NewExpressionVerificationAccessorImpl(ctx, project, instance)
return &DDLVerifierImpl{
Expressions: expVerifier,
}, err
}

func (ev *ExpressionVerificationAccessorImpl) VerifyExpressions(ctx context.Context, verifyExpressionsInput internal.VerifyExpressionsInput) internal.VerifyExpressionsOutput {
err := ev.validateRequest(verifyExpressionsInput)
if err != nil {
Expand Down Expand Up @@ -79,6 +108,15 @@ func (ev *ExpressionVerificationAccessorImpl) VerifyExpressions(ctx context.Cont
return verifyExpressionsOutput
}

func (ev *ExpressionVerificationAccessorImpl) RefreshSpannerClient(ctx context.Context, project string, instance string) error {
spannerClient, err := spannerclient.NewSpannerClientImpl(ctx, fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, constants.TEMP_DB))
if err != nil {
return err
}
ev.SpannerAccessor.SpannerClient = spannerClient
return nil
}

func (ev *ExpressionVerificationAccessorImpl) verifyExpressionInternal(expressionDetail internal.ExpressionDetail, mutex *sync.Mutex) task.TaskResult[internal.ExpressionVerificationOutput] {
var sqlStatement string
switch expressionDetail.Type {
Expand Down Expand Up @@ -129,3 +167,67 @@ func (ev *ExpressionVerificationAccessorImpl) removeExpressions(inputConv *inter
}
return convCopy, nil
}

func (ddlv *DDLVerifierImpl) VerifySpannerDDL(conv *internal.Conv, expressionDetails []internal.ExpressionDetail) (internal.VerifyExpressionsOutput, error) {
ctx := context.Background()
verifyExpressionsInput := internal.VerifyExpressionsInput{
Conv: conv,
Source: conv.Source,
ExpressionDetailList: expressionDetails,
}
verificationResults := ddlv.Expressions.VerifyExpressions(ctx, verifyExpressionsInput)

return verificationResults, verificationResults.Err
}

func (ddlv *DDLVerifierImpl) GetSourceExpressionDetails(conv *internal.Conv, tableIds []string) []internal.ExpressionDetail {
expressionDetails := []internal.ExpressionDetail{}
// Collect default values for verification
for _, tableId := range tableIds {
srcTable := conv.SrcSchema[tableId]
for _, srcColId := range srcTable.ColIds {
srcCol := srcTable.ColDefs[srcColId]
if srcCol.DefaultValue.IsPresent {
defaultValueExp := internal.ExpressionDetail{
ReferenceElement: internal.ReferenceElement{
Name: conv.SpSchema[tableId].ColDefs[srcColId].T.Name,
},
ExpressionId: srcCol.DefaultValue.Value.ExpressionId,
Expression: srcCol.DefaultValue.Value.Statement,
Type: "DEFAULT",
Metadata: map[string]string{"TableId": tableId, "ColId": srcColId},
}
expressionDetails = append(expressionDetails, defaultValueExp)
}
}
}
return expressionDetails
}

func (ddlv *DDLVerifierImpl) GetSpannerExpressionDetails(conv *internal.Conv, tableIds []string) []internal.ExpressionDetail {
expressionDetails := []internal.ExpressionDetail{}
// Collect default values for verification
for _, tableId := range tableIds {
spTable := conv.SpSchema[tableId]
for _, spColId := range spTable.ColIds {
spCol := spTable.ColDefs[spColId]
if spCol.DefaultValue.IsPresent {
defaultValueExp := internal.ExpressionDetail{
ReferenceElement: internal.ReferenceElement{
Name: conv.SpSchema[tableId].ColDefs[spColId].T.Name,
},
ExpressionId: spCol.DefaultValue.Value.ExpressionId,
Expression: spCol.DefaultValue.Value.Statement,
Type: "DEFAULT",
Metadata: map[string]string{"TableId": tableId, "ColId": spColId},
}
expressionDetails = append(expressionDetails, defaultValueExp)
}
}
}
return expressionDetails
}

func (ddlv *DDLVerifierImpl) RefreshSpannerClient(ctx context.Context, project string, instance string) error {
return ddlv.Expressions.RefreshSpannerClient(ctx, project, instance)
}
Loading

0 comments on commit 4606e74

Please sign in to comment.