Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: multiaddress not constructed on startup #524

Merged
merged 11 commits into from
Aug 24, 2024
8 changes: 5 additions & 3 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os/signal"
"syscall"

"github.com/multiformats/go-multiaddr"

"github.com/masa-finance/masa-oracle/internal/versioning"
"github.com/masa-finance/masa-oracle/pkg/workers"

Expand Down Expand Up @@ -133,10 +135,10 @@ func main() {
}()

// Get the multiaddress and IP address of the node
multiAddr := node.GetMultiAddrs().String() // Get the multiaddress
ipAddr := node.Host.Addrs()[0].String() // Get the IP address
multiAddr := node.GetMultiAddrs() // Get the multiaddress
ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address
// Display the welcome message with the multiaddress and IP address
config.DisplayWelcomeMessage(multiAddr, ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)

<-ctx.Done()
}
182 changes: 128 additions & 54 deletions pkg/network/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,19 @@ package network

import (
"fmt"
"io"
"net"
"os"
"net/http"
"strings"

"github.com/chyeh/pubip"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
)

// getOutboundIP returns the outbound IP address of the current machine 172.17.0.2 10.0.0.2 etc
func getOutboundIP() string {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
logrus.Warn("[-] Error getting outbound IP")
}
defer conn.Close()
localAddr := conn.LocalAddr().String()
idx := strings.LastIndex(localAddr, ":")
return localAddr[0:idx]
}
// The URL of the GCP metadata server for the external IP
const externalIPURL = "http://metadata.google.internal/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip"

