Skip to content

Commit

Permalink
feat: improve datanode snapshot creation
Browse files Browse the repository at this point in the history
As of now, the snapshot are created in a sequential and blocking way in the datanode. This means
that while a snapshot is being taken, no block can be processed.

The following approach is made:
- the database is locked with a transaction
- queries are generated
- one by one the query are:
 - executed
 - the result piped into the file system
- finally the lock is released, and later the files are added to ipfs.

The bottle neck here is that the results are being save on the fs as they arrive, which is unecessary
and amount for 95% of the time spent snapshoting (and so blocking anything else).

To prevent this, we keep those results from the database in buffers, and only save them to file via a worker
go routine.

Cache buffer size in datanode snapshot.

Signed-off-by: Jeremy Letang <me@jeremyletang.com>
  • Loading branch information
jeremyletang committed Jul 3, 2024
1 parent 749eb77 commit 50fcf04
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 32 deletions.
3 changes: 2 additions & 1 deletion cmd/data-node/commands/start/node_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
l.snapshotService,
networkHistoryStore,
l.conf.API.Port,
l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo))
l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo),
)
if err != nil {
return fmt.Errorf("failed to create networkHistory service:%w", err)
}
Expand Down
23 changes: 23 additions & 0 deletions datanode/networkhistory/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,26 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c
datanodeGrpcAPIPort: datanodeGrpcAPIPort,
}

go func() {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
s.log.Info("saving network history before before leaving")
// consume all pending files
s.snapshotService.Flush()
s.log.Info("network history saved (maybe)")
return
case <-ticker.C:
s.snapshotService.Flush()
}
}
}()

if cfg.Publish {
var err error

// publish all file which are ready
go func() {
ticker := time.NewTicker(5 * time.Second)
for {
Expand Down Expand Up @@ -182,6 +200,11 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t
}
}

// empty the file worker
d.log.Info("saving network history to disk")
d.snapshotService.Flush()
d.log.Info("network history saved to disk")

if err = d.PublishSegments(ctx); err != nil {
return fmt.Errorf("failed to publish snapshots: %w", err)
}
Expand Down
47 changes: 39 additions & 8 deletions datanode/networkhistory/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestMain(t *testing.M) {
pgLog *bytes.Buffer,
) {
sqlConfig = config
log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString())
log.Infof("DB Connection String: %v", sqlConfig.ConnectionConfig.GetConnectionString())

pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig)
if err != nil {
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestMain(t *testing.M) {
panic(fmt.Errorf("failed to create snapshot: %w", err))
}

waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, snapshotService.Flush)

snapshots = append(snapshots, ss)

Expand All @@ -211,7 +211,7 @@ func TestMain(t *testing.M) {
panic(fmt.Errorf("failed to create snapshot:%w", err))
}

waitForSnapshotToComplete(lastSnapshot)
waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush)
snapshots = append(snapshots, lastSnapshot)
md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory())
if err != nil {
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestMain(t *testing.M) {
panic(fmt.Errorf("failed to create snapshot:%w", err))
}

waitForSnapshotToComplete(lastSnapshot)
waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush)
snapshots = append(snapshots, lastSnapshot)
md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory())
if err != nil {
Expand Down Expand Up @@ -421,6 +421,7 @@ func TestLoadingDataFetchedAsynchronously(t *testing.T) {
require.Equal(t, int64(1000), fetched)

networkhistoryService := setupNetworkHistoryService(ctx, log, snapshotService, networkHistoryStore, snapshotCopyToPath)

segments, err := networkhistoryService.ListAllHistorySegments()
require.NoError(t, err)

Expand Down Expand Up @@ -583,7 +584,7 @@ func TestRestoringNodeThatAlreadyContainsData(t *testing.T) {
ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)

waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, snapshotService.Flush)

md5Hash, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
require.NoError(t, err)
Expand Down Expand Up @@ -888,7 +889,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) {
if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 {
ss, err = service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)
waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, service.Flush)

if lastCommittedBlockHeight == 4000 {
newSnapshotFileHashAt4000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
Expand Down Expand Up @@ -993,7 +994,7 @@ func TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) {
if lastCommittedBlockHeight == 3000 {
ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)
waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, service.Flush)

snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
require.NoError(t, err)
Expand Down Expand Up @@ -1096,7 +1097,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven
if lastCommittedBlockHeight == 3000 {
ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
require.NoError(t, err)
waitForSnapshotToComplete(ss)
waitForSnapshotToComplete2(ss, service.Flush)

snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory())
require.NoError(t, err)
Expand Down Expand Up @@ -1578,6 +1579,36 @@ func waitForSnapshotToComplete(sf segment.Unpublished) {
}
}

