diff --git a/Makefile b/Makefile index 8578a37..19ca936 100644 --- a/Makefile +++ b/Makefile @@ -13,13 +13,16 @@ fmt: @go fmt ./... test: - @go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts|resources"` + @go test -v -race -timeout=${TEST_TIMEOUT} `go list ./... | grep -v -E "cmd|scripts|resources|examples"` + +test-examples: + @make TEST_PKG=./examples/... test-pkg test-pkg: @go test -v -race -timeout=${TEST_TIMEOUT} ${TEST_PKG} test-cov: - @go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts|resources"` + @go test -v -race -timeout=${TEST_TIMEOUT} -coverprofile cover.out `go list ./... | grep -v -E "cmd|scripts|resources|examples"` test-open-cov: @make test-cov diff --git a/core/ctrl.go b/core/ctrl.go index 9825117..61e5615 100644 --- a/core/ctrl.go +++ b/core/ctrl.go @@ -110,8 +110,8 @@ func (c *Controller) Start(ctx context.Context) { func (c *Controller) Close() { c.StopOnce(func() { + c.lggr.Debugf("closing controller with host %s", c.host.ID()) c.threadControl.Close() - // d.lggr.Debugf("closing controller with host %s", d.host.ID()) if c.dht != nil { if err := c.dht.Close(); err != nil { c.lggr.Errorf("failed to close DHT: %w", err) diff --git a/core/testutils.go b/core/testutils.go index d447d98..b1c552c 100644 --- a/core/testutils.go +++ b/core/testutils.go @@ -85,7 +85,7 @@ func SetupTestControllers(ctx context.Context, t *testing.T, n int, routingFn fu waitControllersConnected(n) return controllers, msgRouters, valRouters, func() { - boot.Close() + go boot.Close() // closing bootstrapper in the background for _, c := range controllers { c.Close() } diff --git a/tests/don.go b/examples/don_simple/don.go similarity index 92% rename from tests/don.go rename to examples/don_simple/don.go index c93bc1d..9b6c4e9 100644 --- a/tests/don.go +++ b/examples/don_simple/don.go @@ -1,8 +1,9 @@ -package tests +package donsimple import ( "context" "fmt" + "strings" "sync" "time" @@ -41,6 +42,9 @@ func (d *mockedDon) run(interval time.Duration, subscribedDONs ...string) { } d.threadControl.Go(func(c context.Context) { if err := node.subscribe(c, subscribedDONs...); err != nil { + if strings.Contains(err.Error(), "already tring to join") { + return + } panic(err) } }) @@ -93,9 +97,10 @@ func (d *mockedDon) nextReport() *MockedSignedReport { func (d *mockedDon) broadcast(r *MockedSignedReport) { for _, n := range d.nodes { - n.reports.addReport(d.id, *r) + node := n + node.reports.addReport(d.id, *r) d.threadControl.Go(func(ctx context.Context) { - conn, err := n.connect(true) + conn, err := node.connect(false) if err != nil { return } diff --git a/tests/dons_test.go b/examples/don_simple/dons_test.go similarity index 86% rename from tests/dons_test.go rename to examples/don_simple/dons_test.go index 2d1084d..397e8dd 100644 --- a/tests/dons_test.go +++ b/examples/don_simple/dons_test.go @@ -1,4 +1,4 @@ -package tests +package donsimple import ( "context" @@ -8,7 +8,6 @@ import ( "time" grpcapi "github.com/amirylm/p2pmq/api/grpc" - "github.com/amirylm/p2pmq/commons/utils" "github.com/amirylm/p2pmq/core" logging "github.com/ipfs/go-log" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -49,8 +48,11 @@ func TestCrossDONCommunication(t *testing.T) { reportsInterval: time.Second * 1, }, "mercury": { - nodes: 4, - dons: 1, + nodes: 4, + dons: 1, + subscribed: []string{ + "test", + }, reportsInterval: time.Second * 2, }, "transmit": { @@ -96,9 +98,6 @@ func TestCrossDONCommunication(t *testing.T) { grpcServers[i] = grpcapi.NewGrpcServer(control, msgR, valR) } - threadC := utils.NewThreadControl() - defer threadC.Close() - addrs := make([]string, n) nodes := make([]*nodeClient, n) for i, s := range grpcServers { @@ -107,14 +106,20 @@ func TestCrossDONCommunication(t *testing.T) { port := randPort() addrs[i] = fmt.Sprintf(":%d", port) nodes[i] = newNodeClient(fmt.Sprintf(":%d", port)) - threadC.Go(func(ctx context.Context) { + go func() { err := grpcapi.ListenGrpc(srv, port) if ctx.Err() == nil { require.NoError(t, err) } - }) + }() } } + defer func() { + for _, s := range grpcServers { + s.Stop() + } + t.Log("grpc servers stopped") + }() <-time.After(time.Second * 5) // TODO: avoid timeout @@ -149,30 +154,39 @@ func TestCrossDONCommunication(t *testing.T) { "test": int(testDuration) / int(donsCfg["test"].reportsInterval), } +checkLoop: for ctx.Err() == nil { - <-time.After(time.Second * 5) + <-time.After(testDuration / 4) for did, exp := range expectedReports { instances := dons[did] for _, don := range instances { reportsCount := don.reportsCount() - for reportsCount < exp && ctx.Err() == nil { + for reportsCount+1 < exp && ctx.Err() == nil { time.Sleep(time.Second) reportsCount = don.reportsCount() } + if ctx.Err() == nil { + t.Logf("DON %s reports count: %d", did, expectedReports[did]) + // we have enough reports for this don + expectedReports[did] = 0 + } } - if ctx.Err() == nil { - t.Logf("DON %s reports count: %d", did, expectedReports[did]) - // we have enough reports for this don - expectedReports[did] = 0 + } + for _, exp := range expectedReports { + if exp != 0 { + continue checkLoop } } + break } - // <-time.After(testDuration + testDuration/4) + <-time.After(testDuration / 4) for did, exp := range expectedReports { require.Equal(t, 0, exp, "DON %s reports count", did) } + t.Log("done") + cancel() } func getRandomNodes(n int, items []*nodeClient) []*nodeClient { diff --git a/tests/node.go b/examples/don_simple/node.go similarity index 98% rename from tests/node.go rename to examples/don_simple/node.go index 645c772..ef16d06 100644 --- a/tests/node.go +++ b/examples/don_simple/node.go @@ -1,4 +1,4 @@ -package tests +package donsimple import ( "context" @@ -38,7 +38,7 @@ func newNodeClient(grpcAddr string) *nodeClient { } func (n *nodeClient) subscribe(ctx context.Context, dons ...string) error { - conn, err := n.connect(true) + conn, err := n.connect(false) if err != nil { return err } diff --git a/tests/reports.go b/examples/don_simple/reports.go similarity index 96% rename from tests/reports.go rename to examples/don_simple/reports.go index bfbfcc3..c1ffa2e 100644 --- a/tests/reports.go +++ b/examples/don_simple/reports.go @@ -1,4 +1,4 @@ -package tests +package donsimple import ( "bytes" @@ -25,7 +25,7 @@ func (rb *reportsBuffer) addReport(don string, sr MockedSignedReport) bool { } buf := rb.reports[don] if len(buf) == 0 { - buf = make([]MockedSignedReport, 0, rb.bufferSize) + buf = make([]MockedSignedReport, rb.bufferSize) } i := sr.SeqNumber % int64(rb.bufferSize) // update only if newer than existing report in buffer