Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hide Watch method from Client.go and move into Attach method #593

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,19 @@ func (c *Client) Deactivate(ctx context.Context) error {

// Attach attaches the given document to this client. It tells the server that
// this client will synchronize the given document.
func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...AttachOption) error {
// If the context "ctx" is canceled or timed out, returned channel will be closed
// and "WatchResponse" from this closed channel has zero events and nil "Err()".
func (c *Client) Attach(
ctx context.Context,
doc *document.Document,
options ...AttachOption,
) (<-chan WatchResponse, error) {
if c.status != activated {
return ErrClientNotActivated
return nil, ErrClientNotActivated
}

if doc.Status() != document.StatusDetached {
return ErrDocumentNotDetached
return nil, ErrDocumentNotDetached
}

opts := &AttachOptions{}
Expand All @@ -277,12 +283,12 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
p.Initialize(opts.Presence)
return nil
}); err != nil {
return err
return nil, err
}

pbChangePack, err := converter.ToChangePack(doc.CreateChangePack())
if err != nil {
return err
return nil, err
}

res, err := c.client.AttachDocument(
Expand All @@ -293,16 +299,16 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
},
), c.options.APIKey, doc.Key().String()))
if err != nil {
return err
return nil, err
}

pack, err := converter.FromChangePack(res.Msg.ChangePack)
if err != nil {
return err
return nil, err
}

if err := doc.ApplyChangePack(pack); err != nil {
return err
return nil, err
}
if c.logger.Core().Enabled(zap.DebugLevel) {
c.logger.Debug(fmt.Sprintf(
Expand All @@ -313,7 +319,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
}

if doc.Status() == document.StatusRemoved {
return nil
return nil, nil
}

doc.SetStatus(document.StatusAttached)
Expand All @@ -322,7 +328,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
docID: types.ID(res.Msg.DocumentId),
}

return nil
return c.watch(ctx, doc)
}

// Detach detaches the given document from this client. It tells the
Expand Down Expand Up @@ -406,12 +412,12 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error {
return nil
}

// Watch subscribes to events on a given documentIDs.
// watch subscribes to events on a given documentIDs.
// If an error occurs before stream initialization, the second response, error,
// is returned. If the context "ctx" is canceled or timed out, returned channel
// is closed, and "WatchResponse" from this closed channel has zero events and
// nil "Err()".
func (c *Client) Watch(
func (c *Client) watch(
ctx context.Context,
doc *document.Document,
) (<-chan WatchResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription {
return &Subscription{
id: xid.New().String(),
subscriber: subscriber,
events: make(chan DocEvent, 1),
events: make(chan DocEvent, 3),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason for setting buffer size to 3?

Same here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason for changing buffer to 3?

Same here.

}
}

Expand Down
19 changes: 9 additions & 10 deletions test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d1 := document.New("doc1")
err = cli.Attach(ctx, d1)
_, err = cli.Attach(ctx, d1)
assert.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand All @@ -202,8 +202,9 @@ func BenchmarkRPC(b *testing.B) {
ctx := context.Background()

d1 := document.New(helper.TestDocKey(b))
err := c1.Attach(ctx, d1)
rch1, err := c1.Attach(ctx, d1)
assert.NoError(b, err)
assert.NotNil(b, rch1)
testKey1 := "testKey1"
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewText(testKey1)
Expand All @@ -212,7 +213,8 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d2 := document.New(helper.TestDocKey(b))
err = c2.Attach(ctx, d2)
rch2, err := c2.Attach(ctx, d2)
assert.NotNil(b, rch2)
assert.NoError(b, err)
testKey2 := "testKey2"
err = d2.Update(func(root *json.Object, p *presence.Presence) error {
Expand All @@ -221,11 +223,6 @@ func BenchmarkRPC(b *testing.B) {
})
assert.NoError(b, err)

rch1, err := c1.Watch(ctx, d1)
assert.NoError(b, err)
rch2, err := c2.Watch(ctx, d2)
assert.NoError(b, err)

done1 := make(chan bool)
done2 := make(chan bool)

Expand Down Expand Up @@ -286,13 +283,15 @@ func BenchmarkRPC(b *testing.B) {
wg.Add(2)
go func() {
defer wg.Done()
err := c1.Attach(ctx, doc1)
rch1, err := c1.Attach(ctx, doc1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is necessary to receive and check rch1 channel every time we perform Attach(), even if the test does not require rch channel.

Maybe setting rch1 to _ would be better.

If you think this suggestion is reasonable, please check other test codes changes that you have made to apply this suggestion.

assert.NoError(b, err)
assert.NotNil(b, rch1)
}()
go func() {
defer wg.Done()
err := c2.Attach(ctx, doc2)
rch2, err := c2.Attach(ctx, doc2)
assert.NoError(b, err)
assert.NotNil(b, rch2)
}()
wg.Wait()
}()
Expand Down
14 changes: 9 additions & 5 deletions test/integration/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func TestAdmin(t *testing.T) {
assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err))

// 02. client creates a document then admin removes the document.
assert.NoError(t, cli.Attach(ctx, d1))
rch, err := cli.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
err = adminCli.RemoveDocument(ctx, "default", d1.Key().String(), true)
assert.NoError(t, err)
assert.Equal(t, document.StatusAttached, d1.Status())
Expand All @@ -88,11 +90,11 @@ func TestAdmin(t *testing.T) {

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch, err := c1.Attach(watchCtx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
wg := sync.WaitGroup{}
wg.Add(1)
rch, err := c1.Watch(watchCtx, d1)
assert.NoError(t, err)
go func() {
defer wg.Done()

Expand Down Expand Up @@ -137,7 +139,9 @@ func TestAdmin(t *testing.T) {
assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err))

// 02. try to remove document that is attached by the client.
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)
err = adminCli.RemoveDocument(ctx, "default", doc.Key().String(), false)
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, document.StatusAttached, doc.Status())
Expand Down
38 changes: 26 additions & 12 deletions test/integration/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
t.Run("causal nested array test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").
Expand All @@ -53,17 +54,19 @@
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}})
})