func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) {
for {
time.Sleep(10 * time.Millisecond)
// wait for snapshot current file
_, err := os.Stat(sf.UnpublishedSnapshotDataDirectory())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
continue
} else {
panic(err)
}
}

flush()

// wait for snapshot data dump in progress file to be removed

_, err = os.Stat(sf.InProgressFilePath())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
break
} else {
panic(err)
}
} else {
continue
}
}
}

func decompressEventFile() {
sourceFile, err := os.Open(compressedEventsFile)
if err != nil {
Expand Down
107 changes: 107 additions & 0 deletions datanode/networkhistory/snapshot/file_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package snapshot

import (
"bytes"
"fmt"
"os"
"sync"
)

type bufAndPath struct {
buf *bytes.Buffer
isProgressFile bool
path string
}

type FileWorker struct {
mu sync.Mutex
queue []*bufAndPath
}

func NewFileWorker() *FileWorker {
return &FileWorker{
queue: []*bufAndPath{},
}
}

func (fw *FileWorker) Add(buf *bytes.Buffer, path string) {
fw.mu.Lock()
defer fw.mu.Unlock()

fw.queue = append(fw.queue, &bufAndPath{buf, false, path})
}

func (fw *FileWorker) AddLockFile(path string) {
fw.mu.Lock()
defer fw.mu.Unlock()

fw.queue = append(fw.queue, &bufAndPath{nil, true, path})
}

func (fw *FileWorker) peek() (bp *bufAndPath) {
fw.mu.Lock()
defer fw.mu.Unlock()

if len(fw.queue) <= 0 {
return
}

bp, fw.queue = fw.queue[0], fw.queue[1:]

return
}

func (fw *FileWorker) Empty() bool {
fw.mu.Lock()
defer fw.mu.Unlock()

return len(fw.queue) <= 0
}

func (fw *FileWorker) Consume() error {
bp := fw.peek()
if bp == nil {
return nil // nothing to do
}

if bp.isProgressFile {
return fw.removeLockFile(bp.path)
}

return fw.writeSegment(bp)
}

func (fw *FileWorker) removeLockFile(path string) error {
return os.Remove(path)
}

func (fw *FileWorker) writeSegment(bp *bufAndPath) error {
file, err := os.Create(bp.path)
if err != nil {
return fmt.Errorf("failed to create file %s : %w", bp.path, err)
}

defer file.Close()

_, err = bp.buf.WriteTo(file)
if err != nil {
return fmt.Errorf("couldn't writer to file %v : %w", bp.path, err)
}

return nil
}
29 changes: 21 additions & 8 deletions datanode/networkhistory/snapshot/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Service struct {

historyStore HistoryStore

fw *FileWorker
tableSnapshotFileSizesCached map[string]int

createSnapshotLock mutex.CtxMutex
copyToPath string
migrateSchemaUpToVersion func(version int64) error
Expand All @@ -59,14 +62,16 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
}

s := &Service{
log: log,
config: config,
connPool: connPool,
createSnapshotLock: mutex.New(),
copyToPath: snapshotsCopyToPath,
migrateSchemaUpToVersion: migrateDatabaseToVersion,
migrateSchemaDownToVersion: migrateSchemaDownToVersion,
historyStore: historyStore,
log: log,
config: config,
connPool: connPool,
createSnapshotLock: mutex.New(),
copyToPath: snapshotsCopyToPath,
migrateSchemaUpToVersion: migrateDatabaseToVersion,
migrateSchemaDownToVersion: migrateSchemaDownToVersion,
historyStore: historyStore,
fw: NewFileWorker(),
tableSnapshotFileSizesCached: map[string]int{},
}

err = os.MkdirAll(s.copyToPath, fs.ModePerm)
Expand All @@ -77,6 +82,14 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po
return s, nil
}

func (b *Service) Flush() {
for !b.fw.Empty() {
if err := b.fw.Consume(); err != nil {
b.log.Error("failed to write all files to disk", logging.Error(err))
}
}
}

func (b *Service) SnapshotData(ctx context.Context, chainID string, toHeight int64) error {
_, err := b.CreateSnapshotAsynchronously(ctx, chainID, toHeight)
if err != nil {
Expand Down
Loading

0 comments on commit 50fcf04

Please sign in to comment.