diff --git a/examples/echo.yaml b/examples/echo.yaml index ef4349f8..faf386d2 100644 --- a/examples/echo.yaml +++ b/examples/echo.yaml @@ -2,7 +2,7 @@ name: http address: :8000 links: - io: + out: - name: router port: in @@ -15,4 +15,4 @@ links: out[0]: - name: http - port: io + port: in diff --git a/pkg/plugin/networkx/http.go b/pkg/plugin/networkx/http.go index b8a91dfe..285de936 100644 --- a/pkg/plugin/networkx/http.go +++ b/pkg/plugin/networkx/http.go @@ -35,6 +35,8 @@ type ( listener net.Listener listenerNetwork string ioPort *port.Port + inPort *port.Port + outPort *port.Port errPort *port.Port mu sync.RWMutex } @@ -267,6 +269,8 @@ func NewHTTPNode(config HTTPNodeConfig) *HTTPNode { server: new(http.Server), listenerNetwork: "tcp", ioPort: port.New(), + inPort: port.New(), + outPort: port.New(), errPort: port.New(), } n.server.Handler = n @@ -288,6 +292,10 @@ func (n *HTTPNode) Port(name string) (*port.Port, bool) { switch name { case node.PortIO: return n.ioPort, true + case node.PortIn: + return n.inPort, true + case node.PortOut: + return n.outPort, true case node.PortErr: return n.errPort, true default: @@ -349,6 +357,8 @@ func (n *HTTPNode) Close() error { return err } n.ioPort.Close() + n.inPort.Close() + n.outPort.Close() n.errPort.Close() return nil @@ -372,8 +382,9 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { } }() - outStream := n.ioPort.Open(proc) - inStream := n.ioPort.Open(proc) + ioStream := n.ioPort.Open(proc) + inStream := n.inPort.Open(proc) + outStream := n.outPort.Open(proc) req, err := n.request(r) if err != nil { @@ -386,24 +397,56 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } outPck := packet.New(outPayload) - outStream.Send(outPck) - inPck, ok := <-inStream.Receive() - if !ok { - _ = n.response(r, w, n.errorPayload(proc, ServiceUnavailable)) + if ioStream.Links() > 0 { + proc.Stack().Push(outPck.ID(), ioStream.ID()) + ioStream.Send(outPck) + } + if outStream.Links() > 0 { + proc.Stack().Push(outPck.ID(), outStream.ID()) + outStream.Send(outPck) + } + if ioStream.Links()+outStream.Links() == 0 { return } - proc.Stack().Clear(inPck.ID()) - inPayload := inPck.Payload() + for { + var stream *port.Stream + var inPck *packet.Packet + var ok bool + select { + case inPck, ok = <-inStream.Receive(): + stream = inStream + case inPck, ok = <-outStream.Receive(): + stream = outStream + case inPck, ok = <-ioStream.Receive(): + stream = ioStream + } + if !ok { + _ = n.response(r, w, n.errorPayload(proc, ServiceUnavailable)) + return + } - var res HTTPPayload - if err := primitive.Unmarshal(inPayload, &res); err != nil { - res.Body = inPayload - } + if stream == outStream || stream == ioStream { + if _, ok := proc.Stack().Pop(inPck.ID(), stream.ID()); !ok { + continue + } + } else { + proc.Stack().Clear(inPck.ID()) + } + + inPayload := inPck.Payload() + + var res HTTPPayload + if err := primitive.Unmarshal(inPayload, &res); err != nil { + res.Body = inPayload + } + + if err := n.response(r, w, res); err != nil { + _ = n.response(r, w, n.errorPayload(proc, InternalServerError)) + } - if err := n.response(r, w, res); err != nil { - _ = n.response(r, w, n.errorPayload(proc, InternalServerError)) + break } } diff --git a/pkg/plugin/networkx/http_test.go b/pkg/plugin/networkx/http_test.go index 238cca9a..c1a003be 100644 --- a/pkg/plugin/networkx/http_test.go +++ b/pkg/plugin/networkx/http_test.go @@ -41,6 +41,14 @@ func TestHTTPNode_Port(t *testing.T) { assert.True(t, ok) assert.NotNil(t, p) + p, ok = n.Port(node.PortIn) + assert.True(t, ok) + assert.NotNil(t, p) + + p, ok = n.Port(node.PortOut) + assert.True(t, ok) + assert.NotNil(t, p) + p, ok = n.Port(node.PortErr) assert.True(t, ok) assert.NotNil(t, p) @@ -69,7 +77,7 @@ func TestHTTPNode_StartAndClose(t *testing.T) { } func TestHTTPNode_ServeHTTP(t *testing.T) { - t.Run("Hello World", func(t *testing.T) { + t.Run("IO", func(t *testing.T) { n := NewHTTPNode(HTTPNodeConfig{}) defer func() { _ = n.Close() }() @@ -105,29 +113,34 @@ func TestHTTPNode_ServeHTTP(t *testing.T) { assert.Equal(t, "Hello World!", w.Body.String()) }) - t.Run("HTTPError", func(t *testing.T) { + t.Run("In/Out", func(t *testing.T) { n := NewHTTPNode(HTTPNodeConfig{}) defer func() { _ = n.Close() }() - httpErr := NotFound + in := port.New() + inPort, _ := n.Port(node.PortIn) + inPort.Link(in) - io := port.New() - ioPort, _ := n.Port(node.PortIO) - ioPort.Link(io) + out := port.New() + outPort, _ := n.Port(node.PortOut) + outPort.Link(out) - io.AddInitHook(port.InitHookFunc(func(proc *process.Process) { - ioStream := io.Open(proc) + out.AddInitHook(port.InitHookFunc(func(proc *process.Process) { + inStream := in.Open(proc) + outStream := out.Open(proc) for { - inPck, ok := <-ioStream.Receive() + inPck, ok := <-outStream.Receive() if !ok { return } - outPayload, _ := primitive.MarshalText(httpErr) - outPck := packet.New(outPayload) + outPck := packet.New(primitive.NewMap( + primitive.NewString("body"), primitive.NewString("Hello World!"), + primitive.NewString("status"), primitive.NewInt(200), + )) proc.Stack().Link(inPck.ID(), outPck.ID()) - ioStream.Send(outPck) + inStream.Send(outPck) } })) @@ -136,8 +149,8 @@ func TestHTTPNode_ServeHTTP(t *testing.T) { n.ServeHTTP(w, r) - assert.Equal(t, httpErr.Status, w.Result().StatusCode) + assert.Equal(t, 200, w.Result().StatusCode) assert.Equal(t, TextPlainCharsetUTF8, w.Header().Get(HeaderContentType)) - assert.Equal(t, httpErr.Body.Interface(), w.Body.String()) + assert.Equal(t, "Hello World!", w.Body.String()) }) }