Skip to content

Commit

Permalink
feat: support try node
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 29, 2024
1 parent d9e5365 commit 6900982
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 7 deletions.
2 changes: 2 additions & 0 deletions ext/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Precisely manage data flow.
- **[Step Node](./docs/step_node.md)**: Systematically manages complex data processing flows and executes multiple
sub-nodes steply.
- **[Switch Node](./docs/switch_node.md)**: Routes input packets to one of several ports based on conditions.
- **[Try Node](./docs/try_node.md)**: Handles errors that may occur during packet processing and appropriately manages
them through the error port.
- **[Wait Node](./docs/wait_node.md)**: Introduces a specified delay in processing to pace workflows or await external conditions.

### **IO**
Expand Down
1 change: 1 addition & 0 deletions ext/README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- **[Split 노드](./docs/split_node_kr.md)**: 입력 패킷을 여러 개로 나누어 처리합니다.
- **[Step 노드](./docs/step_node_kr.md)**: 복잡한 데이터 처리 흐름을 체계적으로 관리하며, 여러 하위 노드를 순차적으로 실행합니다.
- **[Switch 노드](./docs/switch_node_kr.md)**: 입력 패킷을 조건에 따라 여러 포트 중 하나로 분기합니다.
- **[Try 노드](./docs/try_node_kr.md)**: 패킷 처리 중 발생할 수 있는 오류를 오류 포트를 통해 적절히 처리합니다.
- **[Wait 노드](./docs/wait_node_kr.md)**: 지정된 지연 시간을 추가하여 워크플로우를 조정하거나 외부 조건을 기다립니다.

### **IO**
Expand Down
2 changes: 1 addition & 1 deletion ext/docs/cache_node_kr.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Cache Node

**Cache 노드**는 LRU(Least Recently Used) 전략을 사용하여 데이터를 캐싱하고 저장 및 조회 기능을 제공합니다. 이 노드는 처리 결과를 일시적으로 저장하고, 이후 요청 시 이미 처리된
**Cache Node**는 LRU(Least Recently Used) 전략을 사용하여 데이터를 캐싱하고 저장 및 조회 기능을 제공합니다. 이 노드는 처리 결과를 일시적으로 저장하고, 이후 요청 시 이미 처리된
데이터를 재사용하여 성능을 향상시키는 역할을 합니다.

## 명세
Expand Down
27 changes: 27 additions & 0 deletions ext/docs/try_node.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
### **Try Node**

The **Try Node** handles errors that may occur during packet processing. When a packet is processed and an error occurs,
it directs the error to the error output port for appropriate handling.

## **Specification**

- **None**: The Try Node operates by default without any additional configuration.

## **Ports**

- **in**: Receives and processes incoming packets.
- **out**: Outputs the original input packet if processed without error.
- **error**: Outputs any errors that occur during packet processing.

## **Example**

```yaml
- kind: try
ports:
out:
- name: next
port: in
error:
- name: catch
port: in
```
26 changes: 26 additions & 0 deletions ext/docs/try_node_kr.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# **Try Node**

**Try Node**는 패킷 처리 중 발생할 수 있는 오류를 처리하는 노드입니다. 이 노드는 패킷을 처리하면서 오류가 발생하면, 이를 오류 출력 포트로 전달하여 적절한 처리를 할 수 있도록 합니다.

## **명세**

- **없음**: Try 노드는 별도의 추가 설정 없이 기본적으로 동작합니다.

## **포트**

- **in**: 패킷을 수신하여 처리합니다.
- **out**: 원래의 입력 패킷을 그대로 출력합니다.
- **error**: 패킷 처리 중 오류가 발생하면 해당 오류를 이 포트를 통해 출력합니다.

## **예시**

