From 6c6e40e01c1ec7cc065e36867f3cdda14d248b27 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 19 Sep 2024 13:16:57 +1000 Subject: [PATCH] Add consensus --- pkg/consensus/mimicry/client.go | 231 ++++++++++++++++++ pkg/consensus/mimicry/config.go | 12 + pkg/consensus/mimicry/encoder.go | 90 +++++++ pkg/consensus/mimicry/mode.go | 9 + pkg/consensus/mimicry/node.go | 48 ++++ pkg/consensus/mimicry/req.go | 112 +++++++++ pkg/consensus/mimicry/rpc_handlers.go | 224 ++++++++++++++++++ pkg/discovery/disc_v5.go | 327 ++++++++++++++++++++++++++ 8 files changed, 1053 insertions(+) create mode 100644 pkg/consensus/mimicry/client.go create mode 100644 pkg/consensus/mimicry/config.go create mode 100644 pkg/consensus/mimicry/encoder.go create mode 100644 pkg/consensus/mimicry/mode.go create mode 100644 pkg/consensus/mimicry/node.go create mode 100644 pkg/consensus/mimicry/req.go create mode 100644 pkg/consensus/mimicry/rpc_handlers.go create mode 100644 pkg/discovery/disc_v5.go diff --git a/pkg/consensus/mimicry/client.go b/pkg/consensus/mimicry/client.go new file mode 100644 index 0000000..88bf14f --- /dev/null +++ b/pkg/consensus/mimicry/client.go @@ -0,0 +1,231 @@ +package mimicry + +import ( + "context" + "crypto/ecdsa" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "sync" + "time" + + "github.com/decred/dcrd/dcrec/secp256k1/v4" + gcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p" + mplex "github.com/libp2p/go-libp2p-mplex" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/security/noise" + "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/protolambda/zrnt/eth2/beacon/common" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" + "github.com/sirupsen/logrus" +) + +var sszNetworkEncoder = encoder.SszNetworkEncoder{} + +const ( + StatusProtocolID = "/eth2/beacon_chain/req/status/1/" + encoder.ProtocolSuffixSSZSnappy + GoodbyeProtocolID = "/eth2/beacon_chain/req/goodbye/1/" + encoder.ProtocolSuffixSSZSnappy + PingProtocolID = "/eth2/beacon_chain/req/ping/1/" + encoder.ProtocolSuffixSSZSnappy + MetaDataProtocolID = "/eth2/beacon_chain/req/metadata/2/" + encoder.ProtocolSuffixSSZSnappy +) + +var ( + SupportedProtocols = []protocol.ID{ + StatusProtocolID, + GoodbyeProtocolID, + PingProtocolID, + MetaDataProtocolID, + } +) + +type Client struct { + log logrus.FieldLogger + config *Config + mode Mode + userAgent string + + localNode *enode.LocalNode + host host.Host + privKey *crypto.Secp256k1PrivateKey + + ctx context.Context + cancel context.CancelFunc + metadata *common.MetaData + status *common.Status + + statusMu sync.Mutex +} + +func NewClient(log logrus.FieldLogger, config *Config, mode Mode, userAgent string) *Client { + return &Client{ + log: log, + userAgent: userAgent, + config: config, + mode: mode, + statusMu: sync.Mutex{}, + status: &common.Status{}, + metadata: &common.MetaData{ + SeqNumber: 0, + Attnets: common.AttnetBits{}, + Syncnets: common.SyncnetBits{}, + }, + } +} + +func (c *Client) Start(ctx context.Context) error { + c.log.WithFields(logrus.Fields{ + "mode": c.mode, + }).Info("Starting Mimicry client") + + if _, err := c.derivePrivateKey(); err != nil { + return fmt.Errorf("failed to derive private key: %w", err) + } + + libp2pOptions := []libp2p.Option{ + libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/%s/tcp/%d", c.config.IPAddr.String(), c.config.TCPPort)), + libp2p.UserAgent(c.userAgent), + libp2p.Transport(tcp.NewTCPTransport), + libp2p.Muxer(mplex.ID, mplex.DefaultTransport), + libp2p.DefaultMuxers, + libp2p.Security(noise.ID, noise.New), + libp2p.Ping(false), + libp2p.Identity(c.privKey), + libp2p.ResourceManager(&network.NullResourceManager{}), + } + + h, err := libp2p.New(libp2pOptions...) + if err != nil { + return fmt.Errorf("failed to create libp2p host: %w", err) + } + + c.host = h + + localNode, err := createLocalNode(c.privKey, c.config.IPAddr, c.config.UDPPort, c.config.TCPPort) + if err != nil { + return fmt.Errorf("failed to create local node: %w", err) + } + + c.localNode = localNode + + if err := c.registerHandlers(); err != nil { + return fmt.Errorf("failed to register handlers: %w", err) + } + + c.log.Info("Successfully started Mimicry client") + + return nil +} + +func (c *Client) Stop() error { + if err := c.host.Close(); err != nil { + return fmt.Errorf("failed to close host: %w", err) + } + + return nil +} + +func (c *Client) SetStatus(status *common.Status) { + c.statusMu.Lock() + defer c.statusMu.Unlock() + + c.status = status +} + +func (c *Client) GetStatus() common.Status { + c.statusMu.Lock() + defer c.statusMu.Unlock() + + return *c.status +} + +func (c *Client) derivePrivateKey() (*crypto.Secp256k1PrivateKey, error) { + if c.privKey != nil { + return c.privKey, nil + } + + var err error + + var privBytes []byte + + if c.config.PrivKey == "" { + key, errr := ecdsa.GenerateKey(gcrypto.S256(), rand.Reader) + if errr != nil { + return nil, fmt.Errorf("failed to generate key: %w", errr) + } + + privBytes = gcrypto.FromECDSA(key) + if len(privBytes) != secp256k1.PrivKeyBytesLen { + return nil, fmt.Errorf("expected secp256k1 data size to be %d", secp256k1.PrivKeyBytesLen) + } + } else { + privBytes, err = hex.DecodeString(c.config.PrivKey) + if err != nil { + return nil, fmt.Errorf("failed to decode private key: %w", err) + } + } + + c.privKey = (*crypto.Secp256k1PrivateKey)(secp256k1.PrivKeyFromBytes(privBytes)) + + if c.config.PrivKey == "" { + c.config.PrivKey = hex.EncodeToString(privBytes) + } + + return c.privKey, nil +} + +func (c *Client) ConnectToPeer(ctx context.Context, p peer.AddrInfo, enr *enode.Node) error { + c.log.WithFields(logrus.Fields{ + "peer": p.ID, + }).Info("Connecting to peer") + + // Check if we're already connected to the peer + if status := c.host.Network().Connectedness(p.ID); status == network.Connected { + return errors.New("already connected to peer") + } + + // Connect to the peer + if err := c.host.Connect(ctx, p); err != nil { + return fmt.Errorf("failed to connect to peer: %w", err) + } + + // Add the supported protocols to the peer + if err := c.host.Peerstore().AddProtocols(p.ID, SupportedProtocols...); err != nil { + return fmt.Errorf("failed to add protocols to peer: %w", err) + } + + // Send a status request + _, err := c.RequestStatusFromPeer(ctx, p.ID) + if err != nil { + return fmt.Errorf("failed to request status: %w", err) + } + + return nil +} + +func (c *Client) DisconnectFromPeer(ctx context.Context, peerID peer.ID) error { + c.log.WithFields(logrus.Fields{ + "peer": peerID, + }).Info("Disconnecting from peer") + + // Send a goodbye message + goodbye := common.Goodbye(0) + resp := common.Goodbye(0) + + if err := c.sendRequest(ctx, &Request{ + ProtocolID: GoodbyeProtocolID, + PeerID: peerID, + Payload: &goodbye, + Timeout: time.Second * 30, + }, &resp); err != nil { + return fmt.Errorf("failed to send goodbye message: %w", err) + } + + return c.host.Network().ClosePeer(peerID) +} diff --git a/pkg/consensus/mimicry/config.go b/pkg/consensus/mimicry/config.go new file mode 100644 index 0000000..4dccd5e --- /dev/null +++ b/pkg/consensus/mimicry/config.go @@ -0,0 +1,12 @@ +package mimicry + +import ( + "net" +) + +type Config struct { + IPAddr net.IP + UDPPort int + TCPPort int + PrivKey string +} diff --git a/pkg/consensus/mimicry/encoder.go b/pkg/consensus/mimicry/encoder.go new file mode 100644 index 0000000..6075d61 --- /dev/null +++ b/pkg/consensus/mimicry/encoder.go @@ -0,0 +1,90 @@ +package mimicry + +// Thanks to Mario: https://github.com/marioevz/blobber/blob/main/p2p/encoder.go + +import ( + "bytes" + + "github.com/protolambda/zrnt/eth2/beacon/common" + "github.com/protolambda/ztyp/codec" + fastssz "github.com/prysmaticlabs/fastssz" +) + +type Marshaler interface { + fastssz.Marshaler + fastssz.Unmarshaler +} + +type wrappedSpecObjectEncoder struct { + common.SpecObj + *common.Spec +} + +func WrapSpecObject(spec *common.Spec, specObj common.SpecObj) Marshaler { + return &wrappedSpecObjectEncoder{ + SpecObj: specObj, + Spec: spec, + } +} + +func (w *wrappedSpecObjectEncoder) MarshalSSZTo(dst []byte) ([]byte, error) { + marshalledObj, err := w.MarshalSSZ() + if err != nil { + return nil, err + } + + return append(dst, marshalledObj...), nil +} + +func (w *wrappedSpecObjectEncoder) MarshalSSZ() ([]byte, error) { + var buf bytes.Buffer + if err := w.Serialize(w.Spec, codec.NewEncodingWriter(&buf)); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func (w *wrappedSpecObjectEncoder) SizeSSZ() int { + return int(w.SpecObj.ByteLength(w.Spec)) +} + +func (w *wrappedSpecObjectEncoder) UnmarshalSSZ(b []byte) error { + return w.Deserialize(w.Spec, codec.NewDecodingReader(bytes.NewReader(b), uint64(len(b)))) +} + +type wrappedSSZObjectEncoder struct { + common.SSZObj +} + +func WrapSSZObject(sszObj common.SSZObj) Marshaler { + return &wrappedSSZObjectEncoder{ + SSZObj: sszObj, + } +} + +func (w *wrappedSSZObjectEncoder) MarshalSSZTo(dst []byte) ([]byte, error) { + marshalledObj, err := w.MarshalSSZ() + if err != nil { + return nil, err + } + + return append(dst, marshalledObj...), nil +} + +func (w *wrappedSSZObjectEncoder) MarshalSSZ() ([]byte, error) { + var buf bytes.Buffer + if err := w.Serialize(codec.NewEncodingWriter(&buf)); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func (w *wrappedSSZObjectEncoder) SizeSSZ() int { + return int(w.SSZObj.ByteLength()) +} + +func (w *wrappedSSZObjectEncoder) UnmarshalSSZ(b []byte) error { + return w.Deserialize(codec.NewDecodingReader(bytes.NewReader(b), uint64(len(b)))) +} diff --git a/pkg/consensus/mimicry/mode.go b/pkg/consensus/mimicry/mode.go new file mode 100644 index 0000000..3de437c --- /dev/null +++ b/pkg/consensus/mimicry/mode.go @@ -0,0 +1,9 @@ +package mimicry + +type Mode string + +const ( + // ModeDiscovery is the mode where the node is only discovering peers. Once a peer connection is established + // and we get the `status` from the peer we will disconnect them. + ModeDiscovery Mode = "discovery" +) diff --git a/pkg/consensus/mimicry/node.go b/pkg/consensus/mimicry/node.go new file mode 100644 index 0000000..ec7fec6 --- /dev/null +++ b/pkg/consensus/mimicry/node.go @@ -0,0 +1,48 @@ +package mimicry + +import ( + "crypto/ecdsa" + "math/big" + "net" + + gcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p/core/crypto" +) + +func createLocalNode( + privKey *crypto.Secp256k1PrivateKey, + ipAddr net.IP, + udpPort, tcpPort int, +) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + + rawKey, err := privKey.Raw() + if err != nil { + return nil, err + } + + priv := new(ecdsa.PrivateKey) + k := new(big.Int).SetBytes(rawKey) + priv.D = k + priv.Curve = gcrypto.S256() + priv.X, priv.Y = gcrypto.S256().ScalarBaseMult(rawKey) + + localNode := enode.NewLocalNode(db, priv) + + ipEntry := enr.IP(ipAddr) + udpEntry := enr.UDP(udpPort) + tcpEntry := enr.TCP(tcpPort) + + localNode.Set(ipEntry) + localNode.Set(udpEntry) + localNode.Set(tcpEntry) + localNode.SetFallbackIP(ipAddr) + localNode.SetFallbackUDP(udpPort) + + return localNode, nil +} diff --git a/pkg/consensus/mimicry/req.go b/pkg/consensus/mimicry/req.go new file mode 100644 index 0000000..f4a9a5f --- /dev/null +++ b/pkg/consensus/mimicry/req.go @@ -0,0 +1,112 @@ +package mimicry + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/protolambda/zrnt/eth2/beacon/common" + "github.com/sirupsen/logrus" +) + +type Request struct { + ProtocolID protocol.ID + PeerID peer.ID + Payload common.SSZObj + Timeout time.Duration +} + +func (c *Client) sendRequest(ctx context.Context, r *Request, rsp common.SSZObj) error { + // Open the stream + ctx, cancel := context.WithTimeout(ctx, r.Timeout) + defer cancel() + + logCtx := c.log.WithFields(logrus.Fields{ + "peer": r.PeerID, + "protocol_id": r.ProtocolID, + }) + + logCtx.Debug("Sending request") + + stream, err := c.host.NewStream(ctx, r.PeerID, r.ProtocolID) + if err != nil { + return fmt.Errorf("failed to create stream: %w", err) + } + + defer func() { + if err := stream.Close(); err != nil { + c.log.WithError(err).Error("Failed to close stream") + } + }() + + // Send the request + if _, err := sszNetworkEncoder.EncodeWithMaxLength(stream, WrapSSZObject(r.Payload)); err != nil { + return fmt.Errorf("failed to encode request: %w", err) + } + + // Wait for the response + buf := make([]byte, 1) + if _, err := io.ReadFull(stream, buf); err != nil { + return fmt.Errorf("failed to read response: %w", err) + } + + // Code 0x00 is a success + // Anything else is an error + if buf[0] != 0 { + return fmt.Errorf("received invalid response") + } + + // Read the response + response := WrapSSZObject(r.Payload) + if err := sszNetworkEncoder.DecodeWithMaxLength(stream, response); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + // Close the stream + if err := stream.CloseWrite(); err != nil { + return fmt.Errorf("failed to close write stream: %w", err) + } + + return nil +} + +func (c *Client) RequestStatusFromPeer(ctx context.Context, peerID peer.ID) (*common.Status, error) { + status := c.GetStatus() + + req := &Request{ + ProtocolID: StatusProtocolID, + PeerID: peerID, + Payload: &status, + Timeout: time.Second * 30, + } + + rsp := &common.Status{} + + if err := c.sendRequest(ctx, req, rsp); err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + + return rsp, nil +} + +func (c *Client) RequestMetadataFromPeer(ctx context.Context, peerID peer.ID) (*common.MetaData, error) { + metadata := c.metadata + + req := &Request{ + ProtocolID: MetaDataProtocolID, + PeerID: peerID, + Payload: metadata, + Timeout: time.Second * 30, + } + + rsp := &common.MetaData{} + + if err := c.sendRequest(ctx, req, rsp); err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + + return rsp, nil +} diff --git a/pkg/consensus/mimicry/rpc_handlers.go b/pkg/consensus/mimicry/rpc_handlers.go new file mode 100644 index 0000000..9dc2deb --- /dev/null +++ b/pkg/consensus/mimicry/rpc_handlers.go @@ -0,0 +1,224 @@ +package mimicry + +import ( + "fmt" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/protolambda/zrnt/eth2/beacon/common" + "github.com/sirupsen/logrus" +) + +func (c *Client) registerHandlers() error { + if err := c.registerHandler(StatusProtocolID, c.handleStatus); err != nil { + return fmt.Errorf("failed to register status handler: %w", err) + } + + if err := c.registerHandler(GoodbyeProtocolID, c.handleGoodbye); err != nil { + return fmt.Errorf("failed to register goodbye handler: %w", err) + } + + if err := c.registerHandler(PingProtocolID, c.handlePing); err != nil { + return fmt.Errorf("failed to register ping handler: %w", err) + } + + return nil +} + +func (c *Client) registerHandler(proto protocol.ID, handler func(stream network.Stream)) error { + c.log.WithField("protocol", proto).Debug("Registering protocol handler") + + c.host.SetStreamHandler(proto, handler) + + return nil +} + +func (c *Client) handleStatus(stream network.Stream) { + start := time.Now() + defer func() { + c.log.WithField("duration", time.Since(start)).Debug("Handled status message") + + if err := stream.Close(); err != nil { + c.log.WithError(err).Error("Failed to close stream") + } + }() + + logCtx := c.log.WithFields(logrus.Fields{ + "peer": stream.Conn().RemotePeer().String(), + "protocol": stream.Protocol(), + }) + + var theirStatus common.Status + if err := sszNetworkEncoder.DecodeWithMaxLength(stream, WrapSSZObject(&theirStatus)); err != nil { + logCtx.WithError(err).Error("Failed to decode status message") + + return + } + + logCtx.WithFields(logrus.Fields{ + "fork_version": theirStatus.ForkDigest, + "finalized_epoch": theirStatus.FinalizedEpoch, + "finalized_root": theirStatus.FinalizedRoot, + "head_slot": theirStatus.HeadSlot, + "head_root": theirStatus.HeadRoot, + }).Debug("Received status message") + + // Write our response + if _, err := stream.Write([]byte{0x00}); err != nil { + logCtx.WithError(err).Error("Failed to write response to status message") + + return + } + + status := c.GetStatus() + + // Write our status + if _, err := sszNetworkEncoder.EncodeWithMaxLength(stream, WrapSSZObject(&status)); err != nil { + logCtx.WithError(err).Error("Failed to write status to peer") + + return + } + + if err := stream.CloseWrite(); err != nil { + logCtx.WithError(err).Error("Failed to close write stream") + + return + } +} + +func (c *Client) handleGoodbye(stream network.Stream) { + start := time.Now() + defer func() { + c.log.WithField("duration", time.Since(start)).Debug("Handled goodbye message") + + if err := stream.Close(); err != nil { + c.log.WithError(err).Error("Failed to close stream") + } + }() + + logCtx := c.log.WithFields(logrus.Fields{ + "peer": stream.Conn().RemotePeer().String(), + "protocol": stream.Protocol(), + }) + + if _, err := stream.Write([]byte{0x00}); err != nil { + logCtx.WithError(err).Error("Failed to write response to goodbye") + + return + } + + var theirGoodbye common.Goodbye + if err := sszNetworkEncoder.DecodeWithMaxLength(stream, WrapSSZObject(&theirGoodbye)); err != nil { + logCtx.WithError(err).Error("Failed to decode goodbye message") + + return + } + + logCtx.WithFields(logrus.Fields{ + "goodbye": theirGoodbye, + }).Debug("Received goodbye message") + + var resp common.Goodbye + if _, err := sszNetworkEncoder.EncodeWithMaxLength(stream, WrapSSZObject(&resp)); err != nil { + logCtx.WithError(err).Error("Failed to write goodbye to peer") + + return + } + + if err := stream.CloseWrite(); err != nil { + logCtx.WithError(err).Error("Failed to close write stream") + } +} + +func (c *Client) handlePing(stream network.Stream) { + start := time.Now() + defer func() { + c.log.WithField("duration", time.Since(start)).Debug("Handled ping message") + + if err := stream.Close(); err != nil { + c.log.WithError(err).Error("Failed to close stream") + } + }() + + logCtx := c.log.WithFields(logrus.Fields{ + "peer": stream.Conn().RemotePeer().String(), + "protocol": stream.Protocol(), + }) + + var theirPing common.Ping + if err := sszNetworkEncoder.DecodeWithMaxLength(stream, WrapSSZObject(&theirPing)); err != nil { + logCtx.WithError(err).Error("Failed to decode ping message") + + return + } + + logCtx.WithFields(logrus.Fields{ + "ping": fmt.Sprintf("%d", theirPing), + }).Debug("Received ping message") + + ping := common.Ping(c.metadata.SeqNumber) + + // Send the response + if _, err := stream.Write([]byte{0x00}); err != nil { + logCtx.WithError(err).Error("Failed to write response to ping") + + return + } + + // Write the ping + if _, err := sszNetworkEncoder.EncodeWithMaxLength(stream, WrapSSZObject(&ping)); err != nil { + logCtx.WithError(err).Error("Failed to write ping to peer") + + return + } + + if err := stream.CloseWrite(); err != nil { + logCtx.WithError(err).Error("Failed to close write stream") + } +} + +func (c *Client) handleMetadata(stream network.Stream) { + start := time.Now() + defer func() { + c.log.WithField("duration", time.Since(start)).Debug("Handled metadata message") + + if err := stream.Close(); err != nil { + c.log.WithError(err).Error("Failed to close stream") + } + }() + + logCtx := c.log.WithFields(logrus.Fields{ + "peer": stream.Conn().RemotePeer().String(), + "protocol": stream.Protocol(), + }) + + var theirMetadata common.MetaData + if err := sszNetworkEncoder.DecodeWithMaxLength(stream, WrapSSZObject(&theirMetadata)); err != nil { + logCtx.WithError(err).Error("Failed to decode metadata message") + + return + } + + logCtx.WithFields(logrus.Fields{ + "metadata": theirMetadata, + }).Debug("Received metadata message") + + // Write the response + if _, err := stream.Write([]byte{0x00}); err != nil { + logCtx.WithError(err).Error("Failed to write response to metadata") + + return + } + + resp := c.metadata + if _, err := sszNetworkEncoder.EncodeWithMaxLength(stream, WrapSSZObject(resp)); err != nil { + logCtx.WithError(err).Error("Failed to write metadata to peer") + + return + } + + if err := stream.CloseWrite(); err != nil { + logCtx.WithError(err).Error("Failed to close write stream") + } +} diff --git a/pkg/discovery/disc_v5.go b/pkg/discovery/disc_v5.go new file mode 100644 index 0000000..6fb5414 --- /dev/null +++ b/pkg/discovery/disc_v5.go @@ -0,0 +1,327 @@ +package discovery + +import ( + "context" + "crypto/ecdsa" + "net" + "sync" + "time" + + "github.com/chuckpreslar/emission" + gcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/go-co-op/gocron" + "github.com/sirupsen/logrus" +) + +const ( + topicNodeRecord = "node_record" +) + +type DiscV5 struct { + log logrus.FieldLogger + restart time.Duration + bootNodes []*enode.Node + + listener *ListenerV5 + + privKey *ecdsa.PrivateKey + + broker *emission.Emitter + + mu sync.Mutex + + scheduler *gocron.Scheduler + + started bool +} + +type ListenerV5 struct { + conn *net.UDPConn + localNode *enode.LocalNode + discovery *discover.UDPv5 + + mu sync.Mutex +} + +func NewDiscV5(ctx context.Context, restart time.Duration, log logrus.FieldLogger) *DiscV5 { + return &DiscV5{ + log: log.WithField("module", "ethcore/discovery/p2p/discV5"), + restart: restart, + broker: emission.NewEmitter(), + started: false, + } +} + +func (d *DiscV5) Start(ctx context.Context) error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.started { + return nil + } + + d.started = true + + if err := d.startCrons(ctx); err != nil { + return err + } + + return nil +} + +func (d *DiscV5) startListener(ctx context.Context) error { + if d.listener != nil { + d.listener.Close() + } + + privKey, err := gcrypto.GenerateKey() + if err != nil { + return err + } + + d.privKey = privKey + + listener, err := d.startDiscovery(ctx, d.privKey) + if err != nil { + return err + } + + d.listener = listener + + go d.listenForNewNodes(ctx) + + return nil +} + +func (d *DiscV5) Stop(ctx context.Context) error { + if d.listener != nil { + d.listener.Close() + } + + if d.scheduler != nil { + d.scheduler.Stop() + } + + d.mu.Lock() + defer d.mu.Unlock() + + d.started = false + + return nil +} + +func (d *DiscV5) startCrons(ctx context.Context) error { + c := gocron.NewScheduler(time.Local) + + if _, err := c.Every(d.restart).Do(func() { + if err := d.startListener(ctx); err != nil { + d.log.WithError(err).Error("Failed to restart new node discovery") + } + }); err != nil { + return err + } + + c.StartAsync() + + d.scheduler = c + + return nil +} + +func (d *DiscV5) listenForNewNodes(ctx context.Context) { + iterator := d.listener.discovery.RandomNodes() + iterator = enode.Filter(iterator, d.filterPeer) + + defer iterator.Close() + + for { + exists := iterator.Next() + if !exists { + break + } + + node := iterator.Node() + + d.publishNodeRecord(ctx, node) + } +} + +func (d *DiscV5) createListener( + ctx context.Context, + privKey *ecdsa.PrivateKey, +) (*ListenerV5, error) { + listener := &ListenerV5{} + + var bindIP net.IP + + ipAddr := net.IPv4zero + + bindIP = ipAddr + udpAddr := &net.UDPAddr{ + IP: bindIP, + Port: int(0), + } + + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, err + } + + listener.conn = conn + + localNode, err := d.createLocalNode( + ctx, + privKey, + ipAddr, + int(0), + int(0), + ) + if err != nil { + return nil, err + } + + listener.localNode = localNode + + dv5Cfg := discover.Config{ + PrivateKey: privKey, + } + + dv5Cfg.Bootnodes = []*enode.Node{} + + d.mu.Lock() + defer d.mu.Unlock() + + dv5Cfg.Bootnodes = append(dv5Cfg.Bootnodes, d.bootNodes...) + + discovery, err := discover.ListenV5(conn, localNode, dv5Cfg) + if err != nil { + return nil, err + } + + listener.discovery = discovery + + return listener, nil +} + +func (d *DiscV5) createLocalNode( + ctx context.Context, + privKey *ecdsa.PrivateKey, + ipAddr net.IP, + udpPort, tcpPort int, +) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + + localNode := enode.NewLocalNode(db, privKey) + + ipEntry := enr.IP(ipAddr) + udpEntry := enr.UDP(udpPort) + tcpEntry := enr.TCP(tcpPort) + + localNode.Set(ipEntry) + localNode.Set(udpEntry) + localNode.Set(tcpEntry) + localNode.SetFallbackIP(ipAddr) + localNode.SetFallbackUDP(udpPort) + + return localNode, nil +} + +func (d *DiscV5) startDiscovery( + ctx context.Context, + privKey *ecdsa.PrivateKey, +) (*ListenerV5, error) { + listener, err := d.createListener(ctx, privKey) + if err != nil { + return nil, err + } + + record := listener.discovery.Self() + d.log.WithField("ENR", record.String()).Info("Started discovery v5") + + return listener, nil +} + +func (d *DiscV5) filterPeer(node *enode.Node) bool { + // Ignore nil node entries passed in. + if node == nil { + return false + } + + // ignore nodes with no ip address stored. + if node.IP() == nil { + return false + } + + // do not dial nodes with their tcp ports not set + if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil { + if !enr.IsNotFound(err) { + d.log.WithError(err).Debug("Could not retrieve tcp port") + } + + return false + } + + return true +} + +func (d *DiscV5) publishNodeRecord(ctx context.Context, record *enode.Node) { + d.broker.Emit(topicNodeRecord, record) +} + +func (d *DiscV5) handleSubscriberError(err error, topic string) { + if err != nil { + d.log.WithError(err).WithField("topic", topic).Error("Subscriber error") + } +} + +func (d *DiscV5) UpdateBootNodes(bootNodes []string) error { + d.mu.Lock() + defer d.mu.Unlock() + + bn := []*enode.Node{} + + for _, addr := range bootNodes { + bootNode, parseErr := enode.Parse(enode.ValidSchemes, addr) + if parseErr != nil { + return parseErr + } + + bn = append(bn, bootNode) + } + + d.bootNodes = bn + + return nil +} + +func (d *DiscV5) OnNodeRecord(ctx context.Context, handler func(ctx context.Context, reason *enode.Node) error) { + d.broker.On(topicNodeRecord, func(reason *enode.Node) { + d.handleSubscriberError(handler(ctx, reason), topicNodeRecord) + }) +} + +func (l *ListenerV5) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + + if l.discovery != nil { + l.discovery.Close() + } + + if l.localNode != nil && l.localNode.Database() != nil { + l.localNode.Database().Close() + l.localNode = nil + } + + if l.conn != nil { + return l.conn.Close() + } + + return nil +}