diff --git a/client/client.go b/client/client.go index f1673a9ac..c1d2d8c4a 100644 --- a/client/client.go +++ b/client/client.go @@ -257,13 +257,19 @@ func (c *Client) Deactivate(ctx context.Context) error { // Attach attaches the given document to this client. It tells the server that // this client will synchronize the given document. -func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...AttachOption) error { +// If the context "ctx" is canceled or timed out, returned channel will be closed +// and "WatchResponse" from this closed channel has zero events and nil "Err()". +func (c *Client) Attach( + ctx context.Context, + doc *document.Document, + options ...AttachOption, +) (<-chan WatchResponse, error) { if c.status != activated { - return ErrClientNotActivated + return nil, ErrClientNotActivated } if doc.Status() != document.StatusDetached { - return ErrDocumentNotDetached + return nil, ErrDocumentNotDetached } opts := &AttachOptions{} @@ -277,12 +283,12 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ... p.Initialize(opts.Presence) return nil }); err != nil { - return err + return nil, err } pbChangePack, err := converter.ToChangePack(doc.CreateChangePack()) if err != nil { - return err + return nil, err } res, err := c.client.AttachDocument( @@ -293,16 +299,16 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ... }, ), c.options.APIKey, doc.Key().String())) if err != nil { - return err + return nil, err } pack, err := converter.FromChangePack(res.Msg.ChangePack) if err != nil { - return err + return nil, err } if err := doc.ApplyChangePack(pack); err != nil { - return err + return nil, err } if c.logger.Core().Enabled(zap.DebugLevel) { c.logger.Debug(fmt.Sprintf( @@ -313,7 +319,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ... } if doc.Status() == document.StatusRemoved { - return nil + return nil, nil } doc.SetStatus(document.StatusAttached) @@ -322,7 +328,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ... docID: types.ID(res.Msg.DocumentId), } - return nil + return c.watch(ctx, doc) } // Detach detaches the given document from this client. It tells the @@ -406,12 +412,12 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error { return nil } -// Watch subscribes to events on a given documentIDs. +// watch subscribes to events on a given documentIDs. // If an error occurs before stream initialization, the second response, error, // is returned. If the context "ctx" is canceled or timed out, returned channel // is closed, and "WatchResponse" from this closed channel has zero events and // nil "Err()". -func (c *Client) Watch( +func (c *Client) watch( ctx context.Context, doc *document.Document, ) (<-chan WatchResponse, error) { diff --git a/server/backend/sync/pubsub.go b/server/backend/sync/pubsub.go index cf9547f27..8ba605d7d 100644 --- a/server/backend/sync/pubsub.go +++ b/server/backend/sync/pubsub.go @@ -36,7 +36,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription { return &Subscription{ id: xid.New().String(), subscriber: subscriber, - events: make(chan DocEvent, 1), + events: make(chan DocEvent, 3), } } diff --git a/test/bench/grpc_bench_test.go b/test/bench/grpc_bench_test.go index 39bb924ca..87ed86104 100644 --- a/test/bench/grpc_bench_test.go +++ b/test/bench/grpc_bench_test.go @@ -179,7 +179,7 @@ func BenchmarkRPC(b *testing.B) { assert.NoError(b, err) d1 := document.New("doc1") - err = cli.Attach(ctx, d1) + _, err = cli.Attach(ctx, d1) assert.NoError(b, err) for i := 0; i < b.N; i++ { @@ -202,8 +202,9 @@ func BenchmarkRPC(b *testing.B) { ctx := context.Background() d1 := document.New(helper.TestDocKey(b)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(b, err) + assert.NotNil(b, rch1) testKey1 := "testKey1" err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText(testKey1) @@ -212,7 +213,8 @@ func BenchmarkRPC(b *testing.B) { assert.NoError(b, err) d2 := document.New(helper.TestDocKey(b)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) + assert.NotNil(b, rch2) assert.NoError(b, err) testKey2 := "testKey2" err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -221,11 +223,6 @@ func BenchmarkRPC(b *testing.B) { }) assert.NoError(b, err) - rch1, err := c1.Watch(ctx, d1) - assert.NoError(b, err) - rch2, err := c2.Watch(ctx, d2) - assert.NoError(b, err) - done1 := make(chan bool) done2 := make(chan bool) @@ -286,13 +283,15 @@ func BenchmarkRPC(b *testing.B) { wg.Add(2) go func() { defer wg.Done() - err := c1.Attach(ctx, doc1) + rch1, err := c1.Attach(ctx, doc1) assert.NoError(b, err) + assert.NotNil(b, rch1) }() go func() { defer wg.Done() - err := c2.Attach(ctx, doc2) + rch2, err := c2.Attach(ctx, doc2) assert.NoError(b, err) + assert.NotNil(b, rch2) }() wg.Wait() }() diff --git a/test/integration/admin_test.go b/test/integration/admin_test.go index b02fa8748..5840fc574 100644 --- a/test/integration/admin_test.go +++ b/test/integration/admin_test.go @@ -67,7 +67,9 @@ func TestAdmin(t *testing.T) { assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) // 02. client creates a document then admin removes the document. - assert.NoError(t, cli.Attach(ctx, d1)) + rch, err := cli.Attach(ctx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch) err = adminCli.RemoveDocument(ctx, "default", d1.Key().String(), true) assert.NoError(t, err) assert.Equal(t, document.StatusAttached, d1.Status()) @@ -88,11 +90,11 @@ func TestAdmin(t *testing.T) { // 01. c1 attaches and watches d1. d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + rch, err := c1.Attach(watchCtx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch) wg := sync.WaitGroup{} wg.Add(1) - rch, err := c1.Watch(watchCtx, d1) - assert.NoError(t, err) go func() { defer wg.Done() @@ -137,7 +139,9 @@ func TestAdmin(t *testing.T) { assert.Equal(t, connect.CodeNotFound, connect.CodeOf(err)) // 02. try to remove document that is attached by the client. - assert.NoError(t, cli.Attach(ctx, doc)) + rch, err := cli.Attach(ctx, doc) + assert.NoError(t, err) + assert.NotNil(t, rch) err = adminCli.RemoveDocument(ctx, "default", doc.Key().String(), false) assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) assert.Equal(t, document.StatusAttached, doc.Status()) diff --git a/test/integration/array_test.go b/test/integration/array_test.go index f4935ef8a..ebabb7732 100644 --- a/test/integration/array_test.go +++ b/test/integration/array_test.go @@ -41,8 +41,9 @@ func TestArray(t *testing.T) { t.Run("causal nested array test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewArray("k1"). @@ -53,8 +54,9 @@ func TestArray(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}}) }) @@ -62,8 +64,9 @@ func TestArray(t *testing.T) { t.Run("concurrent array add/delete simple test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewArray("k1").AddString("v1", "v2") @@ -75,8 +78,9 @@ func TestArray(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetArray("k1").Delete(1) @@ -96,8 +100,9 @@ func TestArray(t *testing.T) { t.Run("concurrent array add/delete test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewArray("k1").AddString("v1") @@ -108,8 +113,9 @@ func TestArray(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetArray("k1").AddString("v2", "v3") @@ -130,8 +136,9 @@ func TestArray(t *testing.T) { t.Run("concurrent array delete test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewArray("k1").AddString("v1", "v2", "v3") @@ -142,8 +149,9 @@ func TestArray(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetArray("k1").Delete(1) @@ -169,8 +177,9 @@ func TestArray(t *testing.T) { t.Run("concurrent array move test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewArray("k1").AddInteger(0, 1, 2) @@ -182,8 +191,9 @@ func TestArray(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { prev := root.GetArray("k1").Get(0) @@ -209,7 +219,9 @@ func TestArray(t *testing.T) { t.Run("concurrent array move with the same position test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + rch, err := c1.Attach(ctx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch) assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewArray("k1").AddInteger(0, 1, 2) assert.Equal(t, `{"k1":[0,1,2]}`, root.Marshal()) @@ -217,7 +229,9 @@ func TestArray(t *testing.T) { })) assert.NoError(t, c1.Sync(ctx)) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + rch, err = c2.Attach(ctx, d2) + assert.NoError(t, err) + assert.NotNil(t, rch) assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { next := root.GetArray("k1").Get(0) diff --git a/test/integration/client_test.go b/test/integration/client_test.go index e37fe4413..5891afd02 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -82,11 +82,14 @@ func TestClient(t *testing.T) { // 01. c1, c2, c3 attach to the same document. d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + _, err := c1.Attach(ctx, d1) + assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + _, err = c2.Attach(ctx, d2) + assert.NoError(t, err) d3 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c3.Attach(ctx, d3)) + _, err = c3.Attach(ctx, d3) + assert.NoError(t, err) // 02. c1, c2 sync with push-pull mode. assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { @@ -136,7 +139,9 @@ func TestClient(t *testing.T) { // 01. cli attach to the same document having counter. ctx := context.Background() doc := document.New(helper.TestDocKey(t)) - assert.NoError(t, cli.Attach(ctx, doc)) + rch, err := cli.Attach(ctx, doc) + assert.NoError(t, err) + assert.NotNil(t, rch) // 02. cli update the document with creating a counter // and sync with push-pull mode: CP(1, 1) -> CP(2, 2) diff --git a/test/integration/counter_test.go b/test/integration/counter_test.go index 4971c88e5..9a25af7df 100644 --- a/test/integration/counter_test.go +++ b/test/integration/counter_test.go @@ -40,8 +40,9 @@ func TestCounter(t *testing.T) { t.Run("causal counter.increase test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewCounter("age", crdt.LongCnt, 1). @@ -55,8 +56,9 @@ func TestCounter(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}}) }) @@ -64,8 +66,9 @@ func TestCounter(t *testing.T) { t.Run("concurrent counter increase test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewCounter("age", crdt.IntegerCnt, 0) @@ -78,8 +81,9 @@ func TestCounter(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetCounter("age"). diff --git a/test/integration/gc_test.go b/test/integration/gc_test.go index 0c972d5bf..6ca04d00d 100644 --- a/test/integration/gc_test.go +++ b/test/integration/gc_test.go @@ -39,12 +39,14 @@ func TestGarbageCollection(t *testing.T) { t.Run("garbage collection for container type test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetInteger("1", 1) @@ -106,12 +108,14 @@ func TestGarbageCollection(t *testing.T) { t.Run("garbage collection for text type test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("text"). @@ -246,12 +250,14 @@ func TestGarbageCollection(t *testing.T) { t.Run("garbage collection for tree type test (multi clients)", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewTree("t", &json.TreeNode{ @@ -324,12 +330,14 @@ func TestGarbageCollection(t *testing.T) { t.Run("garbage collection with detached document test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetInteger("1", 1) diff --git a/test/integration/history_test.go b/test/integration/history_test.go index 52c059518..2cacf1706 100644 --- a/test/integration/history_test.go +++ b/test/integration/history_test.go @@ -41,7 +41,9 @@ func TestHistory(t *testing.T) { t.Run("history test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, cli.Attach(ctx, d1)) + rch, err := cli.Attach(ctx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch) defer func() { assert.NoError(t, cli.Detach(ctx, d1)) }() assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { diff --git a/test/integration/object_test.go b/test/integration/object_test.go index 374953c76..e49d42340 100644 --- a/test/integration/object_test.go +++ b/test/integration/object_test.go @@ -42,12 +42,14 @@ func TestObject(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewObject("k1"). @@ -75,8 +77,9 @@ func TestObject(t *testing.T) { t.Run("concurrent object set/delete simple test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewObject("k1") @@ -88,8 +91,9 @@ func TestObject(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.Delete("k1") @@ -112,12 +116,14 @@ func TestObject(t *testing.T) { t.Run("concurrent object.set test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch, err = c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch) // 01. concurrent set on same key err = d1.Update(func(root *json.Object, p *presence.Presence) error { diff --git a/test/integration/presence_test.go b/test/integration/presence_test.go index e2b37aa69..3428b4e8e 100644 --- a/test/integration/presence_test.go +++ b/test/integration/presence_test.go @@ -46,11 +46,21 @@ func TestPresence(t *testing.T) { // 01. Create a document and attach it to the clients ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) - defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) - defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() + rch, err := c1.Attach(ctx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch) + + defer func() { + assert.NoError(t, c1.Detach(ctx, d1)) + }() + d2 := document.New(helper.TestDocKey(t), 2) + rch2, err := c2.Attach(ctx, d2) + + assert.NoError(t, err) + assert.NotNil(t, rch2) + defer func() { + assert.NoError(t, c2.Detach(ctx, d2)) + }() // 02. Update the root of the document and presence assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error { @@ -74,10 +84,14 @@ func TestPresence(t *testing.T) { // 01. Create a document and attach it to the clients ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) + rch1, err := c1.Attach(ctx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch1) defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2)) + rch2, err := c2.Attach(ctx, d2) + assert.NoError(t, err) + assert.NotNil(t, rch2) defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() // 02. Update the root of the document and presence @@ -104,9 +118,13 @@ func TestPresence(t *testing.T) { // 01. Create a document and attach it to the clients ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1, client.WithPresence(innerpresence.Presence{"key": c1.Key()}))) + rch1, err := c1.Attach(ctx, d1, client.WithPresence(innerpresence.Presence{"key": c1.Key()})) + assert.NoError(t, err) + assert.NotNil(t, rch1) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c2.Attach(ctx, d2, client.WithPresence(innerpresence.Presence{"key": c2.Key()}))) + rch2, err := c2.Attach(ctx, d2, client.WithPresence(innerpresence.Presence{"key": c2.Key()})) + assert.NoError(t, err) + assert.NotNil(t, rch2) defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() // 02. Check that the presence is updated on the other client. @@ -124,25 +142,23 @@ func TestPresence(t *testing.T) { }) t.Run("presence-related events test", func(t *testing.T) { - // 01. Create two clients and documents and attach them. - ctx := context.Background() - d1 := document.New(helper.TestDocKey(t)) - d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) - defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - assert.NoError(t, c2.Attach(ctx, d2)) - defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() - - // 02. Watch the first client's document. + // 01. Create two clients and documents. var expected []watchResponsePair var responsePairs []watchResponsePair wgEvents := sync.WaitGroup{} wgEvents.Add(1) + ctx := context.Background() + d1 := document.New(helper.TestDocKey(t)) + d2 := document.New(helper.TestDocKey(t)) + + // 02. Watch the first client's document. watch1Ctx, cancel1 := context.WithCancel(ctx) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) + rch1, err := c1.Attach(watch1Ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() go func() { defer func() { wgEvents.Done() @@ -152,7 +168,7 @@ func TestPresence(t *testing.T) { case <-time.After(time.Second): assert.Fail(t, "timeout") return - case wr := <-wrch: + case wr := <-rch1: if wr.Err != nil { assert.Fail(t, "unexpected stream closing", wr.Err) return @@ -163,7 +179,7 @@ func TestPresence(t *testing.T) { Presences: wr.Presences, }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -171,15 +187,11 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) - _, err = c2.Watch(watch2Ctx, d2) + rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) + defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() // 04. Update the second client's presence. err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -197,12 +209,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) cancel2() wgEvents.Wait() @@ -210,24 +216,23 @@ func TestPresence(t *testing.T) { }) t.Run("unwatch after detach events test", func(t *testing.T) { - // 01. Create two clients and documents and attach them. + // 01. Create two clients and documents. ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) - defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - assert.NoError(t, c2.Attach(ctx, d2)) - // 02. Watch the first client's document. + // 02. Attach the first client's document. var expected []watchResponsePair var responsePairs []watchResponsePair wgEvents := sync.WaitGroup{} wgEvents.Add(1) watch1Ctx, cancel1 := context.WithCancel(ctx) - defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) + rch1, err := c1.Attach(watch1Ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() + defer cancel1() go func() { defer func() { wgEvents.Done() @@ -237,7 +242,7 @@ func TestPresence(t *testing.T) { case <-time.After(10 * time.Second): assert.Fail(t, "timeout") return - case wr := <-wrch: + case wr := <-rch1: if wr.Err != nil { assert.Fail(t, "unexpected stream closing", wr.Err) return @@ -249,7 +254,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -257,16 +262,11 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) defer cancel2() - _, err = c2.Watch(watch2Ctx, d2) + rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) // 04. Update the second client's presence. err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -284,12 +284,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) assert.NoError(t, c2.Detach(ctx, d2)) assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) wgEvents.Wait() @@ -303,9 +297,7 @@ func TestPresence(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, c1.Attach(ctx, d1)) defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - assert.NoError(t, c2.Attach(ctx, d2)) // 02. Watch the first client's document. var expected []watchResponsePair @@ -315,8 +307,9 @@ func TestPresence(t *testing.T) { watch1Ctx, cancel1 := context.WithCancel(ctx) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) + rch1, err := c1.Attach(watch1Ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) go func() { defer func() { wgEvents.Done() @@ -326,7 +319,7 @@ func TestPresence(t *testing.T) { case <-time.After(10 * time.Second): assert.Fail(t, "timeout") return - case wr := <-wrch: + case wr := <-rch1: if wr.Err != nil { assert.Fail(t, "unexpected stream closing", wr.Err) return @@ -338,7 +331,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 3 { + if len(responsePairs) == 1 { return } } @@ -346,16 +339,11 @@ func TestPresence(t *testing.T) { }() // 03. Watch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, - }, - }) watch2Ctx, cancel2 := context.WithCancel(ctx) defer cancel2() - _, err = c2.Watch(watch2Ctx, d2) + rch2, err := c2.Attach(watch2Ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) // 04. Update the second client's presence. err = d2.Update(func(root *json.Object, p *presence.Presence) error { @@ -373,12 +361,6 @@ func TestPresence(t *testing.T) { assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) // 05. Unwatch the second client's document. - expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, - }) cancel2() assert.NoError(t, c2.Detach(ctx, d2)) @@ -396,10 +378,8 @@ func TestPresence(t *testing.T) { d1 := document.New(helper.TestDocKey(t)) d2 := document.New(helper.TestDocKey(t)) d3 := document.New(helper.TestDocKey(t) + "2") - assert.NoError(t, c1.Attach(ctx, d1)) - defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() - // 02. Watch the first client's document. + // 02. Attach the first client's document. var expected []watchResponsePair var responsePairs []watchResponsePair wgEvents := sync.WaitGroup{} @@ -407,8 +387,10 @@ func TestPresence(t *testing.T) { watch1Ctx, cancel1 := context.WithCancel(ctx) defer cancel1() - wrch, err := c1.Watch(watch1Ctx, d1) + rch1, err := c1.Attach(watch1Ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) + defer func() { assert.NoError(t, c1.Detach(ctx, d1)) }() go func() { defer func() { wgEvents.Done() @@ -418,7 +400,7 @@ func TestPresence(t *testing.T) { case <-time.After(time.Second): assert.Fail(t, "timeout") return - case wr := <-wrch: + case wr := <-rch1: if wr.Err != nil { assert.Fail(t, "unexpected stream closing", wr.Err) return @@ -431,7 +413,7 @@ func TestPresence(t *testing.T) { }) } - if len(responsePairs) == 2 { + if len(responsePairs) == 1 { return } } @@ -440,34 +422,45 @@ func TestPresence(t *testing.T) { // 03. The second client attaches a document with the same key as the first client's document // and another document with a different key. - assert.NoError(t, c2.Attach(ctx, d2)) + watch2Ctx, cancel2 := context.WithCancel(ctx) + rch2, err := c2.Attach(watch2Ctx, d2) + assert.NoError(t, err) + assert.NotNil(t, rch2) + assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) defer func() { assert.NoError(t, c2.Detach(ctx, d2)) }() - assert.NoError(t, c2.Attach(ctx, d3)) + + watch3Ctx, cancel3 := context.WithCancel(ctx) + rch3, err := c2.Attach(watch3Ctx, d3) + assert.NoError(t, err) + assert.NotNil(t, rch3) + assert.NoError(t, err) defer func() { assert.NoError(t, c2.Detach(ctx, d3)) }() - // 04. The second client watches the documents attached by itself. - expected = append(expected, watchResponsePair{ - Type: client.DocumentWatched, - Presences: map[string]innerpresence.Presence{ - c2.ID().String(): d2.MyPresence(), - }, + // 04. Update clients presence. + err = d3.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil }) - watch2Ctx, cancel2 := context.WithCancel(ctx) - _, err = c2.Watch(watch2Ctx, d2) assert.NoError(t, err) - assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) - watch3Ctx, cancel3 := context.WithCancel(ctx) - _, err = c2.Watch(watch3Ctx, d3) + err = d2.Update(func(root *json.Object, p *presence.Presence) error { + p.Set("updated", "true") + return nil + }) assert.NoError(t, err) - // 05. The second client unwatch the documents attached by itself. + assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)+"2"))) + assert.NoError(t, c2.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + assert.NoError(t, c1.Sync(ctx, client.WithDocKey(helper.TestDocKey(t)))) + expected = append(expected, watchResponsePair{ - Type: client.DocumentUnwatched, + Type: client.PresenceChanged, Presences: map[string]innerpresence.Presence{ - c2.ID().String(): {}, + c2.ID().String(): d2.MyPresence(), }, }) + + // 04. The second client unwatch the documents attached by itself. cancel2() cancel3() diff --git a/test/integration/primitive_test.go b/test/integration/primitive_test.go index 747eb598a..4481cb3d6 100644 --- a/test/integration/primitive_test.go +++ b/test/integration/primitive_test.go @@ -39,8 +39,9 @@ func TestPrimitive(t *testing.T) { t.Run("causal primitive data test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewObject("k1"). @@ -66,8 +67,9 @@ func TestPrimitive(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) syncClientsThenAssertEqual(t, []clientAndDocPair{{c1, d1}, {c2, d2}}) }) diff --git a/test/integration/retention_test.go b/test/integration/retention_test.go index 5864a9d3c..9422f9d08 100644 --- a/test/integration/retention_test.go +++ b/test/integration/retention_test.go @@ -81,7 +81,9 @@ func TestRetention(t *testing.T) { ctx := context.Background() doc := document.New(helper.TestDocKey(t)) - assert.NoError(t, cli.Attach(ctx, doc)) + rch, err := cli.Attach(ctx, doc) + assert.NoError(t, err) + assert.NotNil(t, rch) defer func() { assert.NoError(t, cli.Detach(ctx, doc)) }() assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error { @@ -141,7 +143,9 @@ func TestRetention(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - assert.NoError(t, cli1.Attach(ctx, d1)) + rch, err := cli1.Attach(ctx, d1) + assert.NoError(t, err) + assert.NotNil(t, rch) defer func() { assert.NoError(t, cli1.Detach(ctx, d1)) }() err = d1.Update(func(root *json.Object, p *presence.Presence) error { @@ -214,7 +218,10 @@ func TestRetention(t *testing.T) { }() d2 := document.New(helper.TestDocKey(t)) - assert.NoError(t, cli2.Attach(ctx, d2)) + rch2, err := cli2.Attach(ctx, d2) + assert.NoError(t, err) + assert.NotNil(t, rch2) + defer func() { assert.NoError(t, cli2.Detach(ctx, d2)) }() // Create 6 changes diff --git a/test/integration/snapshot_test.go b/test/integration/snapshot_test.go index 7e3cc3a73..20d917b49 100644 --- a/test/integration/snapshot_test.go +++ b/test/integration/snapshot_test.go @@ -57,12 +57,14 @@ func TestSnapshot(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) // 01. Update changes over snapshot threshold. for i := 0; i <= int(helper.SnapshotThreshold); i++ { @@ -93,8 +95,9 @@ func TestSnapshot(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("k1") @@ -127,8 +130,9 @@ func TestSnapshot(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) assert.Equal(t, `{"k1":[{"val":"하"},{"val":"늘"},{"val":"구"},{"val":"름"}]}`, d1.Marshal()) assert.Equal(t, d1.Marshal(), d2.Marshal()) @@ -138,8 +142,9 @@ func TestSnapshot(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("k1") @@ -150,8 +155,9 @@ func TestSnapshot(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) for i := 0; i <= int(helper.SnapshotThreshold); i++ { err = d1.Update(func(root *json.Object, p *presence.Presence) error { diff --git a/test/integration/text_test.go b/test/integration/text_test.go index b85be27d4..7c6950ed3 100644 --- a/test/integration/text_test.go +++ b/test/integration/text_test.go @@ -38,8 +38,9 @@ func TestText(t *testing.T) { t.Run("text test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("k1") @@ -50,8 +51,9 @@ func TestText(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetText("k1").Edit(0, 0, "ABCD") @@ -136,8 +138,9 @@ func TestText(t *testing.T) { t.Run("rich text test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("k1").Edit(0, 0, "Hello world", nil) @@ -148,8 +151,9 @@ func TestText(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) err = d1.Update(func(root *json.Object, p *presence.Presence) error { text := root.GetText("k1") @@ -171,8 +175,9 @@ func TestText(t *testing.T) { t.Run("concurrent block deletions test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("k1") @@ -183,8 +188,9 @@ func TestText(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetText("k1").Edit(0, 0, "123") @@ -217,8 +223,9 @@ func TestText(t *testing.T) { t.Run("new creation then concurrent deletion test", func(t *testing.T) { ctx := context.Background() d1 := document.New(helper.TestDocKey(t)) - err := c1.Attach(ctx, d1) + rch1, err := c1.Attach(ctx, d1) assert.NoError(t, err) + assert.NotNil(t, rch1) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.SetNewText("k1") @@ -229,8 +236,9 @@ func TestText(t *testing.T) { assert.NoError(t, err) d2 := document.New(helper.TestDocKey(t)) - err = c2.Attach(ctx, d2) + rch2, err := c2.Attach(ctx, d2) assert.NoError(t, err) + assert.NotNil(t, rch2) err = d1.Update(func(root *json.Object, p *presence.Presence) error { root.GetText("k1").Edit(0, 0, "0")