Skip to content

Commit

Permalink
chg: stabilize tests
Browse files Browse the repository at this point in the history
  • Loading branch information
philippseith committed Nov 17, 2021
1 parent 1e194ad commit 8dfba5d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 222 deletions.
267 changes: 83 additions & 184 deletions hubcontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -65,71 +64,6 @@ func (c *contextHub) Abort() {
c.Hub.Abort()
}

func connectMany() (Server, []*testingConnection, []string) {
s, err := NewServer(context.TODO(), SimpleHubFactory(&contextHub{}),
testLoggerOption())
if err != nil {
Fail(err.Error())
return nil, nil, nil
}
conns := make([]*testingConnection, 3)
connIds := make([]string, 0)
for i := 0; i < 3; i++ {
conns[i] = newTestingConnectionForServer()
}
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go func(i int) {
wg.Done()
_ = s.Serve(conns[i])
}(i)
}
wg.Wait()
return s, conns, connIds
}

func expectInvocationMessageFromConnection(ctx context.Context, conn *testingConnection, i int, errCh chan error) {
var msg interface{}
select {
case msg = <-conn.received:
if _, ok := msg.(completionMessage); ok {
fmt.Printf("Skipped completion %#v\n", msg)
select {
case msg = <-conn.received:
case <-time.After(time.Second):
errCh <- fmt.Errorf("timeout client %v waiting for message", i)
return
case <-ctx.Done():
return
}
}
case <-time.After(time.Second):
errCh <- fmt.Errorf("timeout client %v waiting for message", i)
return
case <-ctx.Done():
return
}
if ivMsg, ok := msg.(invocationMessage); !ok {
errCh <- fmt.Errorf("client %v expected invocationMessage, got %T %#v", i, msg, msg)
return
} else {
if strings.ToLower(ivMsg.Target) != "clientfunc" {
errCh <- fmt.Errorf("client %v expected clientfunc, got got %#v", i, ivMsg)
return
}
}
errCh <- nil
}

func expectNoMessageFromConnection(ctx context.Context, conn *testingConnection, errCh chan error) {
select {
case msg := <-conn.received:
errCh <- fmt.Errorf("received unexpected message %T %#v", msg, msg)
case <-ctx.Done():
}
}

type SimpleReceiver struct {
ch chan struct{}
}
Expand All @@ -138,7 +72,7 @@ func (sr *SimpleReceiver) ClientFunc() {
close(sr.ch)
}

