Skip to content

Commit

Permalink
refactor: remove duplicated code
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Oct 27, 2024
1 parent dddfd68 commit 04c9b81
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 129 deletions.
8 changes: 4 additions & 4 deletions cmd/pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,23 @@ func (m *debugModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
bps := m.debugger.Breakpoints()
m.view = &breakpointDebugView{id: len(bps) - 1, breakpoint: bp}

return m, tea.Cmd(func() tea.Msg {
return m, func() tea.Msg {
if m.debugger.Pause(context.Background()) {
if m.debugger.Breakpoint() == bp {
return m.debugger.Frame()
}
}
return nil
})
}
case "continue", "c":
m.view = nil

return m, tea.Cmd(func() tea.Msg {
return m, func() tea.Msg {
if m.debugger.Step(context.Background()) {
return m.debugger.Frame()
}
return nil
})
}
case "delete", "d":
bps := m.debugger.Breakpoints()

Expand Down
24 changes: 9 additions & 15 deletions ext/pkg/control/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func NewCallNode() *CallNode {
}

n.inPort.AddListener(port.ListenFunc(n.forward))
n.outPorts[0].AddListener(port.ListenFunc(n.backward0))
n.outPorts[1].AddListener(port.ListenFunc(n.backward1))
n.outPorts[0].AddListener(n.backward(0))
n.outPorts[1].AddListener(n.backward(1))
n.errPort.AddListener(port.ListenFunc(n.catch))

return n
Expand Down Expand Up @@ -112,20 +112,14 @@ func (n *CallNode) forward(proc *process.Process) {
}
}

func (n *CallNode) backward0(proc *process.Process) {
outWriter0 := n.outPorts[0].Open(proc)
func (n *CallNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
outWriter := n.outPorts[index].Open(proc)

for backPck := range outWriter0.Receive() {
n.tracer.Receive(outWriter0, backPck)
}
}

func (n *CallNode) backward1(proc *process.Process) {
outWriter1 := n.outPorts[1].Open(proc)

for backPck := range outWriter1.Receive() {
n.tracer.Receive(outWriter1, backPck)
}
for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
}
})
}

func (n *CallNode) catch(proc *process.Process) {
Expand Down
24 changes: 9 additions & 15 deletions ext/pkg/control/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func NewLoopNode() *LoopNode {
}

n.inPort.AddListener(port.ListenFunc(n.forward))
n.outPorts[0].AddListener(port.ListenFunc(n.backward0))
n.outPorts[1].AddListener(port.ListenFunc(n.backward1))
n.outPorts[0].AddListener(n.backward(0))
n.outPorts[1].AddListener(n.backward(1))
n.errPort.AddListener(port.ListenFunc(n.catch))

return n
Expand Down Expand Up @@ -130,20 +130,14 @@ func (n *LoopNode) forward(proc *process.Process) {
}
}

func (n *LoopNode) backward0(proc *process.Process) {
outWriter0 := n.outPorts[0].Open(proc)
func (n *LoopNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
outWriter := n.outPorts[index].Open(proc)

for backPck := range outWriter0.Receive() {
n.tracer.Receive(outWriter0, backPck)
}
}

func (n *LoopNode) backward1(proc *process.Process) {
outWriter1 := n.outPorts[1].Open(proc)

for backPck := range outWriter1.Receive() {
n.tracer.Receive(outWriter1, backPck)
}
for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
}
})
}

