Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate Checkpoint in ChangePack for PushPull API requests #959

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
495a54f
Add HTTP health check handler for server health monitoring (#952)
taeng0204 Aug 7, 2024
4344fce
Implement validation method to ensure sequential ClientSeq in reqPack…
binary-ho Aug 9, 2024
c5abdcd
Add ErrClientSeqNotSequential to errorToCode Mappers in yorkie.connect
binary-ho Aug 9, 2024
62e7f3d
Write 'packs' test code for the following cases: sequential ClientSeq…
binary-ho Aug 9, 2024
2b58d1d
Fix linting issues in packs_test
binary-ho Aug 9, 2024
2c48484
Write 'packs' test code for the following cases: sequential ClientSeq…
binary-ho Aug 10, 2024
f41ea88
Merge branch 'main' into validate-checkpoint
binary-ho Aug 10, 2024
6d13dc9
Fix goimports issues in packs_test
binary-ho Aug 10, 2024
a348192
Fix goimports issues in packs_test
binary-ho Aug 10, 2024
e447471
Rewrite test description
binary-ho Aug 10, 2024
59392e0
Fix goimports issues in packs_test.go
binary-ho Aug 10, 2024
7712bb8
Fix push/pull test where ClientSeq is less than ClientInfo's ClientSeq
binary-ho Aug 10, 2024
be485a7
Fix goimports issue at health_test.go
binary-ho Aug 10, 2024
b2472f4
Rollback health_test.go
binary-ho Aug 10, 2024
511f8f1
Add Validation ClientSeq is Sequential With DocInfo.Checkpoint.ClientSeq
binary-ho Aug 11, 2024
1917a7b
Write Test Code about validate ClientSeq is sequential with DocInfo.C…
binary-ho Aug 11, 2024
38cc7ed
Fix validate ClientSeq Sequential With Checkpoint
binary-ho Aug 11, 2024
4871d92
Remove TODO comment about needs of Changes validate
binary-ho Aug 11, 2024
a32137e
Fix lint issues: some lines too long
binary-ho Aug 11, 2024
0217951
Change to return `CodeFailedPrecondition` Status Code to the client i…
binary-ho Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package packs

import (
"context"
"errors"
"fmt"
"strconv"
gotime "time"
Expand All @@ -37,6 +38,11 @@ import (
"github.com/yorkie-team/yorkie/server/logging"
)

var (
// ErrClientSeqNotSequential is returned when ClientSeq in reqPack.Changes are not sequential
ErrClientSeqNotSequential = errors.New("ClientSeq in reqPack.Changes are not sequential")
)

// PushPullKey creates a new sync.Key of PushPull for the given document.
func PushPullKey(projectID types.ID, docKey key.Key) sync.Key {
return sync.NewKey(fmt.Sprintf("pushpull-%s-%s", projectID, docKey))
Expand Down Expand Up @@ -77,6 +83,11 @@ func PushPull(

// TODO: Changes may be reordered or missing during communication on the network.
// We should check the change.pack with checkpoint to make sure the changes are in the correct order.
err := validateClientSeqSequential(reqPack.Changes)
if err != nil {
return nil, err
}

initialServerSeq := docInfo.ServerSeq

// 01. push changes: filter out the changes that are already saved in the database.
Expand Down Expand Up @@ -272,3 +283,24 @@ func BuildDocumentForServerSeq(

return doc, nil
}

func validateClientSeqSequential(changes []*change.Change) error {
if len(changes) <= 1 {
return nil
}

nextClientSeq := changes[0].ClientSeq()
for _, cn := range changes[1:] {
nextClientSeq++

if nextClientSeq != cn.ClientSeq() {
return fmt.Errorf(
"ClientSeq in Changes are not sequential (expected: %d, actual: %d) : %w",
nextClientSeq,
cn.ClientSeq(),
ErrClientSeqNotSequential,
)
}
}
return nil
}
261 changes: 261 additions & 0 deletions server/packs/packs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package packs_test

import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/clients"
"github.com/yorkie-team/yorkie/server/documents"
"github.com/yorkie-team/yorkie/server/packs"
"github.com/yorkie-team/yorkie/server/profiling/prometheus"
"github.com/yorkie-team/yorkie/server/rpc/connecthelper"
"github.com/yorkie-team/yorkie/test/helper"
)

var (
clientID = "000000000000000000000001"
)

func Test(t *testing.T) {
t.Run("push/pull sequential ClientSeq test (happy case)", func(t *testing.T) {
RunPushPullWithSequentialClientSeqTest(t)
})

t.Run("push/pull not sequential ClientSeq test", func(t *testing.T) {
RunPushPullWithNotSequentialClientSeqTest(t)
})

t.Run("push/pull ClientSeq less than ClientInfo's ClientSeq (duplicated request)", func(t *testing.T) {
RunPushPullWithClientSeqLessThanClientInfoTest(t)
})

t.Run("push/pull ServerSeq greater than DocInfo's ServerSeq", func(t *testing.T) {
RunPushPullWithServerSeqGreaterThanDocInfoTest(t)
})
}

func RunPushPullWithSequentialClientSeqTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)
actorID, _ := time.ActorIDFromHex(clientID)

changePackWithSequentialClientSeq, _ :=
createChangePackWithSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(
ctx, be, clientInfo, changePackWithSequentialClientSeq.DocumentKey, true)
err := clientInfo.AttachDocument(
docInfo.ID, changePackWithSequentialClientSeq.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, err = packs.PushPull(
ctx, be, project.ToProject(), clientInfo, docInfo,
changePackWithSequentialClientSeq, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(t, err)
}

func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)

actorID, _ := time.ActorIDFromHex(clientID)
changePackWithNotSequentialClientSeq, _ :=
createChangePackWithNotSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo,
changePackWithNotSequentialClientSeq.DocumentKey, true)
err := clientInfo.AttachDocument(
docInfo.ID, changePackWithNotSequentialClientSeq.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, err = packs.PushPull(
ctx, be, project.ToProject(), clientInfo, docInfo,
changePackWithNotSequentialClientSeq, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequential), connecthelper.CodeOf(err))
}

