diff --git a/cmd/internal/run/run.go b/cmd/internal/run/run.go index 3393b64..b772e4a 100644 --- a/cmd/internal/run/run.go +++ b/cmd/internal/run/run.go @@ -6,6 +6,7 @@ import ( "io" "log/slog" "net/http" + "os" "github.com/ipfs/kubo/client/rpc" iface "github.com/ipfs/kubo/core/coreiface" @@ -43,14 +44,9 @@ func Command() *cli.Command { EnvVars: []string{"WW_ENV"}, }, &cli.BoolFlag{ - Name: "debug", - EnvVars: []string{"WW_DEBUG"}, - Usage: "enable WASM debug values", + Name: "stdin", + Usage: "bind stdin to wasm guest", }, - // &cli.BoolFlag{ - // Name: "listen", - // Usage: "serve network calls after main() exits", - // }, }, Action: run(), } @@ -95,8 +91,8 @@ func run() cli.ActionFunc { Args: c.Args().Slice(), Env: c.StringSlice("env"), Stdin: stdin(c), - Stdout: c.App.Writer, - Stderr: c.App.ErrWriter, + Stdout: stdout(c), + Stderr: stderr(c), }, Net: system.Net{ Host: h, @@ -110,16 +106,6 @@ func run() cli.ActionFunc { } } -func stdin(c *cli.Context) io.Reader { - if c.IsSet("stdin") && c.Bool("stdin") { - return c.App.Reader - } - - return emptyReader -} - -var emptyReader io.Reader = &bytes.Reader{} - func handler(c *cli.Context, h host.Host) system.HandlerFunc { return func(ctx context.Context, p *proc.P) error { sub, err := h.EventBus().Subscribe([]any{ @@ -182,3 +168,32 @@ func newIPFSClient(c *cli.Context) (ipfs iface.CoreAPI, err error) { return } + +func stdin(c *cli.Context) io.Reader { + switch r := c.App.Reader.(type) { + case *os.File: + info, err := r.Stat() + if err != nil { + panic(err) + } + + if info.Size() <= 0 { + break + } + + return &io.LimitedReader{ + R: c.App.Reader, + N: 1<<32 - 1, // max u32 + } + } + + return &bytes.Reader{} // empty buffer +} + +func stdout(c *cli.Context) io.Writer { + return c.App.Writer +} + +func stderr(c *cli.Context) io.Writer { + return c.App.ErrWriter +} diff --git a/examples/echo/main.go b/examples/echo/main.go index d61d429..c296da0 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -3,8 +3,10 @@ package main import ( + "bufio" "flag" "io" + "log/slog" "os" "github.com/wetware/go/std/system" @@ -18,15 +20,30 @@ func echo() { } func main() { - stdin := flag.Bool("stdin", false, "read from standard input") - serve := flag.Bool("serve", false, "handle async method calls") + stdin := flag.Bool("stdin", false, "read data from stdin") flag.Parse() if *stdin { - echo() + if _, err := io.Copy(os.Stdout, os.Stdin); err != nil { + slog.Error("failed echo stdin", + "reason", err) + os.Exit(1) + } + + } else { + w := bufio.NewWriter(os.Stdout) + for _, arg := range os.Args[1:] { + w.WriteString(arg) + w.WriteString(" ") + if err := w.Flush(); err != nil { + slog.Error("failed to flush argument to stdout", + "reason", err) + os.Exit(1) + } + } } - if *serve { + if serve() { // Yield control to the scheduler. os.Exit(system.StatusAwaiting) // The caller will intercept interface{ExitCode() uint32} and @@ -40,3 +57,12 @@ func main() { // Caller will resolve to err = nil. // Top-level CLI command will unblock. } + +func serve() bool { + switch os.Getenv("WW_SERVE") { + case "", "false", "0": + return false + default: + return true + } +} diff --git a/examples/echo/main.wasm b/examples/echo/main.wasm index a23a86f..0791b4d 100644 Binary files a/examples/echo/main.wasm and b/examples/echo/main.wasm differ diff --git a/proc/proc.go b/proc/proc.go index 29ee940..f2d9aac 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -6,7 +6,9 @@ import ( "crypto/rand" "errors" "io" + "log/slog" "runtime" + "strings" "capnproto.org/go/capnp/v3" "github.com/libp2p/go-libp2p/core/protocol" @@ -14,29 +16,23 @@ import ( "github.com/tetratelabs/wazero/api" ) -type Config struct { +type Command struct { Args, Env []string - Stdin io.Reader Stdout, Stderr io.Writer } -func (cfg Config) Instantiate( - ctx context.Context, - r wazero.Runtime, - cm wazero.CompiledModule, -) (*P, error) { +func (cmd Command) Instantiate(ctx context.Context, r wazero.Runtime, cm wazero.CompiledModule) (*P, error) { var p P + var err error // /ww//proc/ pid := NewPID().String() - - var err error - p.Mod, err = r.InstantiateModule(ctx, cm, wazero.NewModuleConfig(). + config := cmd.WithEnv(wazero.NewModuleConfig(). WithName(pid). - WithArgs(cfg.Args...). + WithArgs(cmd.Args...). WithStdin(&p.mailbox). - WithStdout(cfg.Stdout). - WithStderr(cfg.Stderr). + WithStdout(cmd.Stdout). + WithStderr(cmd.Stderr). WithEnv("WW_PID", pid). WithRandSource(rand.Reader). WithOsyield(runtime.Gosched). @@ -44,9 +40,25 @@ func (cfg Config) Instantiate( WithSysNanotime(). WithSysWalltime(). WithStartFunctions()) + p.Mod, err = r.InstantiateModule(ctx, cm, config) return &p, err } +func (cfg Command) WithEnv(mc wazero.ModuleConfig) wazero.ModuleConfig { + for _, s := range cfg.Env { + ss := strings.SplitN(s, "=", 2) + if len(ss) != 2 { + slog.Warn("ignored unparsable environment variable", + "var", s) + continue + } + + mc = mc.WithEnv(ss[0], ss[1]) + } + + return mc +} + type P struct { Parent protocol.ID Mod api.Module diff --git a/proc/proc_test.go b/proc/proc_test.go index 3f610e2..2ebcd2e 100644 --- a/proc/proc_test.go +++ b/proc/proc_test.go @@ -37,7 +37,7 @@ func TestProc_echo(t *testing.T) { stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) - p, err := proc.Config{ + p, err := proc.Command{ Stdout: stdout, Stderr: stderr, }.Instantiate(ctx, r, cm) @@ -85,7 +85,7 @@ func TestProc_echo_repeated_calls(t *testing.T) { stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) - p, err := proc.Config{ + p, err := proc.Command{ Stdout: stdout, Stderr: stderr, }.Instantiate(ctx, r, cm) diff --git a/system/net.go b/system/net.go index 48a074c..24b2fa8 100644 --- a/system/net.go +++ b/system/net.go @@ -2,6 +2,7 @@ package system import ( "context" + "log/slog" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -20,12 +21,25 @@ func (n Net) Bind(ctx context.Context, p *proc.P) ReleaseFunc { handler := proc.StreamHandler{ VersionedID: n.Proto, Proc: p} + proto := handler.Proto() + pid := handler.String() + peer := n.Host.ID() n.Host.SetStreamHandlerMatch( - handler.Proto(), + proto, handler.Match, handler.Bind(ctx)) - return func() { n.Host.RemoveStreamHandler(handler.Proto()) } + slog.DebugContext(ctx, "attached process stream handlers", + "peer", peer, + "proto", proto, + "proc", pid) + return func() { + n.Host.RemoveStreamHandler(proto) + slog.DebugContext(ctx, "detached process stream handlers", + "peer", peer, + "proto", proto, + "proc", pid) + } } // StorePeer is a peer handler that inserts the peer in the diff --git a/ww.go b/ww.go index ef8ad6c..119e17f 100644 --- a/ww.go +++ b/ww.go @@ -57,7 +57,9 @@ func (env Env) Bind(ctx context.Context, r wazero.Runtime) error { return err } else if err := call.SetName("_start"); err != nil { return err - } else if b, err := io.ReadAll(env.IO.Stdin); err != nil { + } + + if b, err := io.ReadAll(env.IO.Stdin); err != nil { return err } else if err := call.SetCallData(b); err != nil { return err @@ -84,10 +86,9 @@ func (env Env) Instantiate(ctx context.Context, r wazero.Runtime) (*proc.P, erro } defer cm.Close(ctx) - return proc.Config{ + return proc.Command{ Args: env.IO.Args, Env: env.IO.Env, - Stdin: env.IO.Stdin, Stdout: env.IO.Stdout, Stderr: env.IO.Stderr, }.Instantiate(ctx, r, cm) @@ -111,18 +112,3 @@ func (env Env) ReadAll(ctx context.Context, name string) ([]byte, error) { return io.ReadAll(f) } - -// func (env Env) WithEnv(mc wazero.ModuleConfig) wazero.ModuleConfig { -// for _, s := range env.IO.Env { -// ss := strings.SplitN(s, "=", 2) -// if len(ss) != 2 { -// slog.Warn("ignored unparsable environment variable", -// "var", s) -// continue -// } - -// mc = mc.WithEnv(ss[0], ss[1]) -// } - -// return mc -// }