Skip to content

Commit

Permalink
refactor don example
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Oct 2, 2023
1 parent fe23c01 commit d669835
Show file tree
Hide file tree
Showing 13 changed files with 592 additions and 304 deletions.
14 changes: 14 additions & 0 deletions examples/don/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# DON simple example

This package shows a high level integration with DONs.

Multiple nodes are created for serving multiple DONs.
Each node is composed of the following components:
- p2p agent integrated with grpc (`../api/grpc/*`)
- orchestrator (`./lib/node.go`) to manage the internal processes within the node
- producer (`./lib/transmitter.go`) to publish messages
- message consumer (`./lib/consumer.go`)
- verifier (`./lib/verifier.go`) and signer (`./lib/signer.go`) to verify and sign messages

The nodes are connected to each other via a local p2p network.
A boostrapper node (kad DHT) is used for peer discovery, providing an entry point to the network.
100 changes: 100 additions & 0 deletions examples/don/lib/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package donlib

import (
"context"
"io"

"github.com/amirylm/p2pmq/commons/utils"
"github.com/amirylm/p2pmq/proto"
)

type MsgConsumer interface {
Start(context.Context) error
Subscribe(ctx context.Context, dons ...string) error
Stop()
}

type msgConsumer struct {
threadCtrl utils.ThreadControl
grpc GrpcEndPoint
reports *ReportBuffer
}

func NewMsgConsumer(reports *ReportBuffer, grpc GrpcEndPoint) MsgConsumer {
return &msgConsumer{
threadCtrl: utils.NewThreadControl(),
grpc: grpc,
reports: reports,
}
}

func (c *msgConsumer) Subscribe(ctx context.Context, dons ...string) error {
conn, err := c.grpc.Connect()
if err != nil {
return err
}
controlClient := proto.NewControlServiceClient(conn)
for _, did := range dons {
_, err = controlClient.Subscribe(ctx, &proto.SubscribeRequest{
Topic: donTopic(did),
})
if err != nil {
return err
}
}
return nil
}

func (c *msgConsumer) Start(ctx context.Context) error {
conn, err := c.grpc.Connect()
if err != nil {
return err
}
msgRouter := proto.NewMsgRouterClient(conn)
routerClient, err := msgRouter.Listen(ctx, &proto.ListenRequest{})
if err != nil {
return err
}

msgQ := make(chan *proto.Message, 128)

c.threadCtrl.Go(func(ctx context.Context) {
defer close(msgQ)

for ctx.Err() == nil {
msg, err := routerClient.Recv()
if err == io.EOF || err == context.Canceled || ctx.Err() != nil || msg == nil { // stream closed
return
}
select {
case msgQ <- msg:
default:
// dropped, TODO: handle or log
}
}
})

for {
select {
case <-ctx.Done():
return nil
case msg := <-msgQ:
if msg == nil {
return nil
}
r, err := UnmarshalReport(msg.GetData())
if err != nil || r == nil {
// bad encoding, not expected
continue
}
if c.reports.Add(r.Src, *r) {
// TODO: notify new report
continue
}
}
}
}

func (v *msgConsumer) Stop() {
v.threadCtrl.Close()
}
12 changes: 12 additions & 0 deletions examples/don/lib/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package donlib

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type GrpcEndPoint string

func (gep GrpcEndPoint) Connect() (*grpc.ClientConn, error) {
return grpc.Dial(string(gep), grpc.WithTransportCredentials(insecure.NewCredentials()))
}
96 changes: 96 additions & 0 deletions examples/don/lib/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package donlib

import (
"context"

"github.com/amirylm/p2pmq/commons/utils"
)

type Node struct {
utils.StartStopOnce
threadC utils.ThreadControl

Grpc GrpcEndPoint
Reports *ReportBuffer

Transmitter OverlayTrasnmitter
Consumer MsgConsumer
Verifier Verifier
Signer Signer
}

func NewNode(grpc GrpcEndPoint, opts ...NodeOpt) *Node {
nodeOpts := defaultNodeOpts()
for _, opt := range opts {
opt(nodeOpts)
}
reports := NewReportBuffer(nodeOpts.bufferSize)
return &Node{
threadC: utils.NewThreadControl(),

Grpc: grpc,
Reports: reports,

Transmitter: NewOverlayTransmitter(grpc, nodeOpts.reportManipulator),
Consumer: NewMsgConsumer(reports, grpc),
Verifier: NewVerifier(reports, grpc, nodeOpts.signer),
Signer: nodeOpts.signer,
}
}

func (n *Node) Start() {
n.StartOnce(func() {
n.threadC.Go(func(ctx context.Context) {
defer n.Consumer.Stop()
if err := n.Consumer.Start(ctx); err != nil {
panic(err)
}
})
n.threadC.Go(func(ctx context.Context) {
defer n.Verifier.Stop()
if err := n.Verifier.Start(ctx); err != nil {
panic(err)
}
})
})
}

