Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MDNS peer discovery #6

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion cmd/internal/run/run.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
package run

import (
"context"
"fmt"
"log/slog"
"net/http"

"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
ma "github.com/multiformats/go-multiaddr"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
ww "github.com/wetware/go"
"github.com/wetware/go/util"
)

// DiscoveryNotifee implements the discovery.Notifee interface
// It will be triggered when new peers are found
type DiscoveryNotifee struct {
Ctx context.Context
Host host.Host
}

// HandlePeerFound adds the peer's address to the peerstore.
func (n DiscoveryNotifee) HandlePeerFound(info peer.AddrInfo) {
n.Host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
slog.Info("discovered peer", "peer", info.ID)
}

func Command() *cli.Command {
return &cli.Command{
Name: "run",
Expand All @@ -27,6 +47,12 @@ func Command() *cli.Command {
Name: "load",
EnvVars: []string{"WW_LOAD"},
},
&cli.StringFlag{
Name: "mdns",
EnvVars: []string{"WW_MDNS"},
Usage: "service tag name",
Value: "ww.local",
},
},
Before: setup(),
Action: run(),
Expand Down Expand Up @@ -55,10 +81,17 @@ func run() cli.ActionFunc {
return func(c *cli.Context) error {
h, err := libp2p.New()
if err != nil {
return err
return fmt.Errorf("host: %w", err)
}
defer h.Close()

// Create an mDNS service to discover peers on the local network
service := mdns.NewMdnsService(h, c.String("mdns"), &DiscoveryNotifee{Host: h})
if err := service.Start(); err != nil {
return fmt.Errorf("mdns: start: %w", err)
}
defer service.Close()

wetware := suture.New("ww", suture.Spec{
EventHook: util.EventHookWithContext(c.Context),
})
Expand Down
22 changes: 11 additions & 11 deletions system/ipfs.go → fs/ipfs/ipfs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package system
package ipfs

import (
"context"
Expand All @@ -14,17 +14,17 @@ import (
"github.com/pkg/errors"
)

var _ fs.FS = (*IPFS)(nil)
var _ fs.FS = (*UnixFS)(nil)

// An IPFS provides access to a hierarchical file system.
// An UnixFS provides access to a hierarchical file system.
//
// The IPFS interface is the minimum implementation required of the file system.
// The UnixFS interface is the minimum implementation required of the file system.
// A file system may implement additional interfaces,
// such as [ReadFileFS], to provide additional or optimized functionality.
//
// [testing/fstest.TestFS] may be used to test implementations of an IPFS for
// [testing/fstest.TestFS] may be used to test implementations of an UnixFS for
// correctness.
type IPFS struct {
type UnixFS struct {
Ctx context.Context
Root path.Path
Unix iface.UnixfsAPI
Expand All @@ -39,7 +39,7 @@ type IPFS struct {
// Open should reject attempts to open names that do not satisfy
// fs.ValidPath(name), returning a *fs.PathError with Err set to
// fs.ErrInvalid or fs.ErrNotExist.
func (f IPFS) Open(name string) (fs.File, error) {
func (f UnixFS) Open(name string) (fs.File, error) {
path, node, err := f.Resolve(f.Ctx, name)
if err != nil {
return nil, &fs.PathError{
Expand All @@ -55,7 +55,7 @@ func (f IPFS) Open(name string) (fs.File, error) {
}, nil
}

func (f IPFS) Resolve(ctx context.Context, name string) (path.Path, files.Node, error) {
func (f UnixFS) Resolve(ctx context.Context, name string) (path.Path, files.Node, error) {
if pathInvalid(name) {
return nil, nil, fs.ErrInvalid
}
Expand All @@ -73,16 +73,16 @@ func pathInvalid(name string) bool {
return !fs.ValidPath(name)
}

func (f IPFS) Sub(dir string) (fs.FS, error) {
func (f UnixFS) Sub(dir string) (fs.FS, error) {
var root path.Path
var err error
if (f == IPFS{}) {
if (f == UnixFS{}) {
root, err = path.NewPath(dir)
} else {
root, err = path.Join(f.Root, dir)
}

return &IPFS{
return &UnixFS{
Ctx: f.Ctx,
Root: root,
Unix: f.Unix,
Expand Down
16 changes: 8 additions & 8 deletions system/ipfs_test.go → fs/ipfs/ipfs_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package system_test
package ipfs_test

import (
"context"
Expand All @@ -9,7 +9,7 @@ import (
"github.com/ipfs/boxo/path"
"github.com/ipfs/kubo/client/rpc"
"github.com/stretchr/testify/require"
"github.com/wetware/go/system"
"github.com/wetware/go/fs/ipfs"
)

const IPFS_ROOT = "/ipfs/QmRecDLNaESeNY3oUFYZKK9ftdANBB8kuLaMdAXMD43yon" // go/system/testdata/fs
Expand All @@ -25,10 +25,10 @@ func TestIPFS_Env(t *testing.T) {
root, err := path.NewPath(IPFS_ROOT)
require.NoError(t, err)

ipfs, err := rpc.NewLocalApi()
api, err := rpc.NewLocalApi()
require.NoError(t, err)

dir, err := ipfs.Unixfs().Ls(context.Background(), root)
dir, err := api.Unixfs().Ls(context.Background(), root)
require.NoError(t, err)

var names []string
Expand All @@ -47,10 +47,10 @@ func TestIPFS_FS(t *testing.T) {
root, err := path.NewPath(IPFS_ROOT)
require.NoError(t, err)

ipfs, err := rpc.NewLocalApi()
api, err := rpc.NewLocalApi()
require.NoError(t, err)

fs := system.IPFS{Ctx: context.Background(), Unix: ipfs.Unixfs(), Root: root}
fs := ipfs.UnixFS{Ctx: context.Background(), Unix: api.Unixfs(), Root: root}
err = fstest.TestFS(fs,
"main.go",
"main.wasm",
Expand All @@ -67,10 +67,10 @@ func TestIPFS_SubFS(t *testing.T) {
root, err := path.NewPath("/ipfs/QmSAyttKvYkSCBTghuMxAJaBZC3jD2XLRCQ5FB3CTrb9rE") // go/system/testdata
require.NoError(t, err)

ipfs, err := rpc.NewLocalApi()
api, err := rpc.NewLocalApi()
require.NoError(t, err)

fs, err := fs.Sub(system.IPFS{Ctx: context.Background(), Unix: ipfs.Unixfs(), Root: root}, "fs")
fs, err := fs.Sub(ipfs.UnixFS{Ctx: context.Background(), Unix: api.Unixfs(), Root: root}, "fs")
require.NoError(t, err)
require.NotNil(t, fs)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ require (
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/libp2p/zeroconf/v2 v2.2.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.58 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
github.com/miekg/dns v1.1.58 h1:ca2Hdkz+cDg/7eNF6V56jjzuZ4aCAE+DbVkILdQWG/4=
github.com/miekg/dns v1.1.58/go.mod h1:Ypv+3b/KadlvW9vJfXOTf300O4UqaHFzFCuHz+rPkBY=
github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c h1:bzE/A84HN25pxAuk9Eej1Kz9OUelF97nAc82bDquQI8=
Expand Down Expand Up @@ -741,6 +742,7 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
Expand Down Expand Up @@ -789,6 +791,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -806,6 +810,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
7 changes: 4 additions & 3 deletions ww.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/thejerf/suture/v4"
"github.com/wetware/go/fs/ipfs"
"github.com/wetware/go/system"
)

Expand Down Expand Up @@ -113,20 +114,20 @@ func (c Cluster) Serve(ctx context.Context) error {
}

// NewFS returns an fs.FS.
func (c Cluster) NewFS(ctx context.Context) (*system.IPFS, error) {
func (c Cluster) NewFS(ctx context.Context) (*ipfs.UnixFS, error) {
root, err := path.NewPath(c.NS)
if err != nil {
return nil, err
}

return &system.IPFS{
return &ipfs.UnixFS{
Ctx: ctx,
Unix: c.IPFS.Unixfs(),
Root: root,
}, nil
}

func (c Cluster) Compile(ctx context.Context, r wazero.Runtime, fs *system.IPFS) (wazero.CompiledModule, error) {
func (c Cluster) Compile(ctx context.Context, r wazero.Runtime, fs *ipfs.UnixFS) (wazero.CompiledModule, error) {
root, n, err := fs.Resolve(ctx, ".")
if err != nil {
return nil, err
Expand Down
Loading