diff --git a/cmd/pkg/cli/debug.go b/cmd/pkg/cli/debug.go index eb616b6c..4ed28024 100644 --- a/cmd/pkg/cli/debug.go +++ b/cmd/pkg/cli/debug.go @@ -172,23 +172,23 @@ func (m *debugModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { bps := m.debugger.Breakpoints() m.view = &breakpointDebugView{id: len(bps) - 1, breakpoint: bp} - return m, tea.Cmd(func() tea.Msg { + return m, func() tea.Msg { if m.debugger.Pause(context.Background()) { if m.debugger.Breakpoint() == bp { return m.debugger.Frame() } } return nil - }) + } case "continue", "c": m.view = nil - return m, tea.Cmd(func() tea.Msg { + return m, func() tea.Msg { if m.debugger.Step(context.Background()) { return m.debugger.Frame() } return nil - }) + } case "delete", "d": bps := m.debugger.Breakpoints() diff --git a/ext/pkg/control/call.go b/ext/pkg/control/call.go index 77088c94..46688298 100644 --- a/ext/pkg/control/call.go +++ b/ext/pkg/control/call.go @@ -44,8 +44,8 @@ func NewCallNode() *CallNode { } n.inPort.AddListener(port.ListenFunc(n.forward)) - n.outPorts[0].AddListener(port.ListenFunc(n.backward0)) - n.outPorts[1].AddListener(port.ListenFunc(n.backward1)) + n.outPorts[0].AddListener(n.backward(0)) + n.outPorts[1].AddListener(n.backward(1)) n.errPort.AddListener(port.ListenFunc(n.catch)) return n @@ -112,20 +112,14 @@ func (n *CallNode) forward(proc *process.Process) { } } -func (n *CallNode) backward0(proc *process.Process) { - outWriter0 := n.outPorts[0].Open(proc) +func (n *CallNode) backward(index int) port.Listener { + return port.ListenFunc(func(proc *process.Process) { + outWriter := n.outPorts[index].Open(proc) - for backPck := range outWriter0.Receive() { - n.tracer.Receive(outWriter0, backPck) - } -} - -func (n *CallNode) backward1(proc *process.Process) { - outWriter1 := n.outPorts[1].Open(proc) - - for backPck := range outWriter1.Receive() { - n.tracer.Receive(outWriter1, backPck) - } + for backPck := range outWriter.Receive() { + n.tracer.Receive(outWriter, backPck) + } + }) } func (n *CallNode) catch(proc *process.Process) { diff --git a/ext/pkg/control/loop.go b/ext/pkg/control/loop.go index e4fb1f2e..71e648ed 100644 --- a/ext/pkg/control/loop.go +++ b/ext/pkg/control/loop.go @@ -44,8 +44,8 @@ func NewLoopNode() *LoopNode { } n.inPort.AddListener(port.ListenFunc(n.forward)) - n.outPorts[0].AddListener(port.ListenFunc(n.backward0)) - n.outPorts[1].AddListener(port.ListenFunc(n.backward1)) + n.outPorts[0].AddListener(n.backward(0)) + n.outPorts[1].AddListener(n.backward(1)) n.errPort.AddListener(port.ListenFunc(n.catch)) return n @@ -130,20 +130,14 @@ func (n *LoopNode) forward(proc *process.Process) { } } -func (n *LoopNode) backward0(proc *process.Process) { - outWriter0 := n.outPorts[0].Open(proc) +func (n *LoopNode) backward(index int) port.Listener { + return port.ListenFunc(func(proc *process.Process) { + outWriter := n.outPorts[index].Open(proc) - for backPck := range outWriter0.Receive() { - n.tracer.Receive(outWriter0, backPck) - } -} - -func (n *LoopNode) backward1(proc *process.Process) { - outWriter1 := n.outPorts[1].Open(proc) - - for backPck := range outWriter1.Receive() { - n.tracer.Receive(outWriter1, backPck) - } + for backPck := range outWriter.Receive() { + n.tracer.Receive(outWriter, backPck) + } + }) } func (n *LoopNode) catch(proc *process.Process) { diff --git a/ext/pkg/mime/encoding.go b/ext/pkg/mime/encoding.go index 02d0dc30..834ac4fe 100644 --- a/ext/pkg/mime/encoding.go +++ b/ext/pkg/mime/encoding.go @@ -34,8 +34,8 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er encode := header.Get(HeaderContentEncoding) if typ == "" { - if types := DetectTypes(value); len(types) > 0 { - typ = types[0] + if detects := DetectTypes(value); len(detects) > 0 { + typ = detects[0] header.Set(HeaderContentType, typ) } } @@ -160,8 +160,8 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er typ := h.Get(HeaderContentType) if typ == "" { - if types := DetectTypes(data); len(types) > 0 { - typ = types[0] + if detects := DetectTypes(data); len(detects) > 0 { + typ = detects[0] h.Set(HeaderContentType, typ) } } @@ -292,9 +292,9 @@ func Decode(reader io.Reader, header textproto.MIMEHeader) (types.Value, error) if err != nil { return nil, err } - defer file.Close() data, err := Decode(file, fh.Header) + file.Close() if err != nil { return nil, err } diff --git a/ext/pkg/mime/negotiation_test.go b/ext/pkg/mime/negotiation_test.go index eee5f147..2d810f57 100644 --- a/ext/pkg/mime/negotiation_test.go +++ b/ext/pkg/mime/negotiation_test.go @@ -29,8 +29,8 @@ func TestDetectTypes(t *testing.T) { for _, tt := range tests { t.Run(fmt.Sprintf("%v", tt.when), func(t *testing.T) { - types := DetectTypes(tt.when) - assert.Greater(t, len(types), 0) + detects := DetectTypes(tt.when) + assert.Greater(t, len(detects), 0) }) } } diff --git a/ext/pkg/network/websocket.go b/ext/pkg/network/websocket.go index ca678337..30d223ae 100644 --- a/ext/pkg/network/websocket.go +++ b/ext/pkg/network/websocket.go @@ -27,7 +27,7 @@ type WebSocketNodeSpec struct { Timeout time.Duration `map:"timeout,omitempty"` } -// WebSocketNode represents a node for establishing WebSocket client connections. +// WebSocketNode represents a node for establishing WebSocket client connection. type WebSocketNode struct { *WebSocketConnNode dialer *websocket.Dialer @@ -35,14 +35,14 @@ type WebSocketNode struct { mu sync.RWMutex } -// WebSocketConnNode represents a node for handling WebSocket connections. +// WebSocketConnNode represents a node for handling WebSocket connection. type WebSocketConnNode struct { - action func(*process.Process, *packet.Packet) (*websocket.Conn, error) - connections *process.Local[*websocket.Conn] - ioPort *port.InPort - inPort *port.InPort - outPort *port.OutPort - errPort *port.OutPort + action func(*process.Process, *packet.Packet) (*websocket.Conn, error) + conns *process.Local[*websocket.Conn] + ioPort *port.InPort + inPort *port.InPort + outPort *port.OutPort + errPort *port.OutPort } // WebSocketPayload represents the payload structure for WebSocket messages. @@ -82,7 +82,7 @@ func NewWebSocketNode(url *url.URL) *WebSocketNode { return n } -// SetTimeout sets the handshake timeout for WebSocket connections. +// SetTimeout sets the handshake timeout for WebSocket conns. func (n *WebSocketNode) SetTimeout(timeout time.Duration) { n.mu.Lock() defer n.mu.Unlock() @@ -120,12 +120,12 @@ func (n *WebSocketNode) connect(_ *process.Process, inPck *packet.Packet) (*webs // NewWebSocketConnNode creates a new WebSocketConnNode. func NewWebSocketConnNode(action func(*process.Process, *packet.Packet) (*websocket.Conn, error)) *WebSocketConnNode { n := &WebSocketConnNode{ - action: action, - connections: process.NewLocal[*websocket.Conn](), - ioPort: port.NewIn(), - inPort: port.NewIn(), - outPort: port.NewOut(), - errPort: port.NewOut(), + action: action, + conns: process.NewLocal[*websocket.Conn](), + ioPort: port.NewIn(), + inPort: port.NewIn(), + outPort: port.NewOut(), + errPort: port.NewOut(), } n.ioPort.AddListener(port.ListenFunc(n.connect)) @@ -164,7 +164,7 @@ func (n *WebSocketConnNode) Close() error { n.inPort.Close() n.outPort.Close() n.errPort.Close() - n.connections.Close() + n.conns.Close() return nil } @@ -178,7 +178,7 @@ func (n *WebSocketConnNode) connect(proc *process.Process) { backPck := packet.SendOrFallback(errWriter, errPck, errPck) ioReader.Receive(backPck) } else { - n.connections.Store(proc, conn) + n.conns.Store(proc, conn) child := proc.Fork() child.AddExitHook(process.ExitFunc(func(_ error) { @@ -283,25 +283,25 @@ func (n *WebSocketConnNode) produce(proc *process.Process) { } func (n *WebSocketConnNode) connection(proc *process.Process) (*websocket.Conn, bool) { - connections := make(chan *websocket.Conn) - defer close(connections) + conns := make(chan *websocket.Conn) + defer close(conns) hook := process.StoreFunc(func(conn *websocket.Conn) { - connections <- conn + conns <- conn }) for p := proc; p != nil; p = p.Parent() { - go n.connections.AddStoreHook(p, hook) + go n.conns.AddStoreHook(p, hook) } defer func() { for p := proc; p != nil; p = p.Parent() { - n.connections.RemoveStoreHook(p, hook) + n.conns.RemoveStoreHook(p, hook) } }() select { - case con := <-connections: - return con, true + case conn := <-conns: + return conn, true case <-proc.Context().Done(): return nil, false } diff --git a/pkg/debug/debugger.go b/pkg/debug/debugger.go index 3b3018af..e2193087 100644 --- a/pkg/debug/debugger.go +++ b/pkg/debug/debugger.go @@ -69,7 +69,7 @@ func (d *Debugger) Breakpoints() []*Breakpoint { d.wmu.RLock() defer d.wmu.RUnlock() - return d.breakpoints[:] + return append([]*Breakpoint(nil), d.breakpoints...) } // Pause blocks until a breakpoint is hit or monitoring is done. diff --git a/pkg/node/manytoone.go b/pkg/node/manytoone.go index 93571fad..d24c5b4b 100644 --- a/pkg/node/manytoone.go +++ b/pkg/node/manytoone.go @@ -51,10 +51,7 @@ func (n *ManyToOneNode) In(name string) *port.InPort { inPort := port.NewIn() n.inPorts = append(n.inPorts, inPort) if n.action != nil { - i := i - inPort.AddListener(port.ListenFunc(func(proc *process.Process) { - n.forward(proc, i) - })) + inPort.AddListener(n.forward(i)) } } } @@ -93,42 +90,42 @@ func (n *ManyToOneNode) Close() error { return nil } -func (n *ManyToOneNode) forward(proc *process.Process, index int) { - n.mu.RLock() - defer n.mu.RUnlock() +func (n *ManyToOneNode) forward(index int) port.Listener { + return port.ListenFunc(func(proc *process.Process) { + n.mu.RLock() + defer n.mu.RUnlock() - inReaders := make([]*packet.Reader, len(n.inPorts)) - for i, inPort := range n.inPorts { - inReaders[i] = inPort.Open(proc) - } - outWriter := n.outPort.Open(proc) - errWriter := n.errPort.Open(proc) + inReader := n.inPorts[index].Open(proc) + outWriter := n.outPort.Open(proc) + errWriter := n.errPort.Open(proc) - readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) { - return packet.NewReadGroup(inReaders), nil - }) - - for inPck := range inReaders[index].Read() { - n.tracer.Read(inReaders[index], inPck) - - if inPcks := readGroup.Read(inReaders[index], inPck); len(inPcks) < len(inReaders) { - n.tracer.Reduce(inPck) - } else if outPck, errPck := n.action(proc, inPcks); errPck != nil { - n.tracer.Transform(inPck, errPck) - n.tracer.Write(errWriter, errPck) - } else if outPck != nil { - n.tracer.Transform(inPck, outPck) - n.tracer.Write(outWriter, outPck) - } else { - n.tracer.Reduce(inPck) + readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) { + inReaders := make([]*packet.Reader, len(n.inPorts)) + for i, inPort := range n.inPorts { + inReaders[i] = inPort.Open(proc) + } + return packet.NewReadGroup(inReaders), nil + }) + + for inPck := range inReader.Read() { + n.tracer.Read(inReader, inPck) + + if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) { + n.tracer.Reduce(inPck) + } else if outPck, errPck := n.action(proc, inPcks); errPck != nil { + n.tracer.Transform(inPck, errPck) + n.tracer.Write(errWriter, errPck) + } else if outPck != nil { + n.tracer.Transform(inPck, outPck) + n.tracer.Write(outWriter, outPck) + } else { + n.tracer.Reduce(inPck) + } } - } + }) } func (n *ManyToOneNode) backward(proc *process.Process) { - n.mu.RLock() - defer n.mu.RUnlock() - outWriter := n.outPort.Open(proc) for backPck := range outWriter.Receive() { @@ -137,9 +134,6 @@ func (n *ManyToOneNode) backward(proc *process.Process) { } func (n *ManyToOneNode) catch(proc *process.Process) { - n.mu.RLock() - defer n.mu.RUnlock() - errWriter := n.errPort.Open(proc) for backPck := range errWriter.Receive() { diff --git a/pkg/node/onetomany.go b/pkg/node/onetomany.go index d3c896a1..70adaa44 100644 --- a/pkg/node/onetomany.go +++ b/pkg/node/onetomany.go @@ -66,10 +66,7 @@ func (n *OneToManyNode) Out(name string) *port.OutPort { outPort := port.NewOut() n.outPorts = append(n.outPorts, outPort) if n.action != nil { - i := i - outPort.AddListener(port.ListenFunc(func(proc *process.Process) { - n.backward(proc, i) - })) + outPort.AddListener(n.backward(i)) } } } @@ -131,21 +128,22 @@ func (n *OneToManyNode) forward(proc *process.Process) { } } -func (n *OneToManyNode) backward(proc *process.Process, index int) { - n.mu.RLock() - defer n.mu.RUnlock() +func (n *OneToManyNode) backward(index int) port.Listener { + return port.ListenFunc(func(proc *process.Process) { + n.mu.RLock() + defer n.mu.RUnlock() - outWriter := n.outPorts[index].Open(proc) + outPort := n.outPorts[index] - for backPck := range outWriter.Receive() { - n.tracer.Receive(outWriter, backPck) - } + outWriter := outPort.Open(proc) + + for backPck := range outWriter.Receive() { + n.tracer.Receive(outWriter, backPck) + } + }) } func (n *OneToManyNode) catch(proc *process.Process) { - n.mu.RLock() - defer n.mu.RUnlock() - errWriter := n.errPort.Open(proc) for backPck := range errWriter.Receive() { diff --git a/pkg/packet/writer.go b/pkg/packet/writer.go index da1ffd3d..72024e9e 100644 --- a/pkg/packet/writer.go +++ b/pkg/packet/writer.go @@ -31,14 +31,6 @@ func SendOrFallback(writer *Writer, outPck *Packet, backPck *Packet) *Packet { return <-writer.Receive() } -// Discard discards all packets received by the writer. -func Discard(writer *Writer) { - go func() { - for range writer.Receive() { - } - }() -} - // NewWriter creates a new Writer instance and starts its processing loop. func NewWriter() *Writer { w := &Writer{ diff --git a/pkg/resource/store.go b/pkg/resource/store.go index d8224dce..b608d81f 100644 --- a/pkg/resource/store.go +++ b/pkg/resource/store.go @@ -119,7 +119,7 @@ func (s *store[T]) Watch(ctx context.Context, resources ...T) (Stream, error) { } // Load implements the Store interface, loading resources matching the criteria. -func (s *store[T]) Load(ctx context.Context, resources ...T) ([]T, error) { +func (s *store[T]) Load(_ context.Context, resources ...T) ([]T, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -133,7 +133,7 @@ func (s *store[T]) Load(ctx context.Context, resources ...T) ([]T, error) { } // Store implements the Store interface, storing new resources. -func (s *store[T]) Store(ctx context.Context, resources ...T) (int, error) { +func (s *store[T]) Store(_ context.Context, resources ...T) (int, error) { s.mu.Lock() defer s.mu.Unlock() @@ -162,7 +162,7 @@ func (s *store[T]) Store(ctx context.Context, resources ...T) (int, error) { } // Swap implements the Store interface, swapping existing resources with new ones. -func (s *store[T]) Swap(ctx context.Context, resources ...T) (int, error) { +func (s *store[T]) Swap(_ context.Context, resources ...T) (int, error) { s.mu.Lock() defer s.mu.Unlock() @@ -196,7 +196,7 @@ func (s *store[T]) Swap(ctx context.Context, resources ...T) (int, error) { } // Delete implements the Store interface, deleting resources matching the criteria. -func (s *store[T]) Delete(ctx context.Context, resources ...T) (int, error) { +func (s *store[T]) Delete(_ context.Context, resources ...T) (int, error) { s.mu.Lock() defer s.mu.Unlock()