Skip to content

Commit

Permalink
examples package
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 29, 2023
1 parent 853c54c commit effdc5b
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 27 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ fmt:
@go fmt ./...

test:
@go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts|resources"`
@go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts|resources|examples"`

test-examples:
@make TEST_PKG=./examples/... test-pkg

test-pkg:
@go test -v -race -timeout=${TEST_TIMEOUT} ${TEST_PKG}

test-cov:
@go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts|resources"`
@go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts|resources|examples"`

test-open-cov:
@make test-cov
Expand Down
2 changes: 1 addition & 1 deletion core/ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (c *Controller) Start(ctx context.Context) {

func (c *Controller) Close() {
c.StopOnce(func() {
c.lggr.Debugf("closing controller with host %s", c.host.ID())
c.threadControl.Close()
// d.lggr.Debugf("closing controller with host %s", d.host.ID())
if c.dht != nil {
if err := c.dht.Close(); err != nil {
c.lggr.Errorf("failed to close DHT: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion core/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func SetupTestControllers(ctx context.Context, t *testing.T, n int, routingFn fu
waitControllersConnected(n)

return controllers, msgRouters, valRouters, func() {
boot.Close()
go boot.Close() // closing bootstrapper in the background
for _, c := range controllers {
c.Close()
}
Expand Down
11 changes: 8 additions & 3 deletions tests/don.go → examples/don_simple/don.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package tests
package donsimple

import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -41,6 +42,9 @@ func (d *mockedDon) run(interval time.Duration, subscribedDONs ...string) {
}
d.threadControl.Go(func(c context.Context) {
if err := node.subscribe(c, subscribedDONs...); err != nil {
if strings.Contains(err.Error(), "already tring to join") {
return
}
panic(err)
}
})
Expand Down Expand Up @@ -93,9 +97,10 @@ func (d *mockedDon) nextReport() *MockedSignedReport {

func (d *mockedDon) broadcast(r *MockedSignedReport) {
for _, n := range d.nodes {
n.reports.addReport(d.id, *r)
node := n
node.reports.addReport(d.id, *r)
d.threadControl.Go(func(ctx context.Context) {
conn, err := n.connect(true)
conn, err := node.connect(false)
if err != nil {
return
}
Expand Down
46 changes: 30 additions & 16 deletions tests/dons_test.go → examples/don_simple/dons_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tests
package donsimple

import (
"context"
Expand All @@ -8,7 +8,6 @@ import (
"time"

grpcapi "github.com/amirylm/p2pmq/api/grpc"
"github.com/amirylm/p2pmq/commons/utils"
"github.com/amirylm/p2pmq/core"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -49,8 +48,11 @@ func TestCrossDONCommunication(t *testing.T) {
reportsInterval: time.Second * 1,
},
"mercury": {
nodes: 4,
dons: 1,
nodes: 4,
dons: 1,
subscribed: []string{
"test",
},
reportsInterval: time.Second * 2,
},
"transmit": {
Expand Down Expand Up @@ -96,9 +98,6 @@ func TestCrossDONCommunication(t *testing.T) {
grpcServers[i] = grpcapi.NewGrpcServer(control, msgR, valR)
}

threadC := utils.NewThreadControl()
defer threadC.Close()

addrs := make([]string, n)
nodes := make([]*nodeClient, n)
for i, s := range grpcServers {
Expand All @@ -107,14 +106,20 @@ func TestCrossDONCommunication(t *testing.T) {
port := randPort()
addrs[i] = fmt.Sprintf(":%d", port)
nodes[i] = newNodeClient(fmt.Sprintf(":%d", port))
threadC.Go(func(ctx context.Context) {
go func() {
err := grpcapi.ListenGrpc(srv, port)
if ctx.Err() == nil {
require.NoError(t, err)
}
})
}()
}
}
defer func() {
for _, s := range grpcServers {
s.Stop()
}
t.Log("grpc servers stopped")
}()

<-time.After(time.Second * 5) // TODO: avoid timeout

Expand Down Expand Up @@ -149,30 +154,39 @@ func TestCrossDONCommunication(t *testing.T) {
"test": int(testDuration) / int(donsCfg["test"].reportsInterval),
}

checkLoop:
for ctx.Err() == nil {
<-time.After(time.Second * 5)
<-time.After(testDuration / 4)
for did, exp := range expectedReports {
instances := dons[did]
for _, don := range instances {
reportsCount := don.reportsCount()
for reportsCount < exp && ctx.Err() == nil {
for reportsCount+1 < exp && ctx.Err() == nil {
time.Sleep(time.Second)
reportsCount = don.reportsCount()
}
if ctx.Err() == nil {
t.Logf("DON %s reports count: %d", did, expectedReports[did])
// we have enough reports for this don
expectedReports[did] = 0
}
}
if ctx.Err() == nil {
t.Logf("DON %s reports count: %d", did, expectedReports[did])
// we have enough reports for this don
expectedReports[did] = 0
}
for _, exp := range expectedReports {
if exp != 0 {
continue checkLoop
}
}
break
}

// <-time.After(testDuration + testDuration/4)
<-time.After(testDuration / 4)

for did, exp := range expectedReports {
require.Equal(t, 0, exp, "DON %s reports count", did)
}
t.Log("done")
cancel()
}

func getRandomNodes(n int, items []*nodeClient) []*nodeClient {
Expand Down
4 changes: 2 additions & 2 deletions tests/node.go → examples/don_simple/node.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tests
package donsimple

import (
"context"
Expand Down Expand Up @@ -38,7 +38,7 @@ func newNodeClient(grpcAddr string) *nodeClient {
}

func (n *nodeClient) subscribe(ctx context.Context, dons ...string) error {
conn, err := n.connect(true)
conn, err := n.connect(false)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions tests/reports.go → examples/don_simple/reports.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tests
package donsimple

import (
"bytes"
Expand All @@ -25,7 +25,7 @@ func (rb *reportsBuffer) addReport(don string, sr MockedSignedReport) bool {
}
buf := rb.reports[don]
if len(buf) == 0 {
buf = make([]MockedSignedReport, 0, rb.bufferSize)
buf = make([]MockedSignedReport, rb.bufferSize)
}
i := sr.SeqNumber % int64(rb.bufferSize)
// update only if newer than existing report in buffer
Expand Down

0 comments on commit effdc5b

Please sign in to comment.