Skip to content

Commit

Permalink
feat: add unsubscribe (#15)
Browse files Browse the repository at this point in the history
* feat: add unsubscribe

* fix: fix unit_test

* chore: optimize license header

* chore: optimize license header
  • Loading branch information
ViolaPioggia authored Jan 19, 2024
1 parent 688b860 commit 800cc5a
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func main() {
}
}()

select {}
wg.Wait()
}

```
Expand Down
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *Client) startReadLoop(ctx context.Context, reader *EventStreamReader) (
func (c *Client) readLoop(ctx context.Context, reader *EventStreamReader, outCh chan *Event, erChan chan error) {
for {
// Read each new line and process the type of event
event, err := reader.ReadEvent()
event, err := reader.ReadEvent(ctx)
if err != nil {
if err == io.EOF {
erChan <- nil
Expand Down
27 changes: 26 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -197,6 +197,31 @@ func TestClientSubscribe(t *testing.T) {
assert.Nil(t, cErr)
}

func TestClientUnSubscribe(t *testing.T) {
go newServer(false, "8887")
time.Sleep(time.Second)
c := NewClient("http://127.0.0.1:8887/sse")

events := make(chan *Event)
ctx, cancel := context.WithCancel(context.Background())
var cErr error
go func() {
cErr = c.SubscribeWithContext(ctx, func(msg *Event) {
if msg.Data != nil {
events <- msg
return
}
})
assert.Nil(t, cErr)
}()
cancel()
time.Sleep(5 * time.Second)
for i := 0; i < 5; i++ {
_, err := wait(events, time.Second*1)
assert.DeepEqual(t, errors.New("timeout"), err)
}
}

func TestClientSubscribeMultiline(t *testing.T) {
go newMultilineServer("9007")
time.Sleep(time.Second)
Expand Down
2 changes: 1 addition & 1 deletion encoder.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion encoder_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
14 changes: 10 additions & 4 deletions event.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@ package sse
import (
"bufio"
"bytes"
"context"
"io"
)

Expand Down Expand Up @@ -119,10 +120,15 @@ func minPosInt(a, b int) int {
}

// ReadEvent scans the EventStream for events.
func (e *EventStreamReader) ReadEvent() ([]byte, error) {
func (e *EventStreamReader) ReadEvent(ctx context.Context) ([]byte, error) {
if e.scanner.Scan() {
event := e.scanner.Bytes()
return event, nil
select {
case <-ctx.Done():
return nil, io.EOF
default:
event := e.scanner.Bytes()
return event, nil
}
}
if err := e.scanner.Err(); err != nil {
return nil, err
Expand Down
18 changes: 12 additions & 6 deletions examples/client/quickstart/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,11 +38,11 @@ package main

import (
"context"
"sync"

"github.com/hertz-contrib/sse"

"fmt"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/hertz-contrib/sse"
"sync"
"time"
)

var wg sync.WaitGroup
Expand All @@ -64,15 +64,21 @@ func main() {

events := make(chan *sse.Event)
errChan := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
go func() {
cErr := c.Subscribe(func(msg *sse.Event) {
cErr := c.SubscribeWithContext(ctx, func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
}
})
errChan <- cErr
}()
go func() {
time.Sleep(5 * time.Second)
cancel()
fmt.Println("client1 subscribe cancel")
}()
for {
select {
case e := <-events:
Expand Down
2 changes: 1 addition & 1 deletion examples/server/chat/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion examples/server/quickstart/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion examples/server/stockprice/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion sse.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion sse_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 CloudWeGo Authors
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 800cc5a

Please sign in to comment.