diff --git a/pkg/node/onetomany.go b/pkg/node/onetomany.go index 308c5dc1..e8c5e55a 100644 --- a/pkg/node/onetomany.go +++ b/pkg/node/onetomany.go @@ -129,10 +129,10 @@ func (n *OneToManyNode) forward(proc *process.Process) { } errStream := n.errPort.Open(proc) - for func() bool { + for { inPck, ok := <-inStream.Receive() if !ok { - return false + return } if outPcks, errPck := n.action(proc, inPck); errPck != nil { @@ -175,9 +175,6 @@ func (n *OneToManyNode) forward(proc *process.Process) { } else { proc.Stack().Clear(inPck.ID()) } - - return true - }() { } } diff --git a/pkg/node/onetoone.go b/pkg/node/onetoone.go index b7dc08d2..8e4493f6 100644 --- a/pkg/node/onetoone.go +++ b/pkg/node/onetoone.go @@ -134,10 +134,10 @@ func (n *OneToOneNode) forward(proc *process.Process, inStream *port.Stream, out errStream := n.errPort.Open(proc) - for func() bool { + for { inPck, ok := <-inStream.Receive() if !ok { - return false + return } if outPck, errPck := n.action(proc, inPck); errPck != nil { @@ -165,9 +165,6 @@ func (n *OneToOneNode) forward(proc *process.Process, inStream *port.Stream, out } else { proc.Stack().Clear(inPck.ID()) } - - return true - }() { } } @@ -178,10 +175,10 @@ func (n *OneToOneNode) backward(proc *process.Process, outStream *port.Stream) { var ioStream *port.Stream var inStream *port.Stream - for func() bool { + for { backPck, ok := <-outStream.Receive() if !ok { - return false + return } if ioStream == nil { @@ -196,8 +193,5 @@ func (n *OneToOneNode) backward(proc *process.Process, outStream *port.Stream) { } else if _, ok := proc.Stack().Pop(backPck.ID(), inStream.ID()); ok { inStream.Send(backPck) } - - return true - }() { } }