func (n *LoopNode) catch(proc *process.Process) {
Expand Down
10 changes: 5 additions & 5 deletions ext/pkg/mime/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er
encode := header.Get(HeaderContentEncoding)

if typ == "" {
if types := DetectTypes(value); len(types) > 0 {
typ = types[0]
if detects := DetectTypes(value); len(detects) > 0 {
typ = detects[0]
header.Set(HeaderContentType, typ)
}
}
Expand Down Expand Up @@ -160,8 +160,8 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er

typ := h.Get(HeaderContentType)
if typ == "" {
if types := DetectTypes(data); len(types) > 0 {
typ = types[0]
if detects := DetectTypes(data); len(detects) > 0 {
typ = detects[0]
h.Set(HeaderContentType, typ)
}
}
Expand Down Expand Up @@ -292,9 +292,9 @@ func Decode(reader io.Reader, header textproto.MIMEHeader) (types.Value, error)
if err != nil {
return nil, err
}
defer file.Close()

data, err := Decode(file, fh.Header)
file.Close()
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/mime/negotiation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestDetectTypes(t *testing.T) {

for _, tt := range tests {
t.Run(fmt.Sprintf("%v", tt.when), func(t *testing.T) {
types := DetectTypes(tt.when)
assert.Greater(t, len(types), 0)
detects := DetectTypes(tt.when)
assert.Greater(t, len(detects), 0)
})
}
}
Expand Down
48 changes: 24 additions & 24 deletions ext/pkg/network/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ type WebSocketNodeSpec struct {
Timeout time.Duration `map:"timeout,omitempty"`
}

// WebSocketNode represents a node for establishing WebSocket client connections.
// WebSocketNode represents a node for establishing WebSocket client connection.
type WebSocketNode struct {
*WebSocketConnNode
dialer *websocket.Dialer
url *url.URL
mu sync.RWMutex
}

// WebSocketConnNode represents a node for handling WebSocket connections.
// WebSocketConnNode represents a node for handling WebSocket connection.
type WebSocketConnNode struct {
action func(*process.Process, *packet.Packet) (*websocket.Conn, error)
connections *process.Local[*websocket.Conn]
ioPort *port.InPort
inPort *port.InPort
outPort *port.OutPort
errPort *port.OutPort
action func(*process.Process, *packet.Packet) (*websocket.Conn, error)
conns *process.Local[*websocket.Conn]
ioPort *port.InPort
inPort *port.InPort
outPort *port.OutPort
errPort *port.OutPort
}

// WebSocketPayload represents the payload structure for WebSocket messages.
Expand Down Expand Up @@ -82,7 +82,7 @@ func NewWebSocketNode(url *url.URL) *WebSocketNode {
return n
}

// SetTimeout sets the handshake timeout for WebSocket connections.
// SetTimeout sets the handshake timeout for WebSocket conns.
func (n *WebSocketNode) SetTimeout(timeout time.Duration) {
n.mu.Lock()
defer n.mu.Unlock()
Expand Down Expand Up @@ -120,12 +120,12 @@ func (n *WebSocketNode) connect(_ *process.Process, inPck *packet.Packet) (*webs
// NewWebSocketConnNode creates a new WebSocketConnNode.
func NewWebSocketConnNode(action func(*process.Process, *packet.Packet) (*websocket.Conn, error)) *WebSocketConnNode {
n := &WebSocketConnNode{
action: action,
connections: process.NewLocal[*websocket.Conn](),
ioPort: port.NewIn(),
inPort: port.NewIn(),
outPort: port.NewOut(),
errPort: port.NewOut(),
action: action,
conns: process.NewLocal[*websocket.Conn](),
ioPort: port.NewIn(),
inPort: port.NewIn(),
outPort: port.NewOut(),
errPort: port.NewOut(),
}

n.ioPort.AddListener(port.ListenFunc(n.connect))
Expand Down Expand Up @@ -164,7 +164,7 @@ func (n *WebSocketConnNode) Close() error {
n.inPort.Close()
n.outPort.Close()
n.errPort.Close()
n.connections.Close()
n.conns.Close()
return nil
}

Expand All @@ -178,7 +178,7 @@ func (n *WebSocketConnNode) connect(proc *process.Process) {
backPck := packet.SendOrFallback(errWriter, errPck, errPck)
ioReader.Receive(backPck)
} else {
n.connections.Store(proc, conn)
n.conns.Store(proc, conn)

child := proc.Fork()
child.AddExitHook(process.ExitFunc(func(_ error) {
Expand Down Expand Up @@ -283,25 +283,25 @@ func (n *WebSocketConnNode) produce(proc *process.Process) {
}

func (n *WebSocketConnNode) connection(proc *process.Process) (*websocket.Conn, bool) {
connections := make(chan *websocket.Conn)
defer close(connections)
conns := make(chan *websocket.Conn)
defer close(conns)

hook := process.StoreFunc(func(conn *websocket.Conn) {
connections <- conn
conns <- conn
})

for p := proc; p != nil; p = p.Parent() {
go n.connections.AddStoreHook(p, hook)
go n.conns.AddStoreHook(p, hook)
}
defer func() {
for p := proc; p != nil; p = p.Parent() {
n.connections.RemoveStoreHook(p, hook)
n.conns.RemoveStoreHook(p, hook)
}
}()

select {
case con := <-connections:
return con, true
case conn := <-conns:
return conn, true
case <-proc.Context().Done():
return nil, false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/debug/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (d *Debugger) Breakpoints() []*Breakpoint {
d.wmu.RLock()
defer d.wmu.RUnlock()

return d.breakpoints[:]
return append([]*Breakpoint(nil), d.breakpoints...)
}

// Pause blocks until a breakpoint is hit or monitoring is done.
Expand Down
68 changes: 31 additions & 37 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ func (n *ManyToOneNode) In(name string) *port.InPort {
inPort := port.NewIn()
n.inPorts = append(n.inPorts, inPort)
if n.action != nil {
i := i
inPort.AddListener(port.ListenFunc(func(proc *process.Process) {
n.forward(proc, i)
}))
inPort.AddListener(n.forward(i))
}
}
}
Expand Down Expand Up @@ -93,42 +90,42 @@ func (n *ManyToOneNode) Close() error {
return nil
}

func (n *ManyToOneNode) forward(proc *process.Process, index int) {
n.mu.RLock()
defer n.mu.RUnlock()
func (n *ManyToOneNode) forward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

inReaders := make([]*packet.Reader, len(n.inPorts))
for i, inPort := range n.inPorts {
inReaders[i] = inPort.Open(proc)
}
outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)
inReader := n.inPorts[index].Open(proc)
outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)

readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
return packet.NewReadGroup(inReaders), nil
})

for inPck := range inReaders[index].Read() {
n.tracer.Read(inReaders[index], inPck)

if inPcks := readGroup.Read(inReaders[index], inPck); len(inPcks) < len(inReaders) {
n.tracer.Reduce(inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else if outPck != nil {
n.tracer.Transform(inPck, outPck)
n.tracer.Write(outWriter, outPck)
} else {
n.tracer.Reduce(inPck)
readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
inReaders := make([]*packet.Reader, len(n.inPorts))
for i, inPort := range n.inPorts {
inReaders[i] = inPort.Open(proc)
}
return packet.NewReadGroup(inReaders), nil
})

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
n.tracer.Reduce(inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else if outPck != nil {
n.tracer.Transform(inPck, outPck)
n.tracer.Write(outWriter, outPck)
} else {
n.tracer.Reduce(inPck)
}
}
}
})
}

func (n *ManyToOneNode) backward(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outWriter := n.outPort.Open(proc)

for backPck := range outWriter.Receive() {
Expand All @@ -137,9 +134,6 @@ func (n *ManyToOneNode) backward(proc *process.Process) {
}

func (n *ManyToOneNode) catch(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

errWriter := n.errPort.Open(proc)

for backPck := range errWriter.Receive() {
Expand Down
Loading

0 comments on commit 04c9b81

Please sign in to comment.