Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing GC Using Version Vector #981

Merged
merged 62 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
78754d5
Define versionvector table in mondo/memdb
JOOHOJANG Aug 17, 2024
c4d82aa
Define database.VersionVector
JOOHOJANG Aug 18, 2024
1bd6833
Add Min function to calculate min version vector
JOOHOJANG Aug 18, 2024
c51a727
Replace unnecessary min version vector calculation logic to new
JOOHOJANG Aug 18, 2024
45817d6
Modify SyncClocks/SetClock logic
JOOHOJANG Aug 21, 2024
6c3ef13
Add Max func for version vector
JOOHOJANG Aug 21, 2024
42ff430
Add MinSyncedVersionVector into pack
JOOHOJANG Aug 23, 2024
b414f99
Add concurrent garbage collection TC which includes pushonly mode
JOOHOJANG Aug 23, 2024
af0ee42
Temporarily commented out some of memdb codes to run test
JOOHOJANG Aug 26, 2024
876e40f
Change version vector update logic and function declaration
JOOHOJANG Aug 26, 2024
9df869d
Modify VersionVector `After` method
JOOHOJANG Aug 26, 2024
95cf727
Fix inaccurate concurrent GC implementation
JOOHOJANG Aug 26, 2024
f0a3d98
Implement concurrent GC test includes pushonly mode
JOOHOJANG Aug 26, 2024
7b8729b
Define `EqualToOrAfter` at VersionVector and add todo comments to som…
JOOHOJANG Aug 28, 2024
24b70f8
Add exception handling in VersionVector
JOOHOJANG Aug 28, 2024
a769b73
Add exception handling in Version Vector (mongo)
JOOHOJANG Aug 28, 2024
99403b5
Modify VersionVector update logic when find min synced one
JOOHOJANG Aug 28, 2024
045d4d5
Update TC
JOOHOJANG Aug 28, 2024
35a99bc
Add comments to explain codes
JOOHOJANG Aug 28, 2024
7117137
Modify GC test to reflect current GC algorithm
JOOHOJANG Aug 28, 2024
93a79b9
Modify GC test to reflect current GC algorithm
JOOHOJANG Aug 28, 2024
da6a012
Modify GC test to reflect current GC algorithm
JOOHOJANG Aug 29, 2024
d205834
Add exception handling and comments
JOOHOJANG Aug 29, 2024
fc8bd6c
Add version vector delete logic when detach
JOOHOJANG Sep 1, 2024
2624ed7
Consider the detach case and modify the logic to compute the min vers…
JOOHOJANG Sep 1, 2024
8a2ccd1
Define Filter, Keys methods to Version Vector
JOOHOJANG Sep 1, 2024
7706e58
Add logic which remove detached client's lamport from local version v…
JOOHOJANG Sep 1, 2024
f20f651
Enhance the logic for finding the minimum version vector
JOOHOJANG Sep 1, 2024
f5ef0ef
Modify GC test to reflect current GC algorithm
JOOHOJANG Sep 1, 2024
850d2fe
Add logic which remove detached client's lamport from local version v…
JOOHOJANG Sep 1, 2024
7d76298
Fix version vector test
JOOHOJANG Sep 1, 2024
c8bfc23
Fix latest version vector naming to prevent misunderstanding
JOOHOJANG Sep 2, 2024
738f1cd
Add struct case into GC test
JOOHOJANG Sep 2, 2024
875574b
Sort by keys when calling VersionVector.Marshal
JOOHOJANG Sep 2, 2024
7874a9c
Fix GC Test
JOOHOJANG Sep 2, 2024
58837b3
Remove project_id from tblVersionVector schema
JOOHOJANG Sep 3, 2024
5daf28e
Update updateVersionVector logic and remove todo comment
JOOHOJANG Sep 3, 2024
783eab4
Update find min version vector for memdb
JOOHOJANG Sep 3, 2024
ed1094b
Fix typo
JOOHOJANG Sep 3, 2024
4804a56
Update Protocol
JOOHOJANG Sep 5, 2024
c404564
Set version vector when create pack
JOOHOJANG Sep 5, 2024
0e97e0e
Comment out memdb's version vector related codes
JOOHOJANG Sep 5, 2024
cfe6af3
Update mongo version vector related logics
JOOHOJANG Sep 5, 2024
8d3adda
Update code description and related logics
JOOHOJANG Sep 5, 2024
cae01ac
Update memdb min version vector logic
JOOHOJANG Sep 5, 2024
0e9056a
Fix GC test and apply modified GC algorithm, add comments on each ste…
JOOHOJANG Sep 5, 2024
43976d6
Define version vector test helper to compare version vector at each s…
JOOHOJANG Sep 6, 2024
11d7e60
Let helper check length of vv and actorData
JOOHOJANG Sep 6, 2024
ce058cc
Fix reviewed parts
JOOHOJANG Sep 12, 2024
ada30e0
Add TC for test detached client's version remains in db
JOOHOJANG Sep 24, 2024
e34e403
Fix typo
JOOHOJANG Sep 24, 2024
f0d7096
Update server/backend/database/memory/database.go
hackerwins Sep 25, 2024
58a95fd
Update server/backend/database/memory/database.go
hackerwins Sep 25, 2024
00e9b0c
Follow db convention
JOOHOJANG Sep 25, 2024
b4cdd34
Modify version vector deletion logic not to remove detached client's …
JOOHOJANG Sep 25, 2024
ca5b137
Modify version vector related logics to use ActorID rather and actorID
JOOHOJANG Sep 25, 2024
7828d44
Update memory db version vector logics
JOOHOJANG Sep 25, 2024
91abc91
Rename actorIDs to activeActorID
JOOHOJANG Sep 25, 2024
a00128d
Revise the codes
hackerwins Sep 25, 2024
cce22d7
Modify GC Test to handle error assertion in single line of code and m…
JOOHOJANG Sep 25, 2024
059fff1
Update server/backend/database/mongo/indexes.go
hackerwins Sep 26, 2024
d8f7250
Revise the codes
hackerwins Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,25 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

