Skip to content

Commit

Permalink
make snapshot read async
Browse files Browse the repository at this point in the history
If ISShardedSnapshot is true we now:

1) Create copy of all tables that will be transfered

2) Get all partition key ranges from these tables

3) Send these partitions asynchronously to target
commit_hash:ce2c966d9d2b06eb5a4338a5cc47b850333cde07
  • Loading branch information
KosovGrigorii committed Dec 25, 2024
1 parent 0be0bf1 commit b0f9613
Show file tree
Hide file tree
Showing 15 changed files with 436 additions and 48 deletions.
3 changes: 3 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,8 @@
"pkg/providers/ydb/storage.go":"transfer_manager/go/pkg/providers/ydb/storage.go",
"pkg/providers/ydb/storage_incremental.go":"transfer_manager/go/pkg/providers/ydb/storage_incremental.go",
"pkg/providers/ydb/storage_sampleable.go":"transfer_manager/go/pkg/providers/ydb/storage_sampleable.go",
"pkg/providers/ydb/storage_sharded.go":"transfer_manager/go/pkg/providers/ydb/storage_sharded.go",
"pkg/providers/ydb/storage_sharded_test.go":"transfer_manager/go/pkg/providers/ydb/storage_sharded_test.go",
"pkg/providers/ydb/storage_test.go":"transfer_manager/go/pkg/providers/ydb/storage_test.go",
"pkg/providers/ydb/tasks_cleanup_test.go":"transfer_manager/go/pkg/providers/ydb/tasks_cleanup_test.go",
"pkg/providers/ydb/typesystem.go":"transfer_manager/go/pkg/providers/ydb/typesystem.go",
Expand Down Expand Up @@ -2864,6 +2866,7 @@
"tests/e2e/ydb2ydb/debezium/snapshot_serde_via_debezium_embedded/check_db_test.go":"transfer_manager/go/tests/e2e/ydb2ydb/debezium/snapshot_serde_via_debezium_embedded/check_db_test.go",
"tests/e2e/ydb2ydb/debezium/snapshot_serde_via_debezium_embedded_nulls/check_db_test.go":"transfer_manager/go/tests/e2e/ydb2ydb/debezium/snapshot_serde_via_debezium_embedded_nulls/check_db_test.go",
"tests/e2e/ydb2ydb/debezium/snapshot_serde_via_debezium_embedded_olap/check_db_test.go":"transfer_manager/go/tests/e2e/ydb2ydb/debezium/snapshot_serde_via_debezium_embedded_olap/check_db_test.go",
"tests/e2e/ydb2ydb/sharded_snapshot/check_db_test.go":"transfer_manager/go/tests/e2e/ydb2ydb/sharded_snapshot/check_db_test.go",
"tests/e2e/ydb2ydb/snapshot/check_db_test.go":"transfer_manager/go/tests/e2e/ydb2ydb/snapshot/check_db_test.go",
"tests/e2e/ydb2ydb/snapshot_and_replication/canondata/result.json":"transfer_manager/go/tests/e2e/ydb2ydb/snapshot_and_replication/canondata/result.json",
"tests/e2e/ydb2ydb/snapshot_and_replication/check_db_test.go":"transfer_manager/go/tests/e2e/ydb2ydb/snapshot_and_replication/check_db_test.go",
Expand Down
2 changes: 2 additions & 0 deletions pkg/providers/ydb/model_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,7 @@ func (d *YdbDestination) ToStorageParams() *YdbStorageParams {
OAuth2Config: d.OAuth2Config,
RootCAFiles: d.RootCAFiles,
TLSEnabled: false,
IsSnapshotSharded: false,
CopyFolder: "",
}
}
6 changes: 6 additions & 0 deletions pkg/providers/ydb/model_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type YdbSource struct {
TokenServiceURL string
SAKeyContent string
OAuth2Config *v3credential.OAuth2Config

// storage
IsSnapshotSharded bool
CopyFolder string
}

var _ model.Source = (*YdbSource)(nil)
Expand Down Expand Up @@ -197,6 +201,8 @@ func (s *YdbSource) ToStorageParams() *YdbStorageParams {
OAuth2Config: s.OAuth2Config,
RootCAFiles: s.RootCAFiles,
TLSEnabled: s.TLSEnabled,
IsSnapshotSharded: s.IsSnapshotSharded,
CopyFolder: s.CopyFolder,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/providers/ydb/model_storage_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ type YdbStorageParams struct {

RootCAFiles []string
TLSEnabled bool

IsSnapshotSharded bool
CopyFolder string
}
2 changes: 1 addition & 1 deletion pkg/providers/ydb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Provider) Storage() (abstract.Storage, error) {
return nil, xerrors.Errorf("unexpected target type: %T", p.transfer.Dst)
}
p.fillIncludedTables(src)
return NewStorage(src.ToStorageParams())
return NewStorage(src.ToStorageParams(), p.registry)
}

