Skip to content

Commit

Permalink
🚑 Fix "resource limit reached" bug when trying to connect to a remote…
Browse files Browse the repository at this point in the history
… node
  • Loading branch information
smolgroot committed May 28, 2024
1 parent 4de83dd commit 2fa889c
Showing 1 changed file with 90 additions and 21 deletions.
111 changes: 90 additions & 21 deletions pkg/vpn/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vpn

import (
"context"
"fmt"
"log"
"runtime"
"time"
Expand All @@ -18,8 +19,8 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
peerstore "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
Expand Down Expand Up @@ -77,19 +78,19 @@ func GetLocalPeerDetails(node host.Host) gin.HandlerFunc {
return gin.HandlerFunc(fn)
}

// PingPeer godoc
// @Summary Ping a remote host (using Libp2p Ping Protocol)
// @Description Find the addresses from a multiaddr, connect to the peer and share a ping
// TestConnectivity godoc
// @Summary Test the connectivity of a remote host (using Libp2p Connect)
// @Description Find the addresses from a multiaddr and try to connect to the peer
// @Tags VPN
// @Produce json
// @Param peerId path string true "Peer ID"
// @Router /ping/{peerId} [get]
func PingPeer(node host.Host, dht *dht.IpfsDHT) gin.HandlerFunc {
func TestConnectivity(node host.Host, dht *dht.IpfsDHT) gin.HandlerFunc {
fn := func(c *gin.Context) {
pid := c.Param("peerId")
var addrString string
// DEBUG
if pid == "16Uiu2HAmFiLZXDswaYP6ptkQN13QgTsJEsDvW9x8aEY7gKUKaAjt" {
if pid == "16Uiu2HAmFiLZXDswaYP6ptkQN13QgTsJEsDvW9x8aEY7gKUKaAjt___" {
// Our test node multiaddr hardocoded for tests
addrString = "/ip4/136.244.105.166/tcp/4002/p2p/16Uiu2HAmFiLZXDswaYP6ptkQN13QgTsJEsDvW9x8aEY7gKUKaAjt"
} else {
Expand All @@ -114,11 +115,6 @@ func PingPeer(node host.Host, dht *dht.IpfsDHT) gin.HandlerFunc {
return
}
log.Println("Connected to the remote peer", dstPeer.ID)
// if err := dht.Ping(c, dstPeer.ID); err != nil {
// log.Println(err)
// c.IndentedJSON(200, err)
// return
// }
type Result struct {
Res string `json:"result"`
}
Expand All @@ -128,6 +124,68 @@ func PingPeer(node host.Host, dht *dht.IpfsDHT) gin.HandlerFunc {
return gin.HandlerFunc(fn)
}

// Connect godoc
// @Summary Connect to a remote libp2p peer and enable new Stream
// @Description Connect to a remote libp2p peer and enable new Stream
// @Tags VPN
// @Produce json
// @Param peerId path string true "Peer ID"
// @Router /connect/{peerId} [get]
func Connect(node host.Host, dht *dht.IpfsDHT) gin.HandlerFunc {
fn := func(c *gin.Context) {
pid := c.Param("peerId")
var addrString string
// DEBUG
if pid == "16Uiu2HAmFiLZXDswaYP6ptkQN13QgTsJEsDvW9x8aEY7gKUKaAjt___" {
// Our test node multiaddr hardocoded for tests
addrString = "/ip4/136.244.105.166/tcp/4002/p2p/16Uiu2HAmFiLZXDswaYP6ptkQN13QgTsJEsDvW9x8aEY7gKUKaAjt"
} else {
addrString = "/p2p/" + c.Param("peerId")
}
addr, err := multiaddr.NewMultiaddr(addrString)
if err != nil {
log.Println(err)
c.IndentedJSON(200, err.Error())
return
}
dstPeer, err := peerstore.AddrInfoFromP2pAddr(addr)
if err != nil {
log.Println(err)
c.IndentedJSON(200, err)
return
}
log.Println(dstPeer)
if err := node.Connect(c, *dstPeer); err != nil {
log.Println(err)
c.IndentedJSON(200, err)
return
}
log.Println("Connected to the remote peer", dstPeer.ID)

// we can open a new stream
s, err := node.NewStream(
context.Background(),
dstPeer.ID,
"/skypier/1.0",
)
if err != nil {
log.Println(err)
}
n, err := s.Write([]byte("Hello World!"))
if err != nil {
log.Println(err)
}
res := fmt.Sprintf("Created a stream to the remote node %v, and sent %d bytes.", dstPeer.ID, n)
type Result struct {
Res string `json:"result"`
}
r := &Result{Res: res}
c.IndentedJSON(200, r)

}
return gin.HandlerFunc(fn)
}

func displayNodeInfo(ctx context.Context, node host.Host, dht *dht.IpfsDHT) {
// print node ID
log.Println("───────────────────────────────────────────────────")
Expand Down Expand Up @@ -174,10 +232,16 @@ func StartNode(innerConfig utils.InnerConfig, pk crypto.PrivKey, tcpPort string,

// Connection manager - Load Balancer
connmgr, err := connmgr.NewConnManager(
100, // Lowwater
400, // HighWater,
100, // Lowwater
8000, // HighWater,
connmgr.WithGracePeriod(time.Minute),
)
utils.Check(err)

// Sometimes the swarm_stream is left open, but the underlying yamux_stream is closed.
// This causes the resource limit to be reached. We Need to add monitoring and force to close old streams
resourceManager, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
utils.Check(err)

var idht *dht.IpfsDHT

Expand Down Expand Up @@ -224,6 +288,8 @@ func StartNode(innerConfig utils.InnerConfig, pk crypto.PrivKey, tcpPort string,
idht, err = dht.New(ctx, h)
return idht, err
}),
// add monitoring and force to close old streams
libp2p.ResourceManager(resourceManager),
libp2p.FallbackDefaults,
libp2p.Ping(true),
)
Expand Down Expand Up @@ -255,12 +321,13 @@ func StartNode(innerConfig utils.InnerConfig, pk crypto.PrivKey, tcpPort string,
// TODO add more bootstrap nodes for Skypier in other countries to avoid single point of failure
// TODO add some bootstrap nodes with TCP && QUIC
// TODO avoid having default bootstrap nodes hardcoded here. could be get from an online URI, easier for future update
// ipfsPublicPeer, err := multiaddr.NewMultiaddr("/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb")

ipfsPublicPeer, err := multiaddr.NewMultiaddr("/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb")
skypierPublicPeer, err := multiaddr.NewMultiaddr("/ip4/136.244.105.166/udp/4001/quic-v1/p2p/12D3KooWKzmZmLySs5WKBvdxzsctWNsN9abbtnj4PyyqNg9LCyek")
utils.Check(err)
skypierBootstrapPeers := [...]multiaddr.Multiaddr{
skypierPublicPeer,
// ipfsPublicPeer,
ipfsPublicPeer,
}

// This connects to public bootstrappers
Expand All @@ -274,8 +341,10 @@ func StartNode(innerConfig utils.InnerConfig, pk crypto.PrivKey, tcpPort string,
log.Println("Connected to bootstrap peer: ", pi.ID)
}

log.Println("Enabling Stream Handler...")
// Set the Skypier protocol handler on the Host's Mux
node.SetStreamHandler(protocol.ID(innerConfig.Protocol), streamHandler)
node.SetStreamHandler("/skypier/1.0", streamHandler)
log.Println("Stream handler enabled for protocol /skypier/1.0")

return node, idht, err
}
Expand Down Expand Up @@ -318,16 +387,16 @@ func SetNodeUp(ctx context.Context, config utils.InnerConfig) (host.Host, *dht.I
return node, dht
}

var RevLookup map[string]string
// var RevLookup map[string]string

func streamHandler(stream network.Stream) {
log.Println("Entered the stream handler...")

// If the remote node ID isn't in the list of known nodes don't respond.
if _, ok := RevLookup[stream.Conn().RemotePeer().ShortString()]; !ok {
stream.Reset()
return
}
// if _, ok := RevLookup[stream.Conn().RemotePeer().ShortString()]; !ok {
// stream.Reset()
// return
// }
var packet = make([]byte, 1500)
var packetSize = make([]byte, 2)
for {
Expand Down

0 comments on commit 2fa889c

Please sign in to comment.