```yaml
- kind: try
ports:
out:
- name: next
port: in
error:
- name: catch
port: in
```
1 change: 1 addition & 0 deletions ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
{KindSplit, NewSplitNodeCodec(), &SplitNodeSpec{}},
{KindStep, NewStepNodeCodec(s), &StepNodeSpec{}},
{KindSwitch, NewSwitchNodeCodec(expr), &SwitchNodeSpec{}},
{KindTry, NewTryNodeCodec(), &TryNodeSpec{}},
{KindWait, NewWaitNodeCodec(), &WaitNodeSpec{}},
}

Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/control/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestAddToScheme(t *testing.T) {
err := AddToScheme(m, text.Language).AddToScheme(s)
assert.NoError(t, err)

tests := []string{KindBlock, KindCache, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait}
tests := []string{KindBlock, KindCache, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSnippet, KindSplit, KindSwitch, KindTry, KindWait}

for _, tt := range tests {
t.Run(tt, func(t *testing.T) {
Expand Down
45 changes: 41 additions & 4 deletions ext/pkg/control/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNewCacheNode(t *testing.T) {
assert.NoError(t, n.Close())
}

func TestOneToOneNode_Port(t *testing.T) {
func TestCacheNode_Port(t *testing.T) {
n := NewCacheNode(0, 0)
defer n.Close()

Expand All @@ -46,11 +46,18 @@ func TestCacheNode_SendAndReceive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

n := NewCacheNode(0, 0)
defer n.Close()
n1 := NewCacheNode(0, 0)
defer n1.Close()

n2 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) {
return inPck, nil
})
defer n2.Close()

n1.Out(node.PortOut).Link(n2.In(node.PortIn))

in := port.NewOut()
in.Link(n.In(node.PortIn))
in.Link(n1.In(node.PortIn))

proc := process.New()
defer proc.Exit(nil)
Expand All @@ -76,3 +83,33 @@ func TestCacheNode_SendAndReceive(t *testing.T) {
assert.Fail(t, ctx.Err().Error())
}
}

func BenchmarkCacheNode_SendAndReceive(b *testing.B) {
n1 := NewCacheNode(0, 0)
defer n1.Close()

n2 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) {
return inPck, nil
})
defer n2.Close()

n1.Out(node.PortOut).Link(n2.In(node.PortIn))

in := port.NewOut()
in.Link(n1.In(node.PortIn))

proc := process.New()
defer proc.Exit(nil)

inWriter := in.Open(proc)

inPayload := types.NewString(faker.UUIDHyphenated())
inPck := packet.New(inPayload)

b.ResetTimer()

for i := 0; i < b.N; i++ {
inWriter.Write(inPck)
<-inWriter.Receive()
}
}
2 changes: 1 addition & 1 deletion ext/pkg/control/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const KindSession = "session"

var _ node.Node = (*SessionNode)(nil)

// NewSessionNodeCodec creates a codec for decoding NewSessionNodeCodec.
// NewSessionNodeCodec creates a codec for decoding SessionNodeSpec.
func NewSessionNodeCodec() scheme.Codec {
return scheme.CodecWithType(func(_ *SessionNodeSpec) (node.Node, error) {
return NewSessionNode(), nil
Expand Down
126 changes: 126 additions & 0 deletions ext/pkg/control/try.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package control

import (
"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"github.com/siyul-park/uniflow/pkg/types"
)

// TryNodeSpec defines the specification for creating a TryNode.
type TryNodeSpec struct {
spec.Meta `map:",inline"`
}

// TryNode represents a node that processes packets and handles errors.
type TryNode struct {
tracer *packet.Tracer
inPort *port.InPort
outPort *port.OutPort
errPort *port.OutPort
}

const KindTry = "try"

var _ node.Node = (*TryNode)(nil)

// NewTryNodeCodec creates a codec for decoding TryNodeSpec.
func NewTryNodeCodec() scheme.Codec {
return scheme.CodecWithType(func(_ *TryNodeSpec) (node.Node, error) {
return NewTryNode(), nil
})
}

// NewTryNode creates a new TryNode.
func NewTryNode() *TryNode {
n := &TryNode{
tracer: packet.NewTracer(),
inPort: port.NewIn(),
outPort: port.NewOut(),
errPort: port.NewOut(),
}

n.inPort.AddListener(port.ListenFunc(n.forward))
n.outPort.AddListener(port.ListenFunc(n.backward))
n.errPort.AddListener(port.ListenFunc(n.catch))

return n
}

// In returns the input port for the given name.
func (n *TryNode) In(name string) *port.InPort {
switch name {
case node.PortIn:
return n.inPort
default:
return nil
}
}

// Out returns the output port for the given name.
func (n *TryNode) Out(name string) *port.OutPort {
switch name {
case node.PortOut:
return n.outPort
case node.PortError:
return n.errPort
default:
return nil
}
}

// Close closes the TryNode and its ports.
func (n *TryNode) Close() error {
n.inPort.Close()
n.outPort.Close()
n.errPort.Close()
n.tracer.Close()
return nil
}

func (n *TryNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
n.tracer.Transform(inPck, backPck)
if _, ok := backPck.Payload().(types.Error); ok {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
n.tracer.Write(errWriter, backPck)
} else {
n.tracer.Reduce(backPck)
}
}))

n.tracer.Write(outWriter, inPck)
}
}

func (n *TryNode) backward(proc *process.Process) {
outWriter := n.outPort.Open(proc)

for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
}
}

func (n *TryNode) catch(proc *process.Process) {
errWriter := n.errPort.Open(proc)

for backPck := range errWriter.Receive() {
n.tracer.Receive(errWriter, backPck)
}
}
Loading

0 comments on commit 6900982

Please sign in to comment.