From 21bd17536b5f4fd7989e7e56adba22fce4f09887 Mon Sep 17 00:00:00 2001 From: amirylm Date: Sun, 19 Nov 2023 20:08:52 -0300 Subject: [PATCH] use grpc/client package instead of inline implementations --- examples/don/consumer.go | 100 ---------------------------------- examples/don/endpoint.go | 12 ---- examples/don/localdon_test.go | 3 +- examples/don/node.go | 36 +++++++++--- examples/don/transmitter.go | 5 +- examples/don/verifier.go | 93 +++++-------------------------- examples/don/verifier_test.go | 5 +- 7 files changed, 50 insertions(+), 204 deletions(-) delete mode 100644 examples/don/consumer.go delete mode 100644 examples/don/endpoint.go diff --git a/examples/don/consumer.go b/examples/don/consumer.go deleted file mode 100644 index b1c0a07..0000000 --- a/examples/don/consumer.go +++ /dev/null @@ -1,100 +0,0 @@ -package don - -import ( - "context" - "io" - - "github.com/amirylm/p2pmq/commons/utils" - "github.com/amirylm/p2pmq/proto" -) - -type MsgConsumer interface { - Start(context.Context) error - Subscribe(ctx context.Context, dons ...string) error - Stop() -} - -type msgConsumer struct { - threadCtrl utils.ThreadControl - grpc GrpcEndPoint - reports *ReportBuffer -} - -func NewMsgConsumer(reports *ReportBuffer, grpc GrpcEndPoint) MsgConsumer { - return &msgConsumer{ - threadCtrl: utils.NewThreadControl(), - grpc: grpc, - reports: reports, - } -} - -func (c *msgConsumer) Subscribe(ctx context.Context, dons ...string) error { - conn, err := c.grpc.Connect() - if err != nil { - return err - } - controlClient := proto.NewControlServiceClient(conn) - for _, did := range dons { - _, err = controlClient.Subscribe(ctx, &proto.SubscribeRequest{ - Topic: donTopic(did), - }) - if err != nil { - return err - } - } - return nil -} - -func (c *msgConsumer) Start(ctx context.Context) error { - conn, err := c.grpc.Connect() - if err != nil { - return err - } - msgRouter := proto.NewMsgRouterClient(conn) - routerClient, err := msgRouter.Listen(ctx, &proto.ListenRequest{}) - if err != nil { - return err - } - - msgQ := make(chan *proto.Message, 128) - - c.threadCtrl.Go(func(ctx context.Context) { - defer close(msgQ) - - for ctx.Err() == nil { - msg, err := routerClient.Recv() - if err == io.EOF || err == context.Canceled || ctx.Err() != nil || msg == nil { // stream closed - return - } - select { - case msgQ <- msg: - default: - // dropped, TODO: handle or log - } - } - }) - - for { - select { - case <-ctx.Done(): - return nil - case msg := <-msgQ: - if msg == nil { - return nil - } - r, err := UnmarshalReport(msg.GetData()) - if err != nil || r == nil { - // bad encoding, not expected - continue - } - if c.reports.Add(r.Src, *r) { - // TODO: notify new report - continue - } - } - } -} - -func (v *msgConsumer) Stop() { - v.threadCtrl.Close() -} diff --git a/examples/don/endpoint.go b/examples/don/endpoint.go deleted file mode 100644 index 19306a4..0000000 --- a/examples/don/endpoint.go +++ /dev/null @@ -1,12 +0,0 @@ -package don - -import ( - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -type GrpcEndPoint string - -func (gep GrpcEndPoint) Connect() (*grpc.ClientConn, error) { - return grpc.Dial(string(gep), grpc.WithTransportCredentials(insecure.NewCredentials())) -} diff --git a/examples/don/localdon_test.go b/examples/don/localdon_test.go index bd715e2..aa92dc2 100644 --- a/examples/don/localdon_test.go +++ b/examples/don/localdon_test.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc" grpcapi "github.com/amirylm/p2pmq/api/grpc" + "github.com/amirylm/p2pmq/api/grpc/client" "github.com/amirylm/p2pmq/core" ) @@ -106,7 +107,7 @@ func TestCrossDONCommunication(t *testing.T) { srv := s port := randPort() addrs[i] = fmt.Sprintf(":%d", port) - nodes[i] = NewNode(GrpcEndPoint(fmt.Sprintf(":%d", port))) + nodes[i] = NewNode(client.GrpcEndPoint(fmt.Sprintf(":%d", port))) go func() { err := grpcapi.ListenGrpc(srv, port) if ctx.Err() == nil { diff --git a/examples/don/node.go b/examples/don/node.go index 657112d..769df9c 100644 --- a/examples/don/node.go +++ b/examples/don/node.go @@ -3,28 +3,35 @@ package don import ( "context" + "github.com/amirylm/p2pmq/api/grpc/client" "github.com/amirylm/p2pmq/commons/utils" + "github.com/amirylm/p2pmq/proto" ) type Node struct { utils.StartStopOnce threadC utils.ThreadControl - Grpc GrpcEndPoint + Grpc client.GrpcEndPoint Reports *ReportBuffer Transmitter OverlayTrasnmitter - Consumer MsgConsumer - Verifier Verifier - Signers map[string]Signer + // Consumer is the consumer client to get messages from the MsgRouter + Consumer client.Consumer + // Verifier is the verifier client that faciliates interaction with the ValidationRouter + Verifier client.Verifier + // reportVerifier is doing the actual validation on incoming messages + reportVerifier *reportVerifier + Signers map[string]Signer } -func NewNode(grpc GrpcEndPoint, opts ...NodeOpt) *Node { +func NewNode(grpc client.GrpcEndPoint, opts ...NodeOpt) *Node { nodeOpts := defaultNodeOpts() for _, opt := range opts { opt(nodeOpts) } reports := NewReportBuffer(nodeOpts.bufferSize) + reportVerifier := NewReportVerifier(reports) return &Node{ threadC: utils.NewThreadControl(), @@ -32,9 +39,22 @@ func NewNode(grpc GrpcEndPoint, opts ...NodeOpt) *Node { Reports: reports, Transmitter: NewOverlayTransmitter(grpc, nodeOpts.reportManipulator), - Consumer: NewMsgConsumer(reports, grpc), - Verifier: NewVerifier(reports, grpc), - Signers: map[string]Signer{}, + Consumer: client.NewConsumer(grpc, func(msg *proto.Message) error { + r, err := UnmarshalReport(msg.GetData()) + if err != nil || r == nil { + // bad encoding, not expected + return err + } + if reports.Add(r.Src, *r) { + // TODO: notify new report + } + return nil + }), + Verifier: client.NewVerifier(grpc, func(msg *proto.Message) proto.ValidationResult { + _, res := reportVerifier.Process(msg) + return res + }), + Signers: map[string]Signer{}, } } diff --git a/examples/don/transmitter.go b/examples/don/transmitter.go index 120f332..3837c2e 100644 --- a/examples/don/transmitter.go +++ b/examples/don/transmitter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/amirylm/p2pmq/api/grpc/client" "github.com/amirylm/p2pmq/proto" ) @@ -16,11 +17,11 @@ type OverlayTrasnmitter interface { } type overlayTransmitter struct { - grpc GrpcEndPoint + grpc client.GrpcEndPoint reportManipulator func(*MockedSignedReport) } -func NewOverlayTransmitter(grpc GrpcEndPoint, reportManipulator func(*MockedSignedReport)) OverlayTrasnmitter { +func NewOverlayTransmitter(grpc client.GrpcEndPoint, reportManipulator func(*MockedSignedReport)) OverlayTrasnmitter { return &overlayTransmitter{ grpc: grpc, reportManipulator: reportManipulator, diff --git a/examples/don/verifier.go b/examples/don/verifier.go index 4d3a2d3..c2dd94a 100644 --- a/examples/don/verifier.go +++ b/examples/don/verifier.go @@ -1,11 +1,8 @@ package don import ( - "context" "fmt" - "io" - "github.com/amirylm/p2pmq/commons/utils" "github.com/amirylm/p2pmq/proto" ) @@ -14,87 +11,27 @@ var ( invalidThreshold int64 = 25 ) -type Verifier interface { - Start(context.Context) error - Stop() - - Process(raw []byte) ([]byte, proto.ValidationResult) -} - -type verifier struct { - threadCtrl utils.ThreadControl - grpc GrpcEndPoint - reports *ReportBuffer - dons map[string]map[OracleID]OnchainPublicKey -} - -func NewVerifier(reports *ReportBuffer, grpc GrpcEndPoint) Verifier { - return &verifier{ - threadCtrl: utils.NewThreadControl(), - grpc: grpc, - reports: reports, - dons: make(map[string]map[OracleID]OnchainPublicKey), - } +// reportVerifier is doing the actual validation on incoming messages +type reportVerifier struct { + reports *ReportBuffer + dons map[string]map[OracleID]OnchainPublicKey } -func (v *verifier) Start(ctx context.Context) error { - conn, err := v.grpc.Connect() - if err != nil { - return err - } - valRouter := proto.NewValidationRouterClient(conn) - routerClient, err := valRouter.Handle(ctx) - if err != nil { - return err - } - - valQ := make(chan *proto.Message, 1) - - v.threadCtrl.Go(func(ctx context.Context) { - defer close(valQ) - - for ctx.Err() == nil { - msg, err := routerClient.Recv() - if err == io.EOF || err == context.Canceled || ctx.Err() != nil || msg == nil { // stream closed - return - } - select { - case <-ctx.Done(): - return - case valQ <- msg: - } - } - }) - - for { - select { - case <-ctx.Done(): - return nil - case next := <-valQ: - if next == nil { - return nil - } - _, result := v.Process(next.GetData()) - res := &proto.ValidatedMessage{ - Result: result, - Msg: next, - } - routerClient.Send(res) - } +func NewReportVerifier(reports *ReportBuffer) *reportVerifier { + return &reportVerifier{ + reports: reports, + dons: make(map[string]map[OracleID]OnchainPublicKey), } } -func (v *verifier) Stop() { - v.threadCtrl.Close() -} - -func (v *verifier) Process(raw []byte) ([]byte, proto.ValidationResult) { +func (rv *reportVerifier) Process(msg *proto.Message) ([]byte, proto.ValidationResult) { + raw := msg.GetData() r, err := UnmarshalReport(raw) if err != nil || r == nil { // bad encoding return raw, proto.ValidationResult_REJECT } - pubkeys, ok := v.dons[r.Src] + pubkeys, ok := rv.dons[r.Src] if !ok { return raw, proto.ValidationResult_IGNORE } @@ -124,11 +61,11 @@ func (v *verifier) Process(raw []byte) ([]byte, proto.ValidationResult) { return raw, proto.ValidationResult_REJECT } - return raw, v.validateSequence(r) + return raw, rv.validateSequence(r) } -func (v *verifier) validateSequence(r *MockedSignedReport) proto.ValidationResult { - latest := v.reports.GetLatest(r.Src) +func (rv *reportVerifier) validateSequence(r *MockedSignedReport) proto.ValidationResult { + latest := rv.reports.GetLatest(r.Src) if latest != nil { diff := r.SeqNumber - latest.SeqNumber switch { @@ -139,7 +76,7 @@ func (v *verifier) validateSequence(r *MockedSignedReport) proto.ValidationResul default: // less than skipThreshold, accept } } - if v.reports.Get(r.Src, r.SeqNumber) != nil { + if rv.reports.Get(r.Src, r.SeqNumber) != nil { return proto.ValidationResult_IGNORE } return proto.ValidationResult_ACCEPT diff --git a/examples/don/verifier_test.go b/examples/don/verifier_test.go index a93ae60..8c4c3b3 100644 --- a/examples/don/verifier_test.go +++ b/examples/don/verifier_test.go @@ -10,8 +10,7 @@ import ( func TestVerifier_validateSequence(t *testing.T) { n := 100 rb := NewReportBuffer(n) - - v := NewVerifier(rb, "") + v := NewReportVerifier(rb) missing := map[int]bool{ n - 5: true, @@ -66,7 +65,7 @@ func TestVerifier_validateSequence(t *testing.T) { // TODO: fix r, err := NewMockedSignedReport(nil, tc.seq, tc.don, tc.data) require.NoError(t, err) - require.Equal(t, tc.res, v.(*verifier).validateSequence(r)) + require.Equal(t, tc.res, v.validateSequence(r)) }) } }