func makeServerAndClients(ctx context.Context, clientCount int) (Server, []Client, []*SimpleReceiver, []Connection, []Connection, error) {
func makeTCPServerAndClients(ctx context.Context, clientCount int) (Server, []Client, []*SimpleReceiver, []Connection, []Connection, error) {
server, err := NewServer(ctx, SimpleHubFactory(&contextHub{}), testLoggerOption())
if err != nil {
return nil, nil, nil, nil, nil, err
Expand Down Expand Up @@ -187,134 +121,99 @@ func makeServerAndClients(ctx context.Context, clientCount int) (Server, []Clien
return server, client, receiver, srvConn, cliConn, nil
}

func makePipeClientsAndReceivers() ([]Client, []*SimpleReceiver, context.CancelFunc) {
cliConn := make([]*pipeConnection, 3)
srvConn := make([]*pipeConnection, 3)
for i := 0; i < 3; i++ {
cliConn[i], srvConn[i] = newClientServerConnections()
cliConn[i].SetConnectionID(fmt.Sprint(i))
srvConn[i].SetConnectionID(fmt.Sprint(i))
}
ctx, cancel := context.WithCancel(context.Background())
server, _ := NewServer(ctx, SimpleHubFactory(&contextHub{}), testLoggerOption())
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go func(i int) {
wg.Done()
_ = server.Serve(srvConn[i])
}(i)
}
wg.Wait()
client := make([]Client, 3)
receiver := make([]*SimpleReceiver, 3)
for i := 0; i < 3; i++ {
receiver[i] = &SimpleReceiver{ch: make(chan struct{})}
client[i], _ = NewClient(ctx, WithConnection(cliConn[i]), WithReceiver(receiver[i]), testLoggerOption())
client[i].Start()
}
return client, receiver, cancel
}

var _ = Describe("HubContext", func() {
Context("Clients().All()", func() {
It("should invoke all clients", func() {
server, conns, _ := connectMany()
defer server.cancel()
conns[0].ClientSend(`{"type":1,"invocationId": "123","target":"callall"}`)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh1 := make(chan error, 1)
go expectInvocationMessageFromConnection(ctx, conns[0], 1, errCh1)
errCh2 := make(chan error, 1)
go expectInvocationMessageFromConnection(ctx, conns[1], 2, errCh2)
errCh3 := make(chan error, 1)
go expectInvocationMessageFromConnection(ctx, conns[2], 3, errCh3)
done := make(chan error, 1)
go func(ctx context.Context, done, errCh1, errCh2, errCh3 chan error) {
results := 0
for results < 3 {
for i := 0; i < 10; i++ {
Context("Clients().All()", func() {
It("should invoke all clients", func() {
client, receiver, cancel := makePipeClientsAndReceivers()
r := <-client[0].Invoke("CallAll")
Expect(r.Error).NotTo(HaveOccurred())
result := 0
for result < 3 {
select {
case err := <-errCh1:
if err != nil {
done <- err
}
results++
case err := <-errCh2:
if err != nil {
done <- err
}
results++
case err := <-errCh3:
if err != nil {
done <- err
}
results++
case <-ctx.Done():
done <- ctx.Err()
return
case <-receiver[0].ch:
result++
case <-receiver[1].ch:
result++
case <-receiver[2].ch:
result++
case <-time.After(2 * time.Second):
Fail("timeout waiting for clients getting results")
}
}
done <- nil
}(ctx, done, errCh1, errCh2, errCh3)
select {
case err := <-done:
Expect(err).NotTo(HaveOccurred())
case <-time.After(2 * time.Second):
Fail("timeout waiting for clients getting results")
}
cancel()
})
})
})
Context("Clients().Caller()", func() {
It("should invoke only the caller", func() {
server, conns, _ := connectMany()
defer server.cancel()
conns[0].ClientSend(`{"type":1,"invocationId": "123","target":"callcaller"}`)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh1 := make(chan error, 1)
go expectInvocationMessageFromConnection(ctx, conns[0], 1, errCh1)
errCh2 := make(chan error, 1)
go expectNoMessageFromConnection(ctx, conns[1], errCh2)
errCh3 := make(chan error, 1)
go expectNoMessageFromConnection(ctx, conns[2], errCh3)
done := make(chan error, 1)
go func(ctx context.Context, done, errCh1, errCh2, errCh3 chan error) {
for {
select {
case err := <-errCh1:
if err != nil {
done <- err
return
}
case err := <-errCh2:
done <- err
return
case err := <-errCh3:
done <- err
return
case <-time.After(100 * time.Millisecond):
done <- nil
return
case <-ctx.Done():
done <- ctx.Err()
return
}
Context("Clients().Caller()", func() {
It("should invoke only the caller", func() {
client, receiver, cancel := makePipeClientsAndReceivers()
r := <-client[0].Invoke("CallCaller")
Expect(r.Error).NotTo(HaveOccurred())
select {
case <-receiver[0].ch:
case <-receiver[1].ch:
Fail("Wrong client received message")
case <-receiver[2].ch:
Fail("Wrong client received message")
case <-time.After(2 * time.Second):
Fail("timeout waiting for clients getting results")
}
}(ctx, done, errCh1, errCh2, errCh3)
select {
case err := <-done:
Expect(err).NotTo(HaveOccurred())
case <-time.After(1 * time.Second):
Fail("timeout waiting for client getting result")
}
cancel()
})
})
})
Context("Clients().Client()", func() {
It("should invoke only the client which was addressed", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client, receiver, srvConn, _, err := makeServerAndClients(ctx, 3)
Expect(err).NotTo(HaveOccurred())

select {
case ir := <-client[0].Invoke("CallClient", srvConn[2].ConnectionID()):
Expect(ir.Error).NotTo(HaveOccurred())
case <-time.After(100 * time.Millisecond):
Fail("timeout in invoke")
}
gotCalled := false
select {
case <-receiver[0].ch:
Fail("client 1 received message for client 3")
case <-receiver[1].ch:
Fail("client 2 received message for client 3")
case <-receiver[2].ch:
gotCalled = true
case <-time.After(100 * time.Millisecond):
if !gotCalled {
Fail("timeout without client 3 got called")
Context("Clients().Client()", func() {
It("should invoke only the client which was addressed", func() {
client, receiver, cancel := makePipeClientsAndReceivers()
r := <-client[0].Invoke("CallClient", "1")
Expect(r.Error).NotTo(HaveOccurred())
select {
case <-receiver[0].ch:
Fail("Wrong client received message")
case <-receiver[1].ch:
case <-receiver[2].ch:
Fail("Wrong client received message")
case <-time.After(2 * time.Second):
Fail("timeout waiting for clients getting results")
}
}
cancel()
})
})
})
}
})

func TestGroupShouldInvokeOnlyTheClientsInTheGroup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client, receiver, srvConn, _, err := makeServerAndClients(ctx, 3)
_, client, receiver, srvConn, _, err := makeTCPServerAndClients(ctx, 3)
assert.NoError(t, err)
select {
case ir := <-client[0].Invoke("buildgroup", srvConn[1].ConnectionID(), srvConn[2].ConnectionID()):
Expand Down Expand Up @@ -346,7 +245,7 @@ func TestGroupShouldInvokeOnlyTheClientsInTheGroup(t *testing.T) {
func TestRemoveClientsShouldRemoveClientsFromTheGroup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client, receiver, srvConn, _, err := makeServerAndClients(ctx, 3)
_, client, receiver, srvConn, _, err := makeTCPServerAndClients(ctx, 3)
assert.NoError(t, err)
select {
case ir := <-client[0].Invoke("buildgroup", srvConn[1].ConnectionID(), srvConn[2].ConnectionID()):
Expand Down Expand Up @@ -384,7 +283,7 @@ func TestRemoveClientsShouldRemoveClientsFromTheGroup(t *testing.T) {
func TestItemsShouldHoldItemsConnectionWise(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client, _, _, _, err := makeServerAndClients(ctx, 2)
_, client, _, _, _, err := makeTCPServerAndClients(ctx, 2)
assert.NoError(t, err)
select {
case ir := <-client[0].Invoke("additem", "first", 1):
Expand All @@ -411,7 +310,7 @@ func TestItemsShouldHoldItemsConnectionWise(t *testing.T) {
func TestAbortShouldAbortTheConnectionOfTheCurrentCaller(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client, _, _, _, err := makeServerAndClients(ctx, 2)
_, client, _, _, _, err := makeTCPServerAndClients(ctx, 2)
assert.NoError(t, err)
select {
case ir := <-client[0].Invoke("abort"):
Expand Down
Loading

0 comments on commit 8dfba5d

Please sign in to comment.