Skip to content

Commit

Permalink
fix StreamEvents and StreamEventsStore reconnection issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Lior Nabat committed Feb 13, 2023
1 parent f202128 commit 92d5d0e
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 480 deletions.
2 changes: 1 addition & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version: '3'

vars:
BINARY_NAME: kubemq-go
VERSION: v1.7.7
VERSION: v1.7.8

tasks:
check_update:
Expand Down
20 changes: 11 additions & 9 deletions events_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient
}

func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error) {
if err:=es.isClientReady();err!=nil{
return nil,err
if err := es.isClientReady(); err != nil {
return nil, err
}
message.transport = es.client.transport
return es.client.SetEventStore(message).Send(ctx)
}

func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error) {
if err:=es.isClientReady();err!=nil{
return nil,err
if err := es.isClientReady(); err != nil {
return nil, err
}
if onResult == nil {
return nil, fmt.Errorf("events stream result callback function is required")
Expand All @@ -75,7 +75,10 @@ func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *E
go func() {
for {
select {
case result := <-eventsResultCh:
case result, ok := <-eventsResultCh:
if !ok {
return
}
onResult(result, nil)
case err := <-errCh:
onResult(nil, err)
Expand All @@ -88,7 +91,7 @@ func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *E
}

func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error {
if err:=es.isClientReady();err!=nil{
if err := es.isClientReady(); err != nil {
return err
}
if onEvent == nil {
Expand Down Expand Up @@ -118,16 +121,15 @@ func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStore
}

func (es *EventsStoreClient) Close() error {
if err:=es.isClientReady();err!=nil{
if err := es.isClientReady(); err != nil {
return err
}
return es.client.Close()
}

func (es *EventsStoreClient) isClientReady() error {
if es.client==nil {
if es.client == nil {
return fmt.Errorf("client is not initialized")
}
return nil
}

28 changes: 27 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/kubemq-io/kubemq-go

go 1.18
go 1.19

require (
github.com/go-resty/resty/v2 v2.7.0
Expand All @@ -14,3 +14,29 @@ require (
go.uber.org/atomic v1.10.0
google.golang.org/grpc v1.51.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
golang.org/x/sys v0.0.0-20220908164124-27713097b956 // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 92d5d0e

Please sign in to comment.