Skip to content

Commit

Permalink
Merge pull request #104 from philippseith/bugfix/Return_CloseMessage_…
Browse files Browse the repository at this point in the history
…Error

Bugfix/return close message error
  • Loading branch information
philippseith authored Nov 18, 2021
2 parents e50db63 + 6f42350 commit d8ee60f
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 239 deletions.
58 changes: 29 additions & 29 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,35 @@ jobs:
token: ${{secrets.CODECOV_TOKEN}}
file: ./coverage.txt

test-macos:
name: Test MacOS
runs-on: macos-11
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.16

- name: Check out code
uses: actions/checkout@v1

- name: Run Unit tests.
run: make test

test-windows:
name: Test Windows
runs-on: windows-2019
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.16

- name: Check out code
uses: actions/checkout@v1

- name: Run Unit tests.
run: make test
# test-macos:
# name: Test MacOS
# runs-on: macos-11
# steps:
# - name: Set up Go
# uses: actions/setup-go@v1
# with:
# go-version: 1.16
#
# - name: Check out code
# uses: actions/checkout@v1
#
# - name: Run Unit tests.
# run: make test
#
# test-windows:
# name: Test Windows
# runs-on: windows-2019
# steps:
# - name: Set up Go
# uses: actions/setup-go@v1
# with:
# go-version: 1.16
#
# - name: Check out code
# uses: actions/checkout@v1
#
# - name: Run Unit tests.
# run: make test

build:
name: Build
Expand Down
7 changes: 4 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,10 @@ func (c *client) WaitForState(ctx context.Context, waitFor ClientState) <-chan e
return ch
}
stateCh := make(chan struct{}, 1)
c.ObserveStateChanged(stateCh)
go func() {
cancel := c.ObserveStateChanged(stateCh)
go func(waitFor ClientState) {
defer close(ch)
defer cancel()
if c.waitingIsOver(waitFor, ch) {
return
}
Expand All @@ -325,7 +326,7 @@ func (c *client) WaitForState(ctx context.Context, waitFor ClientState) <-chan e
return
}
}
}()
}(waitFor)
return ch
}

Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *simpleHub) InvokeMe(arg1 string, arg2 int) string {
}

func (s *simpleHub) Callback(arg1 string) {
s.Hub.context.Clients().Caller().Send("OnCallback", strings.ToUpper(arg1))
s.Hub.Clients().Caller().Send("OnCallback", strings.ToUpper(arg1))
}

func (s *simpleHub) ReadStream(i int) chan string {
Expand Down
18 changes: 16 additions & 2 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,62 @@ type HubInterface interface {
// Hub is a base class for hubs
type Hub struct {
context HubContext
cm sync.Mutex
cm sync.RWMutex
}

// Initialize initializes a hub with a HubContext
func (h *Hub) Initialize(ctx HubContext) {
defer h.cm.Unlock()
h.cm.Lock()
defer h.cm.Unlock()
h.context = ctx
}

// Clients returns the clients of this hub
func (h *Hub) Clients() HubClients {
h.cm.RLock()
defer h.cm.RUnlock()
return h.context.Clients()
}

// Groups returns the client groups of this hub
func (h *Hub) Groups() GroupManager {
h.cm.RLock()
defer h.cm.RUnlock()
return h.context.Groups()
}

// Items returns the items for this connection
func (h *Hub) Items() *sync.Map {
h.cm.RLock()
defer h.cm.RUnlock()
return h.context.Items()
}

// ConnectionID gets the ID of the current connection
func (h *Hub) ConnectionID() string {
h.cm.RLock()
defer h.cm.RUnlock()
return h.context.ConnectionID()
}

// Context is the context.Context of the current connection
func (h *Hub) Context() context.Context {
h.cm.RLock()
defer h.cm.RUnlock()
return h.context.Context()
}

// Abort aborts the current connection
func (h *Hub) Abort() {
h.cm.RLock()
defer h.cm.RUnlock()
h.context.Abort()
}

// Logger returns the loggers used in this server. By this, derived hubs can use the same loggers as the server.
func (h *Hub) Logger() (info StructuredLogger, dbg StructuredLogger) {
h.cm.RLock()
defer h.cm.RUnlock()
return h.context.Logger()
}

Expand Down
1 change: 1 addition & 0 deletions hubcontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func makePipeClientsAndReceivers() ([]Client, []*SimpleReceiver, context.CancelF
receiver[i] = &SimpleReceiver{ch: make(chan struct{})}
client[i], _ = NewClient(ctx, WithConnection(cliConn[i]), WithReceiver(receiver[i]), testLoggerOption())
client[i].Start()
<-client[i].WaitForState(ctx, ClientConnected)
}
return client, receiver, cancel
}
Expand Down
4 changes: 4 additions & 0 deletions loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package signalr

import (
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime/debug"
Expand Down Expand Up @@ -89,6 +90,9 @@ msgLoop:
case closeMessage:
_ = l.dbg.Log(evt, msgRecv, msg, fmtMsg(message))
l.closeMessage = &message
if message.Error != "" {
err = errors.New(message.Error)
}
case hubMessage:
// Mostly ping
err = l.handleOtherMessage(message)
Expand Down
4 changes: 2 additions & 2 deletions streamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *streamClient) buildChannelArgument(invocation invocationMessage, argTyp
if argType.Kind() != reflect.Chan || argType.ChanDir() == reflect.SendDir {
return reflect.Value{}, false, nil
} else if len(invocation.StreamIds) > chanCount {
// MakeChan does only accept bidirectional channels and we need to Send to this channel anyway
// MakeChan does only accept bidirectional channels, and we need to Send to this channel anyway
arg = reflect.MakeChan(reflect.ChanOf(reflect.BothDir, argType.Elem()), int(c.streamBufferCapacity))
c.upstreamChannels[invocation.StreamIds[chanCount]] = arg
return arg, true, nil
Expand Down Expand Up @@ -64,7 +64,7 @@ func (c *streamClient) receiveStreamItem(streamItem streamItemMessage) error {
c.mx.Lock()
defer c.mx.Unlock()
if upChan, ok := c.upstreamChannels[streamItem.InvocationID]; ok {
// Mark stream as running to detect illegal completion with result on this id
// Mark the stream as running to detect illegal completion with result on this id
c.runningStreams[streamItem.InvocationID] = true
chanVal := reflect.New(upChan.Type().Elem())
err := c.protocol.UnmarshalArgument(streamItem.Item, chanVal.Interface())
Expand Down
Loading

0 comments on commit d8ee60f

Please sign in to comment.