func (p *Provider) fillIncludedTables(src *YdbSource) {
Expand Down
32 changes: 8 additions & 24 deletions pkg/providers/ydb/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,36 +826,20 @@ func (s *sinker) insert(tablePath ydbPath, batch []abstract.ChangeItem) error {
return nil
}

err := s.db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) {
tableFullPath := s.getFullPath(tablePath)
err = session.BulkUpsert(ctx, tableFullPath, batchList)
if err != nil {
s.logger.Warn("unable to upload rows", log.Error(err), log.String("table", tableFullPath))
if s.config.IgnoreRowTooLargeErrors && rowTooLargeRegexp.MatchString(err.Error()) {
s.logger.Warn("ignoring row too large error as per IgnoreRowTooLargeErrors option")
return nil
}
return xerrors.Errorf("unable to bulk upsert table %v: %w", tableFullPath, err)
tableFullPath := s.getFullPath(tablePath)
bulkUpsertBatch := table.BulkUpsertDataRows(batchList)
if err := s.db.Table().BulkUpsert(ctx, tableFullPath, bulkUpsertBatch); err != nil {
s.logger.Warn("unable to upload rows", log.Error(err), log.String("table", tableFullPath))
if s.config.IgnoreRowTooLargeErrors && rowTooLargeRegexp.MatchString(err.Error()) {
s.logger.Warn("ignoring row too large error as per IgnoreRowTooLargeErrors option")
return nil
}
return nil
})

if err != nil {
return xerrors.Errorf("unable to bulk upsert:\n %w", err)
return xerrors.Errorf("unable to bulk upsert table %v: %w", tableFullPath, err)
}

return nil
}

// timeToTimestamp converts time to YDB-timestamp in microseconds
func timeToTimestamp(dt time.Time) uint64 {
mcs := dt.UTC().UnixNano() / 1000
if mcs < 0 {
return 0
}
return uint64(mcs)
}

