Skip to content

Commit

Permalink
use grpc/client package instead of inline implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Nov 19, 2023
1 parent 6db2a1a commit 21bd175
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 204 deletions.
100 changes: 0 additions & 100 deletions examples/don/consumer.go

This file was deleted.

12 changes: 0 additions & 12 deletions examples/don/endpoint.go

This file was deleted.

3 changes: 2 additions & 1 deletion examples/don/localdon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc"

grpcapi "github.com/amirylm/p2pmq/api/grpc"
"github.com/amirylm/p2pmq/api/grpc/client"
"github.com/amirylm/p2pmq/core"
)

Expand Down Expand Up @@ -106,7 +107,7 @@ func TestCrossDONCommunication(t *testing.T) {
srv := s
port := randPort()
addrs[i] = fmt.Sprintf(":%d", port)
nodes[i] = NewNode(GrpcEndPoint(fmt.Sprintf(":%d", port)))
nodes[i] = NewNode(client.GrpcEndPoint(fmt.Sprintf(":%d", port)))
go func() {
err := grpcapi.ListenGrpc(srv, port)
if ctx.Err() == nil {
Expand Down
36 changes: 28 additions & 8 deletions examples/don/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,58 @@ package don
import (
"context"

"github.com/amirylm/p2pmq/api/grpc/client"
"github.com/amirylm/p2pmq/commons/utils"
"github.com/amirylm/p2pmq/proto"
)

type Node struct {
utils.StartStopOnce
threadC utils.ThreadControl

Grpc GrpcEndPoint
Grpc client.GrpcEndPoint
Reports *ReportBuffer

Transmitter OverlayTrasnmitter
Consumer MsgConsumer
Verifier Verifier
Signers map[string]Signer
// Consumer is the consumer client to get messages from the MsgRouter
Consumer client.Consumer
// Verifier is the verifier client that faciliates interaction with the ValidationRouter
Verifier client.Verifier
// reportVerifier is doing the actual validation on incoming messages
reportVerifier *reportVerifier
Signers map[string]Signer
}

func NewNode(grpc GrpcEndPoint, opts ...NodeOpt) *Node {
func NewNode(grpc client.GrpcEndPoint, opts ...NodeOpt) *Node {
nodeOpts := defaultNodeOpts()
for _, opt := range opts {
opt(nodeOpts)
}
reports := NewReportBuffer(nodeOpts.bufferSize)
reportVerifier := NewReportVerifier(reports)
return &Node{
threadC: utils.NewThreadControl(),

Grpc: grpc,
Reports: reports,

Transmitter: NewOverlayTransmitter(grpc, nodeOpts.reportManipulator),
Consumer: NewMsgConsumer(reports, grpc),
Verifier: NewVerifier(reports, grpc),
Signers: map[string]Signer{},
Consumer: client.NewConsumer(grpc, func(msg *proto.Message) error {
r, err := UnmarshalReport(msg.GetData())
if err != nil || r == nil {
// bad encoding, not expected
return err
}
if reports.Add(r.Src, *r) {
// TODO: notify new report
}
return nil
}),
Verifier: client.NewVerifier(grpc, func(msg *proto.Message) proto.ValidationResult {
_, res := reportVerifier.Process(msg)
return res
}),
Signers: map[string]Signer{},
}
}

Expand Down
5 changes: 3 additions & 2 deletions examples/don/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/amirylm/p2pmq/api/grpc/client"
"github.com/amirylm/p2pmq/proto"
)

Expand All @@ -16,11 +17,11 @@ type OverlayTrasnmitter interface {
}

type overlayTransmitter struct {
grpc GrpcEndPoint
grpc client.GrpcEndPoint
reportManipulator func(*MockedSignedReport)
}

func NewOverlayTransmitter(grpc GrpcEndPoint, reportManipulator func(*MockedSignedReport)) OverlayTrasnmitter {
func NewOverlayTransmitter(grpc client.GrpcEndPoint, reportManipulator func(*MockedSignedReport)) OverlayTrasnmitter {
return &overlayTransmitter{
grpc: grpc,
reportManipulator: reportManipulator,
Expand Down
93 changes: 15 additions & 78 deletions examples/don/verifier.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package don

import (
"context"
"fmt"
"io"

"github.com/amirylm/p2pmq/commons/utils"
"github.com/amirylm/p2pmq/proto"
)

Expand All @@ -14,87 +11,27 @@ var (
invalidThreshold int64 = 25
)

type Verifier interface {
Start(context.Context) error
Stop()

Process(raw []byte) ([]byte, proto.ValidationResult)
}

type verifier struct {
threadCtrl utils.ThreadControl
grpc GrpcEndPoint
reports *ReportBuffer
dons map[string]map[OracleID]OnchainPublicKey
}

func NewVerifier(reports *ReportBuffer, grpc GrpcEndPoint) Verifier {
return &verifier{
threadCtrl: utils.NewThreadControl(),
grpc: grpc,
reports: reports,
dons: make(map[string]map[OracleID]OnchainPublicKey),
}
// reportVerifier is doing the actual validation on incoming messages
type reportVerifier struct {
reports *ReportBuffer
dons map[string]map[OracleID]OnchainPublicKey
}

func (v *verifier) Start(ctx context.Context) error {
conn, err := v.grpc.Connect()
if err != nil {
return err
}
valRouter := proto.NewValidationRouterClient(conn)
routerClient, err := valRouter.Handle(ctx)
if err != nil {
return err
}

valQ := make(chan *proto.Message, 1)

v.threadCtrl.Go(func(ctx context.Context) {
defer close(valQ)

for ctx.Err() == nil {
msg, err := routerClient.Recv()
if err == io.EOF || err == context.Canceled || ctx.Err() != nil || msg == nil { // stream closed
return
}
select {
case <-ctx.Done():
return
case valQ <- msg:
}
}
})

for {
select {
case <-ctx.Done():
return nil
case next := <-valQ:
if next == nil {
return nil
}
_, result := v.Process(next.GetData())
res := &proto.ValidatedMessage{
Result: result,
Msg: next,
}
routerClient.Send(res)
}
func NewReportVerifier(reports *ReportBuffer) *reportVerifier {
return &reportVerifier{
reports: reports,
dons: make(map[string]map[OracleID]OnchainPublicKey),
}
}

func (v *verifier) Stop() {
v.threadCtrl.Close()
}

func (v *verifier) Process(raw []byte) ([]byte, proto.ValidationResult) {
func (rv *reportVerifier) Process(msg *proto.Message) ([]byte, proto.ValidationResult) {
raw := msg.GetData()
r, err := UnmarshalReport(raw)
if err != nil || r == nil {
// bad encoding
return raw, proto.ValidationResult_REJECT
}
pubkeys, ok := v.dons[r.Src]
pubkeys, ok := rv.dons[r.Src]
if !ok {
return raw, proto.ValidationResult_IGNORE
}
Expand Down Expand Up @@ -124,11 +61,11 @@ func (v *verifier) Process(raw []byte) ([]byte, proto.ValidationResult) {
return raw, proto.ValidationResult_REJECT
}

return raw, v.validateSequence(r)
return raw, rv.validateSequence(r)
}

func (v *verifier) validateSequence(r *MockedSignedReport) proto.ValidationResult {
latest := v.reports.GetLatest(r.Src)
func (rv *reportVerifier) validateSequence(r *MockedSignedReport) proto.ValidationResult {
latest := rv.reports.GetLatest(r.Src)
if latest != nil {
diff := r.SeqNumber - latest.SeqNumber
switch {
Expand All @@ -139,7 +76,7 @@ func (v *verifier) validateSequence(r *MockedSignedReport) proto.ValidationResul
default: // less than skipThreshold, accept
}
}
if v.reports.Get(r.Src, r.SeqNumber) != nil {
if rv.reports.Get(r.Src, r.SeqNumber) != nil {
return proto.ValidationResult_IGNORE
}
return proto.ValidationResult_ACCEPT
Expand Down
5 changes: 2 additions & 3 deletions examples/don/verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
func TestVerifier_validateSequence(t *testing.T) {
n := 100
rb := NewReportBuffer(n)

v := NewVerifier(rb, "")
v := NewReportVerifier(rb)

missing := map[int]bool{
n - 5: true,
Expand Down Expand Up @@ -66,7 +65,7 @@ func TestVerifier_validateSequence(t *testing.T) {
// TODO: fix
r, err := NewMockedSignedReport(nil, tc.seq, tc.don, tc.data)
require.NoError(t, err)
require.Equal(t, tc.res, v.(*verifier).validateSequence(r))
require.Equal(t, tc.res, v.validateSequence(r))
})
}
}

0 comments on commit 21bd175

Please sign in to comment.