Skip to content

Commit

Permalink
fix: escape death lock when consume after close
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Jul 12, 2024
1 parent eeb2251 commit 57ddd1d
Show file tree
Hide file tree
Showing 47 changed files with 193 additions and 373 deletions.
8 changes: 4 additions & 4 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ To modify node specifications, use the Command-Line Interface (CLI) to update th

- kind: switch
name: catch
match:
- when: self == "invalid argument"
matches:
- when: self == "unsupported type" || self == "unsupported value"
port: out[0]
- when: 'true'
port: out[1]
Expand Down Expand Up @@ -164,9 +164,9 @@ Compiled nodes are converted into symbols and stored in a symbol table, which co
+--------------------------+ +-------------------+
| Database | | Loader |
| +--------------------+ | | +-------------+ |
| | Node Specification | |-->| | Scheme | |
| | Node Specification | | | | Scheme | |
| +--------------------+ | | | +-------+ | |
| | Node Specification | | | | | Codec | | |--+
| | Node Specification | |-->| | | Codec | | |--+
| +--------------------+ | | | +-------+ | | |
| | Node Specification | | | +-------------+ | |
| +--------------------+ | +-------------------+ |
Expand Down
8 changes: 4 additions & 4 deletions docs/architecture_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@

- kind: switch
name: catch
match:
- when: self == "invalid argument"
matches:
- when: self == "unsupported type" || self == "unsupported value"
port: out[0]
- when: 'true'
port: out[1]
Expand Down Expand Up @@ -164,9 +164,9 @@
+--------------------------+ +-------------------+
| Database | | Loader |
| +--------------------+ | | +-------------+ |
| | Node Specification | |-->| | Scheme | |
| | Node Specification | | | | Scheme | |
| +--------------------+ | | | +-------+ | |
| | Node Specification | | | | | Codec | | |--+
| | Node Specification | |-->| | | Codec | | |--+
| +--------------------+ | | | +-------+ | | |
| | Node Specification | | | +-------------+ | |
| +--------------------+ | +-------------------+ |
Expand Down
4 changes: 2 additions & 2 deletions examples/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@