func (s *sinker) fitTime(t time.Time) (time.Time, error) {
if t.Sub(time.Unix(0, 0)) < 0 {
if s.config.FitDatetime {
Expand Down
48 changes: 34 additions & 14 deletions pkg/providers/ydb/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/metrics"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/slices"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
"github.com/doublecloud/transfer/pkg/stats"
"github.com/doublecloud/transfer/pkg/util"
"github.com/doublecloud/transfer/pkg/util/jsonx"
"github.com/doublecloud/transfer/pkg/xtls"
Expand All @@ -31,11 +33,12 @@ import (
)

type Storage struct {
config *YdbStorageParams
db *ydb.Driver
config *YdbStorageParams
db *ydb.Driver
metrics *stats.SourceStats
}

func NewStorage(cfg *YdbStorageParams) (*Storage, error) {
func NewStorage(cfg *YdbStorageParams, mtrcs metrics.Registry) (*Storage, error) {
var err error
var tlsConfig *tls.Config
if cfg.TLSEnabled {
Expand Down Expand Up @@ -69,8 +72,9 @@ func NewStorage(cfg *YdbStorageParams) (*Storage, error) {
}

return &Storage{
config: cfg,
db: ydbDriver,
config: cfg,
db: ydbDriver,
metrics: stats.NewSourceStats(mtrcs),
}, nil
}

Expand Down Expand Up @@ -129,12 +133,7 @@ func validateTableList(params *YdbStorageParams, paths []string) error {
return nil
}

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*3)
defer cancel()

// collect tables entries

func (s *Storage) listaAllTablesToTransfer(ctx context.Context) ([]string, error) {
allTables := []string{}
if len(s.config.Tables) == 0 {
result, err := s.traverse("/")
Expand Down Expand Up @@ -169,7 +168,21 @@ func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstr
allTables = slices.Map(allTables, func(from string) string {
return strings.TrimLeft(from, "/")
})
err := validateTableList(s.config, allTables)
return allTables, nil
}

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*3)
defer cancel()

// collect tables entries

allTables, err := s.listaAllTablesToTransfer(ctx)
if err != nil {
return nil, xerrors.Errorf("Failed to list tables that will be transfered: %w", err)
}

err = validateTableList(s.config, allTables)
if err != nil {
return nil, xerrors.Errorf("vaildation of TableList failed: %w", err)
}
Expand Down Expand Up @@ -202,18 +215,23 @@ func (s *Storage) TableSchema(ctx context.Context, tableID abstract.TableID) (*a
func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescription, pusher abstract.Pusher) error {
st := util.GetTimestampFromContextOrNow(ctx)

tablePath := path.Join(s.config.Database, tableDescr.Schema, tableDescr.Name)
tablePath := s.makeTablePath(tableDescr.Schema, tableDescr.Name)
partID := tableDescr.PartID()

var res result.StreamResult
var schema *abstract.TableSchema

err := s.db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) {
readTableOptions := []options.ReadTableOption{options.ReadOrdered()}
tableDescription, err := session.DescribeTable(ctx, tablePath)

tableDescription, err := session.DescribeTable(ctx, tablePath, options.WithShardKeyBounds())
if err != nil {
return xerrors.Errorf("unable to describe table: %w", err)
}
if s.config.IsSnapshotSharded {
keyRange := tableDescription.KeyRanges[tableDescr.Offset]
readTableOptions = append(readTableOptions, options.ReadKeyRange(keyRange))
}

tableColumns, err := filterYdbTableColumns(s.config.TableColumnsFilter, tableDescription)
if err != nil {
Expand Down Expand Up @@ -298,6 +316,8 @@ func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescri
Query: "",
Size: abstract.RawEventSize(util.DeepSizeof(vals)),
})
s.metrics.ChangeItems.Inc()
s.metrics.Size.Add(int64(changes[len(changes)-1].Size.Read))
if wrapAroundIdx == 10000 {
if err := pusher(changes); err != nil {
return xerrors.Errorf("unable to push: %w", err)
Expand Down
127 changes: 127 additions & 0 deletions pkg/providers/ydb/storage_sharded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ydb

import (
"context"
"path"
"strings"

"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
)

const defaultCopyFolder = "data-transfer"

func (s *Storage) modifyTableName(tablePath string) string {
return strings.ReplaceAll(tablePath, "/", "_")
}

func (s *Storage) BeginSnapshot(ctx context.Context) error {
if !s.config.IsSnapshotSharded {
return nil
}
if s.config.CopyFolder == "" {
s.config.CopyFolder = defaultCopyFolder
}

tables, err := s.listaAllTablesToTransfer(ctx)
if err != nil {
return xerrors.Errorf("Failed to list tables that will be transfered: %w", err)
}

if err := s.db.Scheme().MakeDirectory(ctx, s.makeTableDir()); err != nil {
return xerrors.Errorf("failed to create copy directory: %w", err)
}

copyItems := make([]options.CopyTablesOption, len(tables))
for i, tableName := range tables {
tablePath := path.Join(s.config.Database, tableName)
copyPath := s.makeTablePath("", s.modifyTableName(tableName))
copyItems[i] = options.CopyTablesItem(tablePath, copyPath, false)
}
return s.db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) {
err = session.CopyTables(ctx, copyItems...)
if err != nil {
return xerrors.Errorf("failed to copy tables to transfer directory: %w", err)
}
return nil
})
}

func (s *Storage) EndSnapshot(ctx context.Context) error {
if !s.config.IsSnapshotSharded {
return nil
}

copyDir := s.makeTableDir()
content, err := s.db.Scheme().ListDirectory(ctx, copyDir)
if err != nil {
return xerrors.Errorf("failed to list copy directory: %w", err)
}

err = s.db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) {
for _, copyTable := range content.Children {
copyPath := s.makeTablePath("", copyTable.Name)
if copyTable.Type != scheme.EntryTable && copyTable.Type != scheme.EntryColumnTable {
return xerrors.Errorf("only tables must be present in copy directory, found %v", copyPath)
}
if err = session.DropTable(ctx, copyPath); err != nil {
return xerrors.Errorf("failed to drop copied table %v from transfer directory: %w", copyPath, err)
}
}
return nil
})
if err != nil {
return xerrors.Errorf("failed to drop copied tables: %w", err)
}

if err = s.db.Scheme().RemoveDirectory(ctx, copyDir); err != nil {
return xerrors.Errorf("failed to remove copy directory: %w", err)
}
return nil
}

func (s *Storage) ShardTable(ctx context.Context, tableDesc abstract.TableDescription) ([]abstract.TableDescription, error) {
if !s.config.IsSnapshotSharded {
return []abstract.TableDescription{tableDesc}, nil
}

copyPath := s.makeTablePath(tableDesc.Schema, tableDesc.Name)
var result []abstract.TableDescription
err := s.db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) {
tableDescription, err := session.DescribeTable(ctx, copyPath, options.WithShardKeyBounds())
if err != nil {
return xerrors.Errorf("unable to describe table: %w", err)
}

result = make([]abstract.TableDescription, len(tableDescription.KeyRanges))
for i := range tableDescription.KeyRanges {
result[i] = tableDesc
result[i].Offset = uint64(i)
}
return nil
})

if err != nil {
return nil, xerrors.Errorf("unable to schard table %v : %w", copyPath, err)
}

return result, nil
}

func (s *Storage) makeTablePath(schema, name string) string {
tableDir := s.makeTableDir()
if !s.config.IsSnapshotSharded {
return path.Join(tableDir, schema, name)
}
return path.Join(tableDir, schema, s.modifyTableName(name))
}

func (s *Storage) makeTableDir() string {
if !s.config.IsSnapshotSharded {
return s.config.Database
}
return path.Join(s.config.Database, s.config.CopyFolder)
}
Loading

0 comments on commit b0f9613

Please sign in to comment.