Skip to content

Commit

Permalink
Fix stream handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed Nov 16, 2024
1 parent 3db3915 commit 82a181a
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 58 deletions.
53 changes: 34 additions & 19 deletions cmd/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log/slog"
"net/http"
"os"

"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
Expand Down Expand Up @@ -43,14 +44,9 @@ func Command() *cli.Command {
EnvVars: []string{"WW_ENV"},
},
&cli.BoolFlag{
Name: "debug",
EnvVars: []string{"WW_DEBUG"},
Usage: "enable WASM debug values",
Name: "stdin",
Usage: "bind stdin to wasm guest",
},
// &cli.BoolFlag{
// Name: "listen",
// Usage: "serve network calls after main() exits",
// },
},
Action: run(),
}
Expand Down Expand Up @@ -95,8 +91,8 @@ func run() cli.ActionFunc {
Args: c.Args().Slice(),
Env: c.StringSlice("env"),
Stdin: stdin(c),
Stdout: c.App.Writer,
Stderr: c.App.ErrWriter,
Stdout: stdout(c),
Stderr: stderr(c),
},
Net: system.Net{
Host: h,
Expand All @@ -110,16 +106,6 @@ func run() cli.ActionFunc {
}
}

func stdin(c *cli.Context) io.Reader {
if c.IsSet("stdin") && c.Bool("stdin") {
return c.App.Reader
}

return emptyReader
}

var emptyReader io.Reader = &bytes.Reader{}