- kind: switch
name: catch
match:
- when: self == "invalid argument"
matches:
- when: self == "unsupported type" || self == "unsupported value"
port: out[0]
- when: 'true'
port: out[1]
Expand Down
36 changes: 18 additions & 18 deletions ext/pkg/control/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (n *BlockNode) In(name string) *port.InPort {
n.mu.Lock()
defer n.mu.Unlock()

if p, ok := n.inPorts[name]; ok {
return p
if inPort, ok := n.inPorts[name]; ok {
return inPort
}
if len(n.nodes) > 0 {
if p := n.nodes[0].In(name); p != nil {
n.inPorts[name] = p
return p
if inPort := n.nodes[0].In(name); inPort != nil {
n.inPorts[name] = inPort
return inPort
}
}
return nil
Expand All @@ -96,34 +96,34 @@ func (n *BlockNode) Out(name string) *port.OutPort {
n.mu.Lock()
defer n.mu.Unlock()

if p, ok := n.outPorts[name]; ok {
return p
if outPort, ok := n.outPorts[name]; ok {
return outPort
}
if len(n.nodes) > 0 {
if p := n.nodes[len(n.nodes)-1].Out(name); p != nil {
n.outPorts[name] = p
return p
if outPort := n.nodes[len(n.nodes)-1].Out(name); outPort != nil {
n.outPorts[name] = outPort
return outPort
}
}
return nil
}

// Close closes all ports associated with the node.
func (n *BlockNode) Close() error {
n.mu.Lock()
defer n.mu.Unlock()
n.mu.RLock()
defer n.mu.RUnlock()

for _, n := range n.nodes {
if err := n.Close(); err != nil {
for _, node := range n.nodes {
if err := node.Close(); err != nil {
return err
}
}

for _, p := range n.inPorts {
p.Close()
for _, inPort := range n.inPorts {
inPort.Close()
}
for _, p := range n.outPorts {
p.Close()
for _, outPort := range n.outPorts {
outPort.Close()
}

return nil
Expand Down
26 changes: 0 additions & 26 deletions ext/pkg/control/call.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package control

import (
"sync"

"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
Expand All @@ -18,7 +16,6 @@ type CallNode struct {
inPort *port.InPort
outPorts []*port.OutPort
errPort *port.OutPort
mu sync.RWMutex
}

// CallNodeSpec holds the specifications for creating a CallNode.
Expand Down Expand Up @@ -49,23 +46,16 @@ func NewCallNode() *CallNode {

// In returns the input port with the specified name.
func (n *CallNode) In(name string) *port.InPort {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortIn:
return n.inPort
default:
}

return nil
}

// Out returns the output port with the specified name.
func (n *CallNode) Out(name string) *port.OutPort {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortOut:
return n.outPorts[0]
Expand All @@ -79,15 +69,11 @@ func (n *CallNode) Out(name string) *port.OutPort {
}
}
}

return nil
}

// Close closes all ports associated with the node.
func (n *CallNode) Close() error {
n.mu.Lock()
defer n.mu.Unlock()

n.inPort.Close()
for _, outPort := range n.outPorts {
outPort.Close()
Expand All @@ -99,9 +85,6 @@ func (n *CallNode) Close() error {
}

func (n *CallNode) forward(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

inReader := n.inPort.Open(proc)
outWriter0 := n.outPorts[0].Open(proc)
outWriter1 := n.outPorts[1].Open(proc)
Expand All @@ -128,9 +111,6 @@ func (n *CallNode) forward(proc *process.Process) {
}

func (n *CallNode) backward0(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outWriter0 := n.outPorts[0].Open(proc)

for {
Expand All @@ -144,9 +124,6 @@ func (n *CallNode) backward0(proc *process.Process) {
}

func (n *CallNode) backward1(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outWriter1 := n.outPorts[1].Open(proc)

for {
Expand All @@ -160,9 +137,6 @@ func (n *CallNode) backward1(proc *process.Process) {
}

func (n *CallNode) catch(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

errWriter := n.errPort.Open(proc)

for {
Expand Down
23 changes: 0 additions & 23 deletions ext/pkg/control/fork.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package control

import (
"sync"

"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
Expand All @@ -17,7 +15,6 @@ type ForkNode struct {
inPort *port.InPort
outPort *port.OutPort
errPort *port.OutPort
mu sync.RWMutex
}

// ForkNodeSpec holds the specifications for creating a ForkNode.
Expand Down Expand Up @@ -46,39 +43,28 @@ func NewForkNode() *ForkNode {

// In returns the input port with the specified name.
func (n *ForkNode) In(name string) *port.InPort {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortIn:
return n.inPort
default:
}

return nil
}

// Out returns the output port with the specified name.
func (n *ForkNode) Out(name string) *port.OutPort {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortOut:
return n.outPort
case node.PortErr:
return n.errPort
default:
}

return nil
}

// Close closes all ports associated with the node.
func (n *ForkNode) Close() error {
n.mu.Lock()
defer n.mu.Unlock()

n.inPort.Close()
n.outPort.Close()
n.errPort.Close()
Expand All @@ -87,9 +73,6 @@ func (n *ForkNode) Close() error {
}

func (n *ForkNode) forward(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

inReader := n.inPort.Open(proc)

for {
Expand All @@ -107,9 +90,6 @@ func (n *ForkNode) forward(proc *process.Process) {
}

func (n *ForkNode) backward(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)

Expand All @@ -132,9 +112,6 @@ func (n *ForkNode) backward(proc *process.Process) {
}

func (n *ForkNode) catch(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

errWriter := n.errPort.Open(proc)

for {
Expand Down
20 changes: 0 additions & 20 deletions ext/pkg/control/if.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package control

import (
"reflect"
"sync"

"github.com/siyul-park/uniflow/ext/pkg/language"
"github.com/siyul-park/uniflow/pkg/node"
Expand All @@ -21,7 +20,6 @@ type IfNode struct {
inPort *port.InPort
outPorts []*port.OutPort
errPort *port.OutPort
mu sync.RWMutex
}

// IfNodeSpec holds specifications for creating an IfNode.
Expand Down Expand Up @@ -58,9 +56,6 @@ func NewIfNode(condition func(any) (bool, error)) *IfNode {

// In returns the input port with the specified name.
func (n *IfNode) In(name string) *port.InPort {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortIn:
return n.inPort
Expand All @@ -72,9 +67,6 @@ func (n *IfNode) In(name string) *port.InPort {

// Out returns the output port with the specified name.
func (n *IfNode) Out(name string) *port.OutPort {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortOut:
return n.outPorts[0]
Expand All @@ -93,9 +85,6 @@ func (n *IfNode) Out(name string) *port.OutPort {

// Close closes all ports associated with the node.
func (n *IfNode) Close() error {
n.mu.Lock()
defer n.mu.Unlock()

n.inPort.Close()
for _, outPort := range n.outPorts {
outPort.Close()
Expand All @@ -107,9 +96,6 @@ func (n *IfNode) Close() error {
}

func (n *IfNode) forward(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

inReader := n.inPort.Open(proc)
outWriter0 := n.outPorts[0].Open(proc)
outWriter1 := n.outPorts[1].Open(proc)
Expand Down Expand Up @@ -138,9 +124,6 @@ func (n *IfNode) forward(proc *process.Process) {
}

func (n *IfNode) backward(proc *process.Process, index int) {
n.mu.RLock()
defer n.mu.RUnlock()

outWriter := n.outPorts[index].Open(proc)

for {
Expand All @@ -154,9 +137,6 @@ func (n *IfNode) backward(proc *process.Process, index int) {
}

func (n *IfNode) catch(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

errWriter := n.errPort.Open(proc)

for {
Expand Down
Loading

0 comments on commit 57ddd1d

Please sign in to comment.