versionVector, err := FromVersionVector(pbPack.VersionVector)
if err != nil {
return nil, err
}

pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
MinSyncedTicket: minSyncedTicket,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
}

if pbPack.MinSyncedVersionVector != nil {
pack.MinSyncedVersionVector, err = FromVersionVector(pbPack.MinSyncedVersionVector)
if err != nil {
return nil, err
}
}

if pbPack.Snapshot != nil {
Expand Down
6 changes: 6 additions & 0 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,18 @@ func ToChangePack(pack *change.Pack) (*api.ChangePack, error) {
return nil, err
}

pbVersionVector, err := ToVersionVector(pack.VersionVector)
if err != nil {
return nil, err
}

return &api.ChangePack{
DocumentKey: pack.DocumentKey.String(),
Checkpoint: ToCheckpoint(pack.Checkpoint),
Changes: pbChanges,
Snapshot: pack.Snapshot,
MinSyncedTicket: ToTimeTicket(pack.MinSyncedTicket),
VersionVector: pbVersionVector,
IsRemoved: pack.IsRemoved,
}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion api/docs/yorkie/v1/resources.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ components:
minSyncedTicket:
$ref: '#/components/schemas/yorkie.v1.TimeTicket'
additionalProperties: false
description: ""
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
Expand All @@ -280,6 +280,12 @@ components:
description: ""
title: snapshot_version_vector
type: object
versionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: version_vector
type: object
title: ChangePack
type: object
yorkie.v1.Checkpoint:
Expand Down
8 changes: 7 additions & 1 deletion api/docs/yorkie/v1/yorkie.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ components:
minSyncedTicket:
$ref: '#/components/schemas/yorkie.v1.TimeTicket'
additionalProperties: false
description: ""
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
Expand All @@ -488,6 +488,12 @@ components:
description: ""
title: snapshot_version_vector
type: object
versionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: version_vector
type: object
title: ChangePack
type: object
yorkie.v1.Checkpoint:
Expand Down
1,569 changes: 791 additions & 778 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ message ChangePack {
TimeTicket min_synced_ticket = 5; // Deprecated
VersionVector min_synced_version_vector = 8;
bool is_removed = 6;
VersionVector version_vector = 9;
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
}

message Change {
Expand Down
2 changes: 1 addition & 1 deletion cmd/yorkie/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func Preload(_ *cobra.Command, _ []string) error {
}

if err := viper.ReadInConfig(); err != nil {
return fmt.Errorf("failed to read in config: %w", err)
return fmt.Errorf("read in config: %w", err)
}
return nil
}
24 changes: 13 additions & 11 deletions pkg/document/change/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func (id ID) NewTimeTicket(delimiter uint32) *time.Ticket {
func (id ID) SyncClocks(other ID) ID {
lamport := id.lamport + 1
if id.lamport < other.lamport {
lamport = other.lamport
lamport = other.lamport + 1
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector)
newID.versionVector.Set(other.actorID, other.lamport)
newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(other.versionVector))
newID.versionVector.Set(id.actorID, lamport)
return newID
}

Expand All @@ -114,16 +114,18 @@ func (id ID) SyncClocks(other ID) ID {
func (id ID) SetClocks(otherLamport int64, vector time.VersionVector) ID {
lamport := id.lamport + 1
if id.lamport < otherLamport {
lamport = otherLamport
lamport = otherLamport + 1
}

return NewID(
id.clientSeq,
id.serverSeq,
lamport,
id.actorID,
vector,
)
newID := NewID(id.clientSeq, id.serverSeq, lamport, id.actorID, id.versionVector.Max(vector))
newID.versionVector.Set(id.actorID, lamport)

return newID
}

// SetVersionVector sets version vector
func (id ID) SetVersionVector(vector time.VersionVector) ID {
return NewID(id.clientSeq, id.serverSeq, id.lamport, id.actorID, vector)
}

// SetActor sets actorID.
Expand Down
13 changes: 9 additions & 4 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Pack struct {
// Snapshot is a byte array that encode the document.
Snapshot []byte

// VersionVector is the version vector of the document.
VersionVector time.VersionVector

// SnapshotVersionVector is the version vector of the snapshot if it exists.
SnapshotVersionVector time.VersionVector

Expand All @@ -55,13 +58,15 @@ func NewPack(
key key.Key,
cp Checkpoint,
changes []*Change,
versionVector time.VersionVector,
snapshot []byte,
) *Pack {
return &Pack{
DocumentKey: key,
Checkpoint: cp,
Changes: changes,
Snapshot: snapshot,
DocumentKey: key,
Checkpoint: cp,
Changes: changes,
VersionVector: versionVector,
Snapshot: snapshot,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/document/crdt/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (r *Root) GarbageCollect(vector time.VersionVector) (int, error) {
count := 0

for _, pair := range r.gcElementPairMap {
if vector.After(pair.elem.RemovedAt()) {
if vector.EqualToOrAfter(pair.elem.RemovedAt()) {
if err := pair.parent.Purge(pair.elem); err != nil {
return 0, err
}
Expand All @@ -157,7 +157,7 @@ func (r *Root) GarbageCollect(vector time.VersionVector) (int, error) {
}

for _, pair := range r.gcNodePairMap {
if vector.After(pair.Child.RemovedAt()) {
if vector.EqualToOrAfter(pair.Child.RemovedAt()) {
if err := pair.Parent.Purge(pair.Child); err != nil {
return 0, err
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,17 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
d.GarbageCollect(pack.MinSyncedVersionVector)
}

// 05. Update the status.
// 05. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if err != nil {
return err
}

d.doc.changeID = d.doc.changeID.SetVersionVector(d.doc.changeID.VersionVector().Filter(actorIDs))
}

// 06. Update the status.
if pack.IsRemoved {
d.SetStatus(StatusRemoved)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,14 @@ func TestDocument(t *testing.T) {
docB.SetActor(actorB)
assert.Equal(t, "{}", docB.VersionVector().Marshal())
assert.NoError(t, docB.ApplyChangePack(packA))
assert.Equal(t, "{000000000000000000000001:2}", docB.VersionVector().Marshal())
assert.Equal(t, "{000000000000000000000001:2,000000000000000000000002:3}", docB.VersionVector().Marshal())

assert.NoError(t, docB.Update(func(r *json.Object, p *presence.Presence) error {
r.SetString("k2", "3")
return nil
}))
assert.Equal(t, int64(2), docB.VersionVector().VersionOf(actorA))
assert.Equal(t, int64(3), docB.VersionVector().VersionOf(actorB))
assert.Equal(t, int64(4), docB.VersionVector().VersionOf(actorB))
packB := docB.CreateChangePack()
packB.MinSyncedTicket = time.InitialTicket
assert.True(t, packB.Changes[0].AfterOrEqual(packA.Changes[1]))
Expand Down
12 changes: 11 additions & 1 deletion pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er
}
}

// 04. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if err != nil {
return err
}

d.changeID = d.changeID.SetVersionVector(d.changeID.VersionVector().Filter(actorIDs))
}

return nil
}

Expand All @@ -196,7 +206,7 @@ func (d *InternalDocument) CreateChangePack() *change.Pack {
changes := d.localChanges

cp := d.checkpoint.IncreaseClientSeq(uint32(len(changes)))
return change.NewPack(d.key, cp, changes, nil)
return change.NewPack(d.key, cp, changes, d.VersionVector(), nil)
}

// SetActor sets actor into this document. This is also applied in the local
Expand Down
Loading