// GetMultiAddressesForHost returns the multiaddresses for the host
func GetMultiAddressesForHost(host host.Host) ([]multiaddr.Multiaddr, error) {
Expand Down Expand Up @@ -56,82 +47,165 @@ func GetMultiAddressesForHostQuiet(host host.Host) []multiaddr.Multiaddr {
return ma
}

// GetPriorityAddress returns the best public or private IP address
func GetPriorityAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr {
bestAddr := getBestPublicAddress(addrs)
if bestAddr != nil {
return bestAddr
// getPublicMultiAddress returns the best public IP address
func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just wondering here, why we still need to have to get the public IP? aren't we dialing now with libp2p?

ipBytes, err := Get("https://api.ipify.org?format=text", nil)
externalIP := net.ParseIP(string(ipBytes))
if err != nil {
logrus.Warnf("[-] Failed to get public IP: %v", err)
return nil
}
if externalIP == nil || externalIP.IsPrivate() {
return nil
}

var addrToCopy multiaddr.Multiaddr
for _, addr := range addrs {
addrToCopy = addr
break
}
publicMultiaddr, err := replaceIPComponent(addrToCopy, externalIP.String())
if err != nil {
logrus.Warnf("[-] Failed to create multiaddr with public IP: %v", err)
return nil
}
return publicMultiaddr
}

// GetPriorityAddress returns the best public or private IP address
func GetPriorityAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr {
var bestPrivateAddr multiaddr.Multiaddr
bestPublicAddr := getPublicMultiAddress(addrs)

for _, addr := range addrs {
ipComponent, err := addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
continue // Not an IP address
ipComponent, err = addr.ValueForProtocol(multiaddr.P_IP6)
if err != nil {
continue // Not an IP address
}
}

ip := net.ParseIP(ipComponent)
if ip == nil || ip.IsLoopback() {
continue // Skip invalid or loopback addresses
}

if ip.IsPrivate() {
// If it's the first private address found or if it's preferred over the current best, keep it
if bestPrivateAddr == nil || isPreferredAddress(addr) {
bestPrivateAddr = addr
}
} else {
// If it's the first public address found or if it's preferred over the current best, keep it
if bestPublicAddr == nil || isPreferredAddress(addr) {
bestPublicAddr = addr
}
}
}

if bestPrivateAddr != nil {
return bestPrivateAddr
var baseAddr multiaddr.Multiaddr
// Prefer public addresses over private ones
if bestPublicAddr != nil {
baseAddr = bestPublicAddr
} else if bestPrivateAddr != nil {
baseAddr = bestPrivateAddr
} else {
logrus.Warn("No address matches the priority criteria, returning the first entry")
baseAddr = addrs[0]
}

if len(addrs) > 0 {
logrus.Warn("[-] No suitable address found, returning the first entry")
return addrs[0]
logrus.Infof("Best public address: %s", bestPublicAddr)
logrus.Debugf("Best private address: %s", bestPrivateAddr)
logrus.Debugf("Base address: %s", baseAddr)
gcpAddr := replaceGCPAddress(baseAddr)
if gcpAddr != nil {
baseAddr = gcpAddr
}

return nil
return baseAddr
}

// getBestPublicAddress returns the best public IP address
func getBestPublicAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr {
var externalIP net.IP
func replaceGCPAddress(addr multiaddr.Multiaddr) multiaddr.Multiaddr {
// After finding the best address, try to get the GCP external IP
var err error

if os.Getenv("ENV") == "local" {
externalIP = net.ParseIP(getOutboundIP())
} else {
externalIP, err = pubip.Get()
var bestAddr multiaddr.Multiaddr
gotExternalIP, externalIP := getGCPExternalIP()
if gotExternalIP && externalIP != "" {
bestAddr, err = replaceIPComponent(addr, externalIP)
if err != nil {
logrus.Warnf("[-] Failed to get public IP: %v", err)
logrus.Warnf("Failed to replace IP component: %s", err)
return nil
}
}
logrus.Debug("Got external IP: ", gotExternalIP)
logrus.Debug("Address after replacing IP component: ", bestAddr)
return bestAddr
}

if externalIP == nil || externalIP.IsPrivate() {
return nil
func replaceIPComponent(maddr multiaddr.Multiaddr, newIP string) (multiaddr.Multiaddr, error) {
var components []multiaddr.Multiaddr
for _, component := range multiaddr.Split(maddr) {
if component.Protocols()[0].Code == multiaddr.P_IP4 || component.Protocols()[0].Code == multiaddr.P_IP6 {
// Create a new IP component
newIPComponent, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s", newIP))
if err != nil {
return nil, err
}
components = append(components, newIPComponent)
} else {
components = append(components, component)
}
}
return multiaddr.Join(components...), nil
}

//func contains(slice []multiaddr.Multiaddr, item multiaddr.Multiaddr) bool {
// for _, a := range slice {
// if a.Equal(item) {
// return true
// }
// }
// return false
//}

func getGCPExternalIP() (bool, string) {

// Create a new HTTP client with a specific timeout
client := &http.Client{}

// Create a new multiaddr with the public IP
publicAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s", externalIP.String()))
// Make a request to the metadata server
req, err := http.NewRequest("GET", externalIPURL, nil)
if err != nil {
logrus.Warnf("[-] Failed to create multiaddr with public IP: %v", err)
return nil
return false, ""
}

// Find a suitable port from existing addresses
for _, addr := range addrs {
if strings.HasPrefix(addr.String(), "/ip4/") {
port, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err == nil {
return publicAddr.Encapsulate(multiaddr.StringCast("/tcp/" + port))
}
}
// GCP metadata server requires this specific header
req.Header.Add("Metadata-Flavor", "Google")

// Perform the request
resp, err := client.Do(req)
if err != nil {
return false, ""
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
logrus.Error(err)
}
}(resp.Body)

return publicAddr
// Check if the metadata server returns a successful status code
if resp.StatusCode != http.StatusOK {
logrus.Debug("Metadata server response status: ", resp.StatusCode)
return false, ""
}
//Read the external IP from the response
body, err := io.ReadAll(resp.Body)
if err != nil {
return true, ""
}
// Check that the response is a valid IP address
if net.ParseIP(string(body)) == nil {
return false, ""
}
logrus.Debug("External IP from metadata server: ", string(body))
return true, string(body)
}

// isPreferredAddress checks if the multiaddress contains the UDP protocol
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package handlers
package network

import (
"bytes"
Expand Down
20 changes: 13 additions & 7 deletions pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"encoding/json"
"strings"
"sync"
"time"
Expand All @@ -11,9 +12,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"

"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
)

Expand All @@ -29,8 +30,8 @@ type dbValidator struct{}
func (dbValidator) Validate(_ string, _ []byte) error { return nil }
func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr,
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) {
func WithDht(ctx context.Context, host host.Host, protocolId, prefix protocol.ID,
peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) {
options := make([]dht.Option, 0)
options = append(options, dht.BucketSize(100)) // Adjust bucket size
options = append(options, dht.Concurrency(100)) // Increase concurrency
Expand All @@ -39,6 +40,11 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
options = append(options, dht.ProtocolPrefix(prefix))
options = append(options, dht.NamespacedValidator("db", dbValidator{}))

bootstrapNodes, err := GetBootNodesMultiAddress(config.GetInstance().Bootnodes)
if err != nil {
return nil, err
}

kademliaDHT, err := dht.New(ctx, host, options...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -122,13 +128,13 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
}
}(stream) // Close the stream when done

multiaddr, err := GetMultiAddressesForHost(host)
// Convert NodeData to JSON
jsonData, err := json.Marshal(nodeData)
if err != nil {
logrus.Errorf("[-] Error getting multiaddresses for host: %s", err)
logrus.Error("[-] Error marshalling NodeData:", err)
return
}
multaddrString := GetPriorityAddress(multiaddr)
_, err = stream.Write(pubsub.GetSelfNodeDataJson(host, isStaked, multaddrString.String()))
_, err = stream.Write(jsonData)
if err != nil {
logrus.Errorf("[-] Error writing to stream: %s", err)
return
Expand Down
20 changes: 4 additions & 16 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,6 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {
func (node *OracleNode) Start() (err error) {
logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String())

bootNodeAddrs, err := myNetwork.GetBootNodesMultiAddress(config.GetInstance().Bootnodes)
if err != nil {
return err
}

node.Host.SetStreamHandler(node.Protocol, node.handleStream)
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData)

Expand All @@ -192,7 +187,9 @@ func (node *OracleNode) Start() (err error) {
go node.ListenToNodeTracker()
go node.handleDiscoveredPeers()

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked)
myNodeData := pubsub2.GetSelfNodeData(node.Host, node.IsStaked, node.priorityAddrs)

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, node.Protocol, config.MasaPrefix, node.PeerChan, myNodeData)
if err != nil {
return err
}
Expand All @@ -205,18 +202,9 @@ func (node *OracleNode) Start() (err error) {

nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String())
if nodeData == nil {
publicKeyHex := masacrypto.KeyManagerInstance().EthAddress
ma := myNetwork.GetMultiAddressesForHostQuiet(node.Host)
nodeData = pubsub2.NewNodeData(ma[0], node.Host.ID(), publicKeyHex, pubsub2.ActivityJoined)
nodeData.IsStaked = node.IsStaked
nodeData = myNodeData
nodeData.SelfIdentified = true
nodeData.IsDiscordScraper = node.IsDiscordScraper
nodeData.IsTelegramScraper = node.IsTelegramScraper
nodeData.IsTwitterScraper = node.IsTwitterScraper
nodeData.IsWebScraper = node.IsWebScraper
nodeData.IsValidator = node.IsValidator
}

nodeData.Joined()
node.NodeTracker.HandleNodeData(*nodeData)

Expand Down
Loading
Loading