func handler(c *cli.Context, h host.Host) system.HandlerFunc {
return func(ctx context.Context, p *proc.P) error {
sub, err := h.EventBus().Subscribe([]any{
Expand Down Expand Up @@ -182,3 +168,32 @@ func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) {

return
}

func stdin(c *cli.Context) io.Reader {
switch r := c.App.Reader.(type) {
case *os.File:
info, err := r.Stat()
if err != nil {
panic(err)
}

if info.Size() <= 0 {
break
}

return &io.LimitedReader{
R: c.App.Reader,
N: 1<<32 - 1, // max u32
}
}

return &bytes.Reader{} // empty buffer
}

func stdout(c *cli.Context) io.Writer {
return c.App.Writer
}

func stderr(c *cli.Context) io.Writer {
return c.App.ErrWriter
}
34 changes: 30 additions & 4 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
package main

import (
"bufio"
"flag"
"io"
"log/slog"
"os"

"github.com/wetware/go/std/system"
Expand All @@ -18,15 +20,30 @@ func echo() {
}

func main() {
stdin := flag.Bool("stdin", false, "read from standard input")
serve := flag.Bool("serve", false, "handle async method calls")
stdin := flag.Bool("stdin", false, "read data from stdin")
flag.Parse()

if *stdin {
echo()
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
slog.Error("failed echo stdin",
"reason", err)
os.Exit(1)
}

} else {
w := bufio.NewWriter(os.Stdout)
for _, arg := range os.Args[1:] {
w.WriteString(arg)
w.WriteString(" ")
if err := w.Flush(); err != nil {
slog.Error("failed to flush argument to stdout",
"reason", err)
os.Exit(1)
}
}
}

if *serve {
if serve() {
// Yield control to the scheduler.
os.Exit(system.StatusAwaiting)
// The caller will intercept interface{ExitCode() uint32} and
Expand All @@ -40,3 +57,12 @@ func main() {
// Caller will resolve to err = nil.
// Top-level CLI command will unblock.
}

func serve() bool {
switch os.Getenv("WW_SERVE") {
case "", "false", "0":
return false
default:
return true
}
}
Binary file modified examples/echo/main.wasm
Binary file not shown.
38 changes: 25 additions & 13 deletions proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,59 @@ import (
"crypto/rand"
"errors"
"io"
"log/slog"
"runtime"
"strings"

"capnproto.org/go/capnp/v3"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
)

type Config struct {
type Command struct {
Args, Env []string
Stdin io.Reader
Stdout, Stderr io.Writer
}

func (cfg Config) Instantiate(
ctx context.Context,
r wazero.Runtime,
cm wazero.CompiledModule,
) (*P, error) {
func (cmd Command) Instantiate(ctx context.Context, r wazero.Runtime, cm wazero.CompiledModule) (*P, error) {
var p P
var err error

// /ww/<semver>/proc/<pid>
pid := NewPID().String()

var err error
p.Mod, err = r.InstantiateModule(ctx, cm, wazero.NewModuleConfig().
config := cmd.WithEnv(wazero.NewModuleConfig().
WithName(pid).
WithArgs(cfg.Args...).
WithArgs(cmd.Args...).
WithStdin(&p.mailbox).
WithStdout(cfg.Stdout).
WithStderr(cfg.Stderr).
WithStdout(cmd.Stdout).
WithStderr(cmd.Stderr).
WithEnv("WW_PID", pid).
WithRandSource(rand.Reader).
WithOsyield(runtime.Gosched).
WithSysNanosleep().
WithSysNanotime().
WithSysWalltime().
WithStartFunctions())
p.Mod, err = r.InstantiateModule(ctx, cm, config)
return &p, err
}

func (cfg Command) WithEnv(mc wazero.ModuleConfig) wazero.ModuleConfig {
for _, s := range cfg.Env {
ss := strings.SplitN(s, "=", 2)
if len(ss) != 2 {
slog.Warn("ignored unparsable environment variable",
"var", s)
continue
}

mc = mc.WithEnv(ss[0], ss[1])
}

return mc
}

type P struct {
Parent protocol.ID
Mod api.Module
Expand Down
4 changes: 2 additions & 2 deletions proc/proc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestProc_echo(t *testing.T) {
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)

p, err := proc.Config{
p, err := proc.Command{
Stdout: stdout,
Stderr: stderr,
}.Instantiate(ctx, r, cm)
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestProc_echo_repeated_calls(t *testing.T) {
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)

p, err := proc.Config{
p, err := proc.Command{
Stdout: stdout,
Stderr: stderr,
}.Instantiate(ctx, r, cm)
Expand Down
18 changes: 16 additions & 2 deletions system/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package system

import (
"context"
"log/slog"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -20,12 +21,25 @@ func (n Net) Bind(ctx context.Context, p *proc.P) ReleaseFunc {
handler := proc.StreamHandler{
VersionedID: n.Proto,
Proc: p}
proto := handler.Proto()
pid := handler.String()
peer := n.Host.ID()

n.Host.SetStreamHandlerMatch(
handler.Proto(),
proto,
handler.Match,
handler.Bind(ctx))
return func() { n.Host.RemoveStreamHandler(handler.Proto()) }
slog.DebugContext(ctx, "attached process stream handlers",
"peer", peer,
"proto", proto,
"proc", pid)
return func() {
n.Host.RemoveStreamHandler(proto)
slog.DebugContext(ctx, "detached process stream handlers",
"peer", peer,
"proto", proto,
"proc", pid)
}
}

// StorePeer is a peer handler that inserts the peer in the
Expand Down
22 changes: 4 additions & 18 deletions ww.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func (env Env) Bind(ctx context.Context, r wazero.Runtime) error {
return err
} else if err := call.SetName("_start"); err != nil {
return err
} else if b, err := io.ReadAll(env.IO.Stdin); err != nil {
}

if b, err := io.ReadAll(env.IO.Stdin); err != nil {
return err
} else if err := call.SetCallData(b); err != nil {
return err
Expand All @@ -84,10 +86,9 @@ func (env Env) Instantiate(ctx context.Context, r wazero.Runtime) (*proc.P, erro
}
defer cm.Close(ctx)

return proc.Config{
return proc.Command{
Args: env.IO.Args,
Env: env.IO.Env,
Stdin: env.IO.Stdin,
Stdout: env.IO.Stdout,
Stderr: env.IO.Stderr,
}.Instantiate(ctx, r, cm)
Expand All @@ -111,18 +112,3 @@ func (env Env) ReadAll(ctx context.Context, name string) ([]byte, error) {

return io.ReadAll(f)
}

// func (env Env) WithEnv(mc wazero.ModuleConfig) wazero.ModuleConfig {
// for _, s := range env.IO.Env {
// ss := strings.SplitN(s, "=", 2)
// if len(ss) != 2 {
// slog.Warn("ignored unparsable environment variable",
// "var", s)
// continue
// }

// mc = mc.WithEnv(ss[0], ss[1])
// }

// return mc
// }

0 comments on commit 82a181a

Please sign in to comment.