Skip to content

Commit

Permalink
feat: add wait node
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 14, 2024
1 parent 3ea3c94 commit 621bee3
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 1 deletion.
1 change: 1 addition & 0 deletions ext/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Precisely manage data flow.
- **[Snippet Node](./docs/snippet_node.md)**: Executes code snippets written in various programming languages to process input packets.
- **[Split Node](./docs/split_node.md)**: Splits input packets into multiple parts for processing.
- **[Switch Node](./docs/switch_node.md)**: Routes input packets to one of several ports based on conditions.
- **[Wait Node](./docs/wait_node.md)**: Introduces a specified delay in processing to pace workflows or await external conditions.

### **IO**

Expand Down
1 change: 1 addition & 0 deletions ext/README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- **[Snippet 노드](./docs/snippet_node_kr.md)**: 다양한 프로그래밍 언어로 작성된 코드 스니펫을 실행하여 입력 패킷을 처리합니다.
- **[Split 노드](./docs/split_node_kr.md)**: 입력 패킷을 여러 개로 나누어 처리합니다.
- **[Switch 노드](./docs/switch_node_kr.md)**: 입력 패킷을 조건에 따라 여러 포트 중 하나로 분기합니다.
- **[Wait 노드](./docs/wait_node_kr.md)**: 지정된 지연 시간을 추가하여 워크플로우를 조정하거나 외부 조건을 기다립니다.

### **IO**

Expand Down
19 changes: 19 additions & 0 deletions ext/docs/wait_node.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Wait Node

**The Wait Node** introduces a delay in packet processing, allowing for timed pauses in workflows. This node is useful for pacing operations or waiting for external conditions before continuing to process data.

## Specification

- **interval**: Defines the duration (in milliseconds or a Go `time.Duration` format) for which the node will delay before passing the input packet to the output.

## Ports

- **in**: Receives the input packet and initiates a delay.
- **out**: Outputs the original input packet after the specified delay.

## Example

```yaml
- kind: wait
interval: 2000 # Delay of 2 seconds
```
19 changes: 19 additions & 0 deletions ext/docs/wait_node_kr.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Wait 노드

**Wait 노드**는 패킷 처리에 지연을 추가하여 워크플로우에서 일정한 대기 시간을 설정할 수 있습니다. 이 노드는 작업의 속도를 조절하거나 외부 조건을 기다려야 할 때 유용합니다.

## 명세

- **interval**: 입력 패킷을 출력하기 전까지 지연할 시간을 정의합니다. 밀리초 단위 또는 Go의 `time.Duration` 형식으로 설정할 수 있습니다.

## 포트

- **in**: 입력 패킷을 받아 지연을 시작합니다.
- **out**: 지정된 지연 시간 후에 원래의 입력 패킷을 출력합니다.

## 예시

```yaml
- kind: wait
interval: 2000 # 2초 지연
```
3 changes: 3 additions & 0 deletions ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
s.AddKnownType(KindSwitch, &SwitchNodeSpec{})
s.AddCodec(KindSwitch, NewSwitchNodeCodec(expr))

s.AddKnownType(KindWait, &WaitNodeSpec{})
s.AddCodec(KindWait, NewWaitNodeCodec())

return nil
})
}
2 changes: 1 addition & 1 deletion ext/pkg/control/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAddToScheme(t *testing.T) {
err := AddToScheme(m, text.Language).AddToScheme(s)
assert.NoError(t, err)

tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch}
tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait}

for _, tt := range tests {
t.Run(tt, func(t *testing.T) {
Expand Down
43 changes: 43 additions & 0 deletions ext/pkg/control/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package control

import (
"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"time"
)

// WaitNodeSpec defines the configuration for WaitNode, including a delay duration.
type WaitNodeSpec struct {
spec.Meta `map:",inline"`
Interval time.Duration `map:"timeout"`
}

// WaitNode adds a delay to packet processing, using a specified interval.
type WaitNode struct {
*node.OneToOneNode
interval time.Duration
}

const KindWait = "wait"

// NewWaitNodeCodec creates a codec to build WaitNode from WaitNodeSpec.
func NewWaitNodeCodec() scheme.Codec {
return scheme.CodecWithType(func(spec *WaitNodeSpec) (node.Node, error) {
return NewWaitNode(spec.Interval), nil
})
}

// NewWaitNode creates a WaitNode with the given delay interval.
func NewWaitNode(interval time.Duration) *WaitNode {
n := &WaitNode{interval: interval}
n.OneToOneNode = node.NewOneToOneNode(n.action)
return n
}

func (n *WaitNode) action(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) {
time.Sleep(n.interval)
return inPck, nil
}
84 changes: 84 additions & 0 deletions ext/pkg/control/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package control

import (
"context"
"github.com/go-faker/faker/v4"
"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/types"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestWaitNodeCodec_Decode(t *testing.T) {
codec := NewWaitNodeCodec()

spec := &WaitNodeSpec{
Interval: 0,
}

n, err := codec.Compile(spec)
assert.NoError(t, err)
assert.NotNil(t, n)
assert.NoError(t, n.Close())
}

func TestNewWaitNode(t *testing.T) {
n := NewWaitNode(0)
assert.NotNil(t, n)
assert.NoError(t, n.Close())
}

func TestWaitNode_SendAndReceive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

n := NewWaitNode(0)
defer n.Close()

in := port.NewOut()
in.Link(n.In(node.PortIn))

proc := process.New()
defer proc.Exit(nil)

inWriter := in.Open(proc)

inPayload := types.NewString(faker.Word())
inPck := packet.New(inPayload)

inWriter.Write(inPck)

select {
case outPck := <-inWriter.Receive():
assert.Equal(t, inPayload, outPck.Payload())
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
}

func BenchmarkWaitNode_SendAndReceive(b *testing.B) {
n := NewWaitNode(0)
defer n.Close()

in := port.NewOut()
in.Link(n.In(node.PortIn))

proc := process.New()
defer proc.Exit(nil)

inWriter := in.Open(proc)

var inPayload types.Value
inPck := packet.New(inPayload)

b.ResetTimer()

for i := 0; i < b.N; i++ {
inWriter.Write(inPck)
<-inWriter.Receive()
}
}

0 comments on commit 621bee3

Please sign in to comment.