From 1b7156070248bd12e00f7140aaafd0ff8ded1e74 Mon Sep 17 00:00:00 2001 From: Vivek Yadav Date: Fri, 20 Dec 2024 19:13:53 +0530 Subject: [PATCH] added dump flow --- conversion/conversion.go | 2 +- conversion/conversion_from_source.go | 11 ++++++++++- sources/mysql/mysqldump.go | 26 +++++++++++++------------- webv2/api/schema.go | 4 ++-- webv2/routes.go | 2 +- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/conversion/conversion.go b/conversion/conversion.go index 2ed254405..fa740c0c8 100644 --- a/conversion/conversion.go +++ b/conversion/conversion.go @@ -80,7 +80,7 @@ func (ci *ConvImpl) SchemaConv(migrationProjectId string, sourceProfile profiles case constants.POSTGRES, constants.MYSQL, constants.DYNAMODB, constants.SQLSERVER, constants.ORACLE: return schemaFromSource.schemaFromDatabase(migrationProjectId, sourceProfile, targetProfile, &GetInfoImpl{}, &common.ProcessSchemaImpl{}) case constants.PGDUMP, constants.MYSQLDUMP: - expressionVerificationAccessor,_ := expressions_api.NewExpressionVerificationAccessorImpl(context.Background(), targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance) + expressionVerificationAccessor, _ := expressions_api.NewExpressionVerificationAccessorImpl(context.Background(), targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance) return schemaFromSource.SchemaFromDump(sourceProfile.Driver, targetProfile.Conn.Sp.Dialect, ioHelper, &ProcessDumpByDialectImpl{expressionVerificationAccessor}) default: return nil, fmt.Errorf("schema conversion for driver %s not supported", sourceProfile.Driver) diff --git a/conversion/conversion_from_source.go b/conversion/conversion_from_source.go index 0290af537..321b4e4a7 100644 --- a/conversion/conversion_from_source.go +++ b/conversion/conversion_from_source.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils" + "github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/logger" "github.com/GoogleCloudPlatform/spanner-migration-tool/profiles" @@ -54,6 +55,9 @@ type DataFromSourceImpl struct{} func (sads *SchemaFromSourceImpl) schemaFromDatabase(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error) { conv := internal.MakeConv() conv.SpDialect = targetProfile.Conn.Sp.Dialect + conv.SpProjectId = targetProfile.Conn.Sp.Project + conv.SpInstanceId = targetProfile.Conn.Sp.Instance + conv.Source = sourceProfile.Driver //handle fetching schema differently for sharded migrations, we only connect to the primary shard to //fetch the schema. We reuse the SourceProfileConnection object for this purpose. var infoSchema common.InfoSchema @@ -96,7 +100,11 @@ func (sads *SchemaFromSourceImpl) schemaFromDatabase(migrationProjectId string, additionalSchemaAttributes := internal.AdditionalSchemaAttributes{ IsSharded: isSharded, } - return conv, processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, additionalSchemaAttributes, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) + + ctx := context.Background() + expressionVerificationAccessor, _ := expressions_api.NewExpressionVerificationAccessorImpl(ctx, conv.SpProjectId, conv.SpInstanceId) + + return conv, processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, additionalSchemaAttributes, &common.SchemaToSpannerImpl{expressionVerificationAccessor}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{}) } func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) { @@ -109,6 +117,7 @@ func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string ioHelper.BytesRead = n conv := internal.MakeConv() conv.SpDialect = spDialect + conv.Source = driver p := internal.NewProgress(n, "Generating schema", internal.Verbose(), false, int(internal.SchemaCreationInProgress)) r := internal.NewReader(bufio.NewReader(f), p) conv.SetSchemaMode() // Build schema and ignore data in dump. diff --git a/sources/mysql/mysqldump.go b/sources/mysql/mysqldump.go index 65c477975..be78712db 100644 --- a/sources/mysql/mysqldump.go +++ b/sources/mysql/mysqldump.go @@ -53,7 +53,7 @@ var spatialIndexRegex = regexp.MustCompile("(?i)\\sSPATIAL\\s") var spatialSridRegex = regexp.MustCompile("(?i)\\sSRID\\s\\d*") // DbDumpImpl MySQL specific implementation for DdlDumpImpl. -type DbDumpImpl struct{ +type DbDumpImpl struct { ExpressionVerificationAccessor expressions_api.ExpressionVerificationAccessor } @@ -296,14 +296,14 @@ func processCreateTable(conv *internal.Conv, stmt *ast.CreateTableStmt) { } conv.SchemaStatement(NodeType(stmt)) conv.SrcSchema[tableId] = schema.Table{ - Id: tableId, - Name: tableName, - ColIds: colIds, - ColNameIdMap: colNameIdMap, - ColDefs: colDef, - PrimaryKeys: keys, - ForeignKeys: fkeys, - Indexes: index, + Id: tableId, + Name: tableName, + ColIds: colIds, + ColNameIdMap: colNameIdMap, + ColDefs: colDef, + PrimaryKeys: keys, + ForeignKeys: fkeys, + Indexes: index, CheckConstraints: checkConstraints, } for _, constraint := range stmt.Constraints { @@ -346,10 +346,10 @@ func getCheckConstraints(constraints []*ast.Constraint) (checkConstraints []sche exp := expressionToString(constraint.Expr) exp = dbcollationRegex.ReplaceAllString(exp, "$1") checkConstraint := schema.CheckConstraint{ - Name: constraint.Name, - Expr: exp, - ExprId: internal.GenerateCheckConstrainstExprId (), - Id: internal.GenerateCheckConstrainstId(), + Name: constraint.Name, + Expr: exp, + ExprId: internal.GenerateCheckConstrainstExprId(), + Id: internal.GenerateCheckConstrainstId(), } checkConstraints = append(checkConstraints, checkConstraint) } diff --git a/webv2/api/schema.go b/webv2/api/schema.go index ac188050e..758273317 100644 --- a/webv2/api/schema.go +++ b/webv2/api/schema.go @@ -147,7 +147,7 @@ func (expressionVerificationHandler *ExpressionsVerificationHandler) ConvertSche // ConvertSchemaDump converts schema from dump file to Spanner schema for // mysqldump and pg_dump driver. -func ConvertSchemaDump(w http.ResponseWriter, r *http.Request) { +func (expressionVerificationHandler *ExpressionsVerificationHandler) ConvertSchemaDump(w http.ResponseWriter, r *http.Request) { reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) @@ -169,7 +169,7 @@ func ConvertSchemaDump(w http.ResponseWriter, r *http.Request) { sourceProfile, _ := profiles.NewSourceProfile("", dc.Config.Driver, &n) sourceProfile.Driver = dc.Config.Driver schemaFromSource := conversion.SchemaFromSourceImpl{} - conv, err := schemaFromSource.SchemaFromDump(sourceProfile.Driver, dc.SpannerDetails.Dialect, &utils.IOStreams{In: f, Out: os.Stdout}, &conversion.ProcessDumpByDialectImpl{}) + conv, err := schemaFromSource.SchemaFromDump(sourceProfile.Driver, dc.SpannerDetails.Dialect, &utils.IOStreams{In: f, Out: os.Stdout}, &conversion.ProcessDumpByDialectImpl{ExpressionVerificationAccessor: expressionVerificationHandler.ExpressionVerificationAccessor}) if err != nil { http.Error(w, fmt.Sprintf("Schema Conversion Error : %v", err), http.StatusNotFound) return diff --git a/webv2/routes.go b/webv2/routes.go index dc0d07b0e..9d506de8d 100644 --- a/webv2/routes.go +++ b/webv2/routes.go @@ -62,7 +62,7 @@ func getRoutes() *mux.Router { router.HandleFunc("/connect", databaseConnection).Methods("POST") router.HandleFunc("/convert/infoschema", expressionVerificationHandler.ConvertSchemaSQL).Methods("GET") - router.HandleFunc("/convert/dump", api.ConvertSchemaDump).Methods("POST") + router.HandleFunc("/convert/dump", expressionVerificationHandler.ConvertSchemaDump).Methods("POST") router.HandleFunc("/convert/session", loadSession).Methods("POST") router.HandleFunc("/ddl", api.GetDDL).Methods("GET") router.HandleFunc("/seqDdl", api.GetSequenceDDL).Methods("GET")