Skip to content

Commit

Permalink
sync2: eliminate unneeded DB accesses when peers are in sync (#6576)
Browse files Browse the repository at this point in the history
## Motivation

`FPTree`, and thus `DBSet` hold enough information in memory for peers to decide whether they are in sync or not. In fully synced state, the peers only exchange probe requests that cover the whole set, and we only need to know fingerprint and count of the set. For that, no database queries are necessary.
  • Loading branch information
ivan4th committed Jan 3, 2025
1 parent 865d114 commit c8752c5
Show file tree
Hide file tree
Showing 16 changed files with 548 additions and 347 deletions.
40 changes: 20 additions & 20 deletions sync2/dbset/dbset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dbset

import (
"context"
"errors"
"fmt"
"maps"
"sync"
Expand Down Expand Up @@ -118,16 +119,12 @@ func (d *DBSet) Receive(k rangesync.KeyBytes) error {
return nil
}

func (d *DBSet) firstItem() (rangesync.KeyBytes, error) {
if err := d.EnsureLoaded(); err != nil {
return nil, err
}
return d.ft.All().First()
}

// GetRangeInfo returns information about the range of items in the DBSet.
// RangeInfo returns information about the range of items in the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) GetRangeInfo(x, y rangesync.KeyBytes) (rangesync.RangeInfo, error) {
func (d *DBSet) RangeInfo(x, y rangesync.KeyBytes) (rangesync.RangeInfo, error) {
if x == nil || y == nil {
return rangesync.RangeInfo{}, errors.New("bad range")
}
if err := d.EnsureLoaded(); err != nil {
return rangesync.RangeInfo{}, err
}
Expand All @@ -136,17 +133,6 @@ func (d *DBSet) GetRangeInfo(x, y rangesync.KeyBytes) (rangesync.RangeInfo, erro
Items: rangesync.EmptySeqResult(),
}, nil
}
if x == nil || y == nil {
if x != nil || y != nil {
panic("BUG: GetRangeInfo called with one of x/y nil but not both")
}
var err error
x, err = d.firstItem()
if err != nil {
return rangesync.RangeInfo{}, fmt.Errorf("getting first item: %w", err)
}
y = x
}
fpr, err := d.ft.FingerprintInterval(x, y, -1)
if err != nil {
return rangesync.RangeInfo{}, err
Expand Down Expand Up @@ -192,6 +178,20 @@ func (d *DBSet) SplitRange(x, y rangesync.KeyBytes, count int) (rangesync.SplitI
}, nil
}

// SetInfo returns RangeInfo for the whole DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) SetInfo() (rangesync.RangeInfo, error) {
if err := d.EnsureLoaded(); err != nil {
return rangesync.RangeInfo{}, err
}
fpr := d.ft.FingerprintAll()
return rangesync.RangeInfo{
Fingerprint: fpr.FP,
Count: int(fpr.Count),
Items: fpr.Items,
}, nil
}

// Items returns a sequence of all items in the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) Items() rangesync.SeqResult {
Expand Down
214 changes: 123 additions & 91 deletions sync2/dbset/dbset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ func TestDBSet_Empty(t *testing.T) {
requireEmpty(t, s.Items())
requireEmpty(t, s.Received())

info, err := s.GetRangeInfo(nil, nil)
info, err := s.SetInfo()
require.NoError(t, err)
require.Equal(t, 0, info.Count)
require.Equal(t, "000000000000000000000000", info.Fingerprint.String())
requireEmpty(t, info.Items)

info, err = s.GetRangeInfo(
info, err = s.RangeInfo(
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"))
require.NoError(t, err)
require.Equal(t, 0, info.Count)
require.Equal(t, "000000000000000000000000", info.Fingerprint.String())
requireEmpty(t, info.Items)

info, err = s.GetRangeInfo(
info, err = s.RangeInfo(
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("9999000000000000000000000000000000000000000000000000000000000000"))
require.NoError(t, err)
Expand All @@ -69,7 +69,7 @@ func TestDBSet_Empty(t *testing.T) {

func TestDBSet(t *testing.T) {
ids := []rangesync.KeyBytes{
rangesync.MustParseHexKeyBytes("0000000000000000000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("1111111111111111111111111111111111111111111111111111111111111111"),
rangesync.MustParseHexKeyBytes("123456789abcdef0000000000000000000000000000000000000000000000000"),
rangesync.MustParseHexKeyBytes("5555555555555555555555555555555555555555555555555555555555555555"),
rangesync.MustParseHexKeyBytes("8888888888888888888888888888888888888888888888888888888888888888"),
Expand All @@ -81,94 +81,126 @@ func TestDBSet(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
require.Equal(t, "1111111111111111111111111111111111111111111111111111111111111111",
firstKey(t, s.Items()).String())
has, err := s.Has(
rangesync.MustParseHexKeyBytes("9876000000000000000000000000000000000000000000000000000000000000"))
require.NoError(t, err)
require.False(t, has)

for _, tc := range []struct {
xIdx, yIdx int
limit int
fp string
count int
startIdx, endIdx int
}{
{
xIdx: 1,
yIdx: 1,
limit: -1,
fp: "642464b773377bbddddddddd",
count: 5,
startIdx: 1,
endIdx: 1,
},
{
xIdx: -1,
yIdx: -1,
limit: -1,
fp: "642464b773377bbddddddddd",
count: 5,
startIdx: 0,
endIdx: 0,
},
{
xIdx: 0,
yIdx: 3,
limit: -1,
fp: "4761032dcfe98ba555555555",
count: 3,
startIdx: 0,
endIdx: 3,
},
{
xIdx: 2,
yIdx: 0,
limit: -1,
fp: "761032cfe98ba54ddddddddd",
count: 3,
startIdx: 2,
endIdx: 0,
},
{
xIdx: 3,
yIdx: 2,
limit: 3,
fp: "2345679abcdef01888888888",
count: 3,
startIdx: 3,
endIdx: 1,
},
} {
name := fmt.Sprintf("%d-%d_%d", tc.xIdx, tc.yIdx, tc.limit)
t.Run(name, func(t *testing.T) {
var x, y rangesync.KeyBytes
if tc.xIdx >= 0 {
t.Run("RangeInfo", func(t *testing.T) {
for _, tc := range []struct {
xIdx, yIdx int
fp string
count int
startIdx, endIdx int
}{
{
xIdx: 1,
yIdx: 1,
fp: "753575a662266aaccccccccc",
count: 5,
startIdx: 1,
endIdx: 1,
},
{
xIdx: 0,
yIdx: 3,
fp: "5670123cdef89ab444444444",
count: 3,
startIdx: 0,
endIdx: 3,
},
{
xIdx: 2,
yIdx: 0,
fp: "761032cfe98ba54ddddddddd",
count: 3,
startIdx: 2,
endIdx: 0,
},
} {
name := fmt.Sprintf("%d-%d", tc.xIdx, tc.yIdx)
t.Run(name, func(t *testing.T) {
var x, y rangesync.KeyBytes
x = ids[tc.xIdx]
y = ids[tc.yIdx]
}
t.Logf("x %v y %v limit %d", x, y, tc.limit)
var info rangesync.RangeInfo
if tc.limit < 0 {
info, err = s.GetRangeInfo(x, y)
t.Logf("x %v y %v", x, y)
var info rangesync.RangeInfo
info, err = s.RangeInfo(x, y)
require.NoError(t, err)
require.Equal(t, tc.count, info.Count)
require.Equal(t, tc.fp, info.Fingerprint.String())
require.Equal(t, ids[tc.startIdx], firstKey(t, info.Items))
has, err := s.Has(ids[tc.startIdx])
require.NoError(t, err)
} else {
require.True(t, has)
has, err = s.Has(ids[tc.endIdx])
require.NoError(t, err)
require.True(t, has)
})
}
})

t.Run("SplitRange", func(t *testing.T) {
for _, tc := range []struct {
xIdx, yIdx int
limit int
fp string
count int
startIdx, endIdx int
}{
{
xIdx: 3,
yIdx: 2,
limit: 3,
fp: "3254768badcfe10999999999",
count: 3,
startIdx: 3,
endIdx: 1,
},
{
xIdx: 3,
yIdx: 3,
limit: 2,
fp: "2345679abcdef01888888888",
count: 2,
startIdx: 3,
endIdx: 3,
},
} {
name := fmt.Sprintf("%d-%d_%d", tc.xIdx, tc.yIdx, tc.limit)
t.Run(name, func(t *testing.T) {
var x, y rangesync.KeyBytes
x = ids[tc.xIdx]
y = ids[tc.yIdx]
t.Logf("x %v y %v limit %d", x, y, tc.limit)
var info rangesync.RangeInfo
sr, err := s.SplitRange(x, y, tc.limit)
require.NoError(t, err)
info = sr.Parts[0]
}
require.Equal(t, tc.count, info.Count)
require.Equal(t, tc.fp, info.Fingerprint.String())
require.Equal(t, ids[tc.startIdx], firstKey(t, info.Items))
has, err := s.Has(ids[tc.startIdx])
require.NoError(t, err)
require.True(t, has)
has, err = s.Has(ids[tc.endIdx])
require.NoError(t, err)
require.True(t, has)
})
}
require.Equal(t, tc.count, info.Count)
require.Equal(t, tc.fp, info.Fingerprint.String())
require.Equal(t, ids[tc.startIdx], firstKey(t, info.Items))
has, err := s.Has(ids[tc.startIdx])
require.NoError(t, err)
require.True(t, has)
has, err = s.Has(ids[tc.endIdx])
require.NoError(t, err)
require.True(t, has)
})
}
})

t.Run("SetInfo", func(t *testing.T) {
info, err := s.SetInfo()
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "753575a662266aaccccccccc", info.Fingerprint.String())
items, err := info.Items.Collect()
require.NoError(t, err)
require.Equal(t, ids, items)
})
}

func TestDBSet_Receive(t *testing.T) {
Expand All @@ -195,7 +227,7 @@ func TestDBSet_Receive(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []rangesync.KeyBytes{newID}, items)

info, err := s.GetRangeInfo(ids[2], ids[0])
info, err := s.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
Expand All @@ -218,7 +250,7 @@ func TestDBSet_Copy(t *testing.T) {
firstKey(t, s.Items()).String())

require.NoError(t, s.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[2], ids[0])
info, err := copy.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
Expand All @@ -228,15 +260,15 @@ func TestDBSet_Copy(t *testing.T) {
"abcdef1234567890000000000000000000000000000000000000000000000000")
require.NoError(t, copy.Receive(newID))

info, err = s.GetRangeInfo(ids[2], ids[0])
info, err = s.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

requireEmpty(t, s.Received())

info, err = s.GetRangeInfo(ids[2], ids[0])
info, err = s.RangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
Expand Down Expand Up @@ -267,13 +299,13 @@ func TestDBItemStore_Advance(t *testing.T) {
require.NoError(t, os.EnsureLoaded())

require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := os.GetRangeInfo(ids[0], ids[0])
info, err := os.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
info, err = copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
Expand All @@ -284,27 +316,27 @@ func TestDBItemStore_Advance(t *testing.T) {
"abcdef1234567890000000000000000000000000000000000000000000000000"),
})

info, err = os.GetRangeInfo(ids[0], ids[0])
info, err = os.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
info, err = copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

require.NoError(t, os.Advance())

info, err = os.GetRangeInfo(ids[0], ids[0])
info, err = os.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
info, err = copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
Expand All @@ -314,7 +346,7 @@ func TestDBItemStore_Advance(t *testing.T) {
}))

require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[0], ids[0])
info, err := copy.RangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
Expand Down
Loading

0 comments on commit c8752c5

Please sign in to comment.