func (n *Node) Stop() {
n.StopOnce(func() {
n.threadC.Close()
})
}

type nodeOpts struct {
reportManipulator func(*MockedSignedReport)
bufferSize int
signer Signer
}

func defaultNodeOpts() *nodeOpts {
return &nodeOpts{
reportManipulator: func(*MockedSignedReport) {},
bufferSize: 1024,
signer: &Sha256Signer{},
}
}

type NodeOpt func(*nodeOpts)

func WithReportManipulator(reportManipulator func(*MockedSignedReport)) NodeOpt {
return func(opts *nodeOpts) {
opts.reportManipulator = reportManipulator
}
}

func WithBufferSize(bufferSize int) NodeOpt {
return func(opts *nodeOpts) {
opts.bufferSize = bufferSize
}
}

func WithSigner(signer Signer) NodeOpt {
return func(opts *nodeOpts) {
opts.signer = signer
}
}
49 changes: 49 additions & 0 deletions examples/don/lib/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package donlib

import (
"encoding/json"
"fmt"
)

func NewMockedSignedReport(signer Signer, seqNumber int64, srcDON string, data []byte) (*MockedSignedReport, error) {
sr := &MockedSignedReport{
SeqNumber: seqNumber,
Src: srcDON,
Data: data,
}
sig, err := signer.Sign([]byte(fmt.Sprintf("%+v", sr)))
if err != nil {
return nil, err
}
sr.Sig = sig
return sr, nil
}

type MockedSignedReport struct {
SeqNumber int64
// Src DON
Src string
Data []byte
Sig []byte
}

func (r *MockedSignedReport) GetReportData() []byte {
sr := &MockedSignedReport{
SeqNumber: r.SeqNumber,
Src: r.Src,
Data: r.Data,
}
return []byte(fmt.Sprintf("%+v", sr))
}

func MarshalReport(r *MockedSignedReport) ([]byte, error) {
return json.Marshal(r)
}

func UnmarshalReport(data []byte) (*MockedSignedReport, error) {
r := new(MockedSignedReport)
if err := json.Unmarshal(data, r); err != nil {
return nil, err
}
return r, nil
}
80 changes: 80 additions & 0 deletions examples/don/lib/reportbuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package donlib

import (
"sync"
)

type ReportBuffer struct {
lock sync.RWMutex
reports map[string][]MockedSignedReport

bufferSize int
}

func NewReportBuffer(bufferSize int) *ReportBuffer {
return &ReportBuffer{
bufferSize: bufferSize,
}
}

func (rb *ReportBuffer) Add(don string, sr MockedSignedReport) bool {
rb.lock.Lock()
defer rb.lock.Unlock()

if rb.reports == nil {
rb.reports = make(map[string][]MockedSignedReport)
}
buf := rb.reports[don]
if len(buf) == 0 {
buf = make([]MockedSignedReport, rb.bufferSize)
}
i := sr.SeqNumber % int64(rb.bufferSize)
// update only if newer than existing report in buffer
if buf[i].SeqNumber < sr.SeqNumber {
buf[i] = sr
rb.reports[don] = buf
return true
}
return false
}

func (rb *ReportBuffer) Get(don string, seq int64) *MockedSignedReport {
rb.lock.RLock()
defer rb.lock.RUnlock()

if rb.reports == nil {
return nil
}
buf := rb.reports[don]
if len(buf) == 0 {
return nil
}

i := seq % int64(rb.bufferSize)
if buf[i].SeqNumber == seq {
r := buf[i]
return &r
}
return nil
}

func (rb *ReportBuffer) GetLatest(don string) *MockedSignedReport {
rb.lock.RLock()
defer rb.lock.RUnlock()

if rb.reports == nil {
return nil
}
buf := rb.reports[don]
if len(buf) == 0 {
return nil
}

var latest *MockedSignedReport
for _, r := range buf {
if latest == nil || r.SeqNumber > latest.SeqNumber {
latest = &r
}
}
return latest
}
28 changes: 28 additions & 0 deletions examples/don/lib/signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package donlib

import (
"bytes"
"crypto/sha256"
"fmt"
)

type Signer interface {
Sign(data []byte) ([]byte, error)
Verify(signed, pk, data []byte) error
}

// Sha256Signer is a mocked signer that uses `sha256(data)` as signature.
type Sha256Signer struct{}

func (s *Sha256Signer) Sign(data []byte) ([]byte, error) {
h := sha256.Sum256(data)
return h[:], nil
}

func (s *Sha256Signer) Verify(signed, _, data []byte) error {
h := sha256.Sum256(data)
if !bytes.Equal(signed, h[:]) {
return fmt.Errorf("signature mismatch")
}
return nil
}
Loading

0 comments on commit d669835

Please sign in to comment.