Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
karockai committed Sep 9, 2023
1 parent 93ee546 commit 49ce076
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 73 deletions.
193 changes: 134 additions & 59 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/rs/xid"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
Expand All @@ -40,6 +32,13 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"strings"
gotime "time"
)

type status int
Expand Down Expand Up @@ -69,8 +68,9 @@ var (

// Attachment represents the document attached.
type Attachment struct {
doc *document.Document
docID types.ID
doc *document.Document
docID types.ID
watchStream api.YorkieService_WatchDocumentClient
}

// Client is a normal client that can communicate with the server.
Expand Down Expand Up @@ -238,6 +238,12 @@ func (c *Client) Deactivate(ctx context.Context) error {
return err
}

for _, val := range c.attachments {
if err := val.cancelWatchStream(); err != nil {
return err
}
}

c.status = deactivated

return nil
Expand Down Expand Up @@ -376,9 +382,14 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ...
if doc.Status() != document.StatusRemoved {
doc.SetStatus(document.StatusDetached)
}
delete(c.attachments, doc.Key())

return nil
//if err := attachment.cancelWatchStream(); err != nil {
// return err
//}

//delete(c.attachments, doc.Key())

return c.detachInternal(doc.Key())
}

// Sync pushes local changes of the attached documents to the server and
Expand Down Expand Up @@ -414,18 +425,6 @@ func (c *Client) watch(
return nil, ErrDocumentNotAttached
}

rch := make(chan WatchResponse)
stream, err := c.client.WatchDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.WatchDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
},
)
if err != nil {
return nil, err
}

handleResponse := func(pbResp *api.WatchDocumentResponse) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
Expand Down Expand Up @@ -484,33 +483,78 @@ func (c *Client) watch(
return nil, ErrUnsupportedWatchResponseType
}

pbResp, err := stream.Recv()
if err != nil {
return nil, err
}
if _, err := handleResponse(pbResp); err != nil {
return nil, err
rch := make(chan WatchResponse)

createStream := func() (api.YorkieService_WatchDocumentClient, error) {
fmt.Printf("before create stream\n")
result, err := c.client.WatchDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.WatchDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
},
)
fmt.Printf("create stream: %s, err: %s\n", result, err)
return result, err
}

// Detach되었을 때 이 채널을 종료해야한다.
go func() {
var err error
attachment.watchStream, err = createStream()
if err != nil {
return
}

for {
pbResp, err := stream.Recv()
if err != nil {
rch <- WatchResponse{Err: err}
close(rch)
return
}
resp, err := handleResponse(pbResp)
if err != nil {
rch <- WatchResponse{Err: err}
select {
case <-ctx.Done():
close(rch)
return
}
if resp == nil {
case e := <-doc.Events():
if e.Type == document.WatchedEvent {
//t = DocumentWatched
continue
} else if e.Type == document.UnwatchedEvent {
//_ = c.detachInternal(doc.Key())
_ = attachment.cancelWatchStream()
close(rch)
return
}
t := PresenceChanged
rch <- WatchResponse{Type: t, Presences: e.Presences}
continue
}
default:
if attachment.watchStream == nil {
gotime.Sleep(1 * gotime.Second)
attachment.watchStream, err = createStream()
if err != nil {
return
}
continue
}

pbResp, err := attachment.watchStream.Recv()
// stream 수신 중에 에러가 발생하면 stream을 초기화하고 다시 수신을 시작한다.
// stream만 초기화한다.
if err != nil {
//rch <- WatchResponse{Err: err}
attachment.watchStream = nil
continue
}
resp, err := handleResponse(pbResp)
if err != nil {
rch <- WatchResponse{Err: err}
_ = attachment.cancelWatchStream()
close(rch)
return
}
if resp == nil {
continue
}

rch <- *resp
rch <- *resp
}
}
}()

Expand All @@ -523,22 +567,22 @@ func (c *Client) watch(

// TODO(hackerwins): We should ensure that the goroutine is closed when
// the stream is closed.
go func() {
for {
select {
case e := <-doc.Events():
t := PresenceChanged
if e.Type == document.WatchedEvent {
t = DocumentWatched
} else if e.Type == document.UnwatchedEvent {
t = DocumentUnwatched
}
rch <- WatchResponse{Type: t, Presences: e.Presences}
case <-ctx.Done():
return
}
}
}()
//go func() {
// for {
// select {
// case e := <-doc.Events():
// t := PresenceChanged
// if e.Type == document.WatchedEvent {
// t = DocumentWatched
// } else if e.Type == document.UnwatchedEvent {
// t = DocumentUnwatched
// }
// rch <- WatchResponse{Type: t, Presences: e.Presences}
// case <-ctx.Done():
// return
// }
// }
//}()

return rch, nil
}
Expand Down Expand Up @@ -606,6 +650,7 @@ func (c *Client) pushPullChanges(ctx context.Context, opt SyncOptions) error {
return err
}
if attachment.doc.Status() == document.StatusRemoved {
_ = attachment.cancelWatchStream()
delete(c.attachments, attachment.doc.Key())
}

Expand Down Expand Up @@ -652,6 +697,36 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {
if doc.Status() == document.StatusRemoved {
delete(c.attachments, doc.Key())
}
_ = attachment.cancelWatchStream()

return nil
}

func (c *Client) detachInternal(docKey key.Key) error {
attachment, ok := c.attachments[docKey]
if !ok {
//return ErrDocumentNotAttached
return nil
}

if err := attachment.cancelWatchStream(); err != nil {
return err
}

attachment.doc.Unwatch()
delete(c.attachments, docKey)

return nil
}

func (a *Attachment) cancelWatchStream() error {
if a.watchStream != nil {
if err := a.watchStream.CloseSend(); err != nil {
a.watchStream = nil
return err
}
}
a.watchStream = nil

return nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ func (d *Document) Events() <-chan DocEvent {
return d.events
}

func (d *Document) Unwatch() {
d.events <- DocEvent{
Type: UnwatchedEvent,
}
}

func messageFromMsgAndArgs(msgAndArgs ...interface{}) string {
if len(msgAndArgs) == 0 {
return ""
Expand Down
4 changes: 2 additions & 2 deletions test/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ var testStartedAt int64
var (
RPCPort = 21101
RPCMaxRequestBytes = uint64(4 * 1024 * 1024)
RPCMaxConnectionAge = 8 * gotime.Second
RPCMaxConnectionAgeGrace = 2 * gotime.Second
RPCMaxConnectionAge = 0 * gotime.Second
RPCMaxConnectionAgeGrace = 0 * gotime.Second

ProfilingPort = 21102

Expand Down
Loading

0 comments on commit 49ce076

Please sign in to comment.