t.Run("concurrent array add/delete simple test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1", "v2")
Expand All @@ -75,8 +78,9 @@
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").Delete(1)
Expand All @@ -96,8 +100,9 @@
t.Run("concurrent array add/delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1")
Expand All @@ -108,8 +113,9 @@
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").AddString("v2", "v3")
Expand All @@ -130,8 +136,9 @@
t.Run("concurrent array delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddString("v1", "v2", "v3")
Expand All @@ -142,8 +149,9 @@
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("k1").Delete(1)
Expand All @@ -169,8 +177,9 @@
t.Run("concurrent array move test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
err := c1.Attach(ctx, d1)
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddInteger(0, 1, 2)
Expand All @@ -182,8 +191,9 @@
assert.NoError(t, err)

d2 := document.New(helper.TestDocKey(t))
err = c2.Attach(ctx, d2)
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

err = d1.Update(func(root *json.Object, p *presence.Presence) error {
prev := root.GetArray("k1").Get(0)
Expand All @@ -209,15 +219,19 @@
t.Run("concurrent array move with the same position test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
assert.NotNil(t, rch)
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1").AddInteger(0, 1, 2)
assert.Equal(t, `{"k1":[0,1,2]}`, root.Marshal())
return nil
}))
assert.NoError(t, c1.Sync(ctx))
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
assert.NotNil(t, rch)

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
next := root.GetArray("k1").Get(0)
Expand All @@ -241,7 +255,7 @@
t.Run("array.set with value add, delete test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))

Check failure on line 258 in test/integration/array_test.go

View workflow job for this annotation

GitHub Actions / build

multiple-value c1.Attach(ctx, d1) (value of type (<-chan client.WatchResponse, error)) in single-value context

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
// 01. set array with value
Expand Down Expand Up @@ -271,9 +285,9 @@
t.Run("array.set with value sync test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))

Check failure on line 288 in test/integration/array_test.go

View workflow job for this annotation

GitHub Actions / build

multiple-value c1.Attach(ctx, d1) (value of type (<-chan client.WatchResponse, error)) in single-value context
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))

Check failure on line 290 in test/integration/array_test.go

View workflow job for this annotation

GitHub Actions / build

multiple-value c2.Attach(ctx, d2) (value of type (<-chan client.WatchResponse, error)) in single-value context

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("k1", []interface{}{0, 1, 2})
Expand All @@ -295,7 +309,7 @@
t.Run("array.set with Counter, Text, Tree slice test", func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))

Check failure on line 312 in test/integration/array_test.go

View workflow job for this annotation

GitHub Actions / build

multiple-value c1.Attach(ctx, d1) (value of type (<-chan client.WatchResponse, error)) in single-value context

cnt := json.NewCounter(0, crdt.LongCnt)
txt := json.NewText()
Expand Down Expand Up @@ -366,7 +380,7 @@
t.Run(tt.caseName, func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))

Check failure on line 383 in test/integration/array_test.go

View workflow job for this annotation

GitHub Actions / build

multiple-value c1.Attach(ctx, d1) (value of type (<-chan client.WatchResponse, error)) in single-value context

val := func() {
d1.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down Expand Up @@ -452,7 +466,7 @@
t.Run(tt.caseName, func(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))

Check failure on line 469 in test/integration/array_test.go

View workflow job for this annotation

GitHub Actions / build

multiple-value c1.Attach(ctx, d1) (value of type (<-chan client.WatchResponse, error)) in single-value context

err := d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("array", tt.in)
Expand Down
13 changes: 9 additions & 4 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ func TestClient(t *testing.T) {

// 01. c1, c2, c3 attach to the same document.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
_, err := c1.Attach(ctx, d1)
assert.NoError(t, err)
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
_, err = c2.Attach(ctx, d2)
assert.NoError(t, err)
d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
_, err = c3.Attach(ctx, d3)
assert.NoError(t, err)

// 02. c1, c2 sync with push-pull mode.
assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down Expand Up @@ -136,7 +139,9 @@ func TestClient(t *testing.T) {
// 01. cli attach to the same document having counter.
ctx := context.Background()
doc := document.New(helper.TestDocKey(t))
assert.NoError(t, cli.Attach(ctx, doc))
rch, err := cli.Attach(ctx, doc)
assert.NoError(t, err)
assert.NotNil(t, rch)

// 02. cli update the document with creating a counter
// and sync with push-pull mode: CP(1, 1) -> CP(2, 2)
Expand Down
Loading
Loading