func RunPushPullWithClientSeqLessThanClientInfoTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)

actorID, _ := time.ActorIDFromHex(clientID)
changePackFixture, _ :=
createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(
ctx, be, clientInfo, changePackFixture.DocumentKey, true)
err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, err = packs.PushPull(ctx, be, project.ToProject(),
clientInfo, docInfo, changePackFixture, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
if err != nil {
assert.Fail(t, "failed to push pull")
}

changePackWithClientSeqLessThanClientInfo, _ :=
createChangePackWithClientSeqLessThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes())
_, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo,
changePackWithClientSeqLessThanClientInfo, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})

assert.NoError(t, err)
}

func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)

actorID, _ := time.ActorIDFromHex(clientID)
changePackFixture, _ :=
createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(
ctx, be, clientInfo, changePackFixture.DocumentKey, true)
err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, _ = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo,
changePackFixture, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})

changePackWithServerSeqGreaterThanDocInfo, _ :=
createChangePackWithServerSeqGreaterThanDocInfo(helper.TestDocKey(t).String())

_, err = packs.PushPull(ctx, be, project.ToProject(),
clientInfo, docInfo, changePackWithServerSeqGreaterThanDocInfo, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})

assert.Equal(t, connecthelper.CodeOf(packs.ErrInvalidServerSeq), connecthelper.CodeOf(err))
}

func createChangePackWithSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2},
Changes: []*api.Change{
createChange(0, 0, actorID),
createChange(1, 1, actorID),
createChange(2, 2, actorID),
},
})
}

func createChangePackWithNotSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0},
Changes: []*api.Change{
createChange(2, 2, actorID),
createChange(1, 1, actorID),
createChange(0, 0, actorID),
},
})
}

func createChangePackWithClientSeqLessThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 0},
Changes: []*api.Change{
createChange(0, 0, actorID),
},
})
}

func createChangePackFixture(documentKey string, actorID []byte) (*change.Pack, error) {
return createChangePackWithSequentialClientSeq(documentKey, actorID)
}

func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 1e9, ClientSeq: 2},
})
}

func createChange(clientSeq uint32, lamport int64, actorID []byte) *api.Change {
return &api.Change{
Id: &api.ChangeID{
ClientSeq: clientSeq,
Lamport: lamport,
ActorId: actorID,
},
}
}

func setUpBackend(
t *testing.T,
) *backend.Backend {
conf := helper.TestConfig()

metrics, err := prometheus.NewMetrics()
assert.NoError(t, err)

be, err := backend.New(
conf.Backend,
conf.Mongo,
conf.Housekeeping,
metrics,
)
assert.NoError(t, err)

return be
}
2 changes: 2 additions & 0 deletions server/rpc/connecthelper/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var errorToConnectCode = map[error]connect.Code{
clients.ErrInvalidClientKey: connect.CodeInvalidArgument,
key.ErrInvalidKey: connect.CodeInvalidArgument,
types.ErrEmptyProjectFields: connect.CodeInvalidArgument,
packs.ErrClientSeqNotSequential: connect.CodeInvalidArgument,

// NotFound means the requested resource does not exist.
database.ErrProjectNotFound: connect.CodeNotFound,
Expand Down Expand Up @@ -100,6 +101,7 @@ var errorToCode = map[error]string{
clients.ErrInvalidClientKey: "ErrInvalidClientKey",
key.ErrInvalidKey: "ErrInvalidKey",
types.ErrEmptyProjectFields: "ErrEmptyProjectFields",
packs.ErrClientSeqNotSequential: "ErrClientSeqNotSequential",

database.ErrProjectNotFound: "ErrProjectNotFound",
database.ErrClientNotFound: "ErrClientNotFound",
Expand Down
Loading