From 621bee358e12068f62cb4235f39a0c859d82982c Mon Sep 17 00:00:00 2001 From: siyual-park Date: Thu, 14 Nov 2024 16:16:37 +0900 Subject: [PATCH] feat: add wait node --- ext/README.md | 1 + ext/README_kr.md | 1 + ext/docs/wait_node.md | 19 ++++++++ ext/docs/wait_node_kr.md | 19 ++++++++ ext/pkg/control/builder.go | 3 ++ ext/pkg/control/builder_test.go | 2 +- ext/pkg/control/wait.go | 43 +++++++++++++++++ ext/pkg/control/wait_test.go | 84 +++++++++++++++++++++++++++++++++ 8 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 ext/docs/wait_node.md create mode 100644 ext/docs/wait_node_kr.md create mode 100644 ext/pkg/control/wait.go create mode 100644 ext/pkg/control/wait_test.go diff --git a/ext/README.md b/ext/README.md index 5addcad7..778efb63 100644 --- a/ext/README.md +++ b/ext/README.md @@ -20,6 +20,7 @@ Precisely manage data flow. - **[Snippet Node](./docs/snippet_node.md)**: Executes code snippets written in various programming languages to process input packets. - **[Split Node](./docs/split_node.md)**: Splits input packets into multiple parts for processing. - **[Switch Node](./docs/switch_node.md)**: Routes input packets to one of several ports based on conditions. +- **[Wait Node](./docs/wait_node.md)**: Introduces a specified delay in processing to pace workflows or await external conditions. ### **IO** diff --git a/ext/README_kr.md b/ext/README_kr.md index 5db094d2..fa80d803 100644 --- a/ext/README_kr.md +++ b/ext/README_kr.md @@ -20,6 +20,7 @@ - **[Snippet 노드](./docs/snippet_node_kr.md)**: 다양한 프로그래밍 언어로 작성된 코드 스니펫을 실행하여 입력 패킷을 처리합니다. - **[Split 노드](./docs/split_node_kr.md)**: 입력 패킷을 여러 개로 나누어 처리합니다. - **[Switch 노드](./docs/switch_node_kr.md)**: 입력 패킷을 조건에 따라 여러 포트 중 하나로 분기합니다. +- **[Wait 노드](./docs/wait_node_kr.md)**: 지정된 지연 시간을 추가하여 워크플로우를 조정하거나 외부 조건을 기다립니다. ### **IO** diff --git a/ext/docs/wait_node.md b/ext/docs/wait_node.md new file mode 100644 index 00000000..9e7d4179 --- /dev/null +++ b/ext/docs/wait_node.md @@ -0,0 +1,19 @@ +# Wait Node + +**The Wait Node** introduces a delay in packet processing, allowing for timed pauses in workflows. This node is useful for pacing operations or waiting for external conditions before continuing to process data. + +## Specification + +- **interval**: Defines the duration (in milliseconds or a Go `time.Duration` format) for which the node will delay before passing the input packet to the output. + +## Ports + +- **in**: Receives the input packet and initiates a delay. +- **out**: Outputs the original input packet after the specified delay. + +## Example + +```yaml +- kind: wait + interval: 2000 # Delay of 2 seconds +``` diff --git a/ext/docs/wait_node_kr.md b/ext/docs/wait_node_kr.md new file mode 100644 index 00000000..9e2167c3 --- /dev/null +++ b/ext/docs/wait_node_kr.md @@ -0,0 +1,19 @@ +# Wait 노드 + +**Wait 노드**는 패킷 처리에 지연을 추가하여 워크플로우에서 일정한 대기 시간을 설정할 수 있습니다. 이 노드는 작업의 속도를 조절하거나 외부 조건을 기다려야 할 때 유용합니다. + +## 명세 + +- **interval**: 입력 패킷을 출력하기 전까지 지연할 시간을 정의합니다. 밀리초 단위 또는 Go의 `time.Duration` 형식으로 설정할 수 있습니다. + +## 포트 + +- **in**: 입력 패킷을 받아 지연을 시작합니다. +- **out**: 지정된 지연 시간 후에 원래의 입력 패킷을 출력합니다. + +## 예시 + +```yaml +- kind: wait + interval: 2000 # 2초 지연 +``` diff --git a/ext/pkg/control/builder.go b/ext/pkg/control/builder.go index 261dc35b..ec20df59 100644 --- a/ext/pkg/control/builder.go +++ b/ext/pkg/control/builder.go @@ -76,6 +76,9 @@ func AddToScheme(module *language.Module, lang string) scheme.Register { s.AddKnownType(KindSwitch, &SwitchNodeSpec{}) s.AddCodec(KindSwitch, NewSwitchNodeCodec(expr)) + s.AddKnownType(KindWait, &WaitNodeSpec{}) + s.AddCodec(KindWait, NewWaitNodeCodec()) + return nil }) } diff --git a/ext/pkg/control/builder_test.go b/ext/pkg/control/builder_test.go index 26b10d01..26f2f571 100644 --- a/ext/pkg/control/builder_test.go +++ b/ext/pkg/control/builder_test.go @@ -45,7 +45,7 @@ func TestAddToScheme(t *testing.T) { err := AddToScheme(m, text.Language).AddToScheme(s) assert.NoError(t, err) - tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch} + tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait} for _, tt := range tests { t.Run(tt, func(t *testing.T) { diff --git a/ext/pkg/control/wait.go b/ext/pkg/control/wait.go new file mode 100644 index 00000000..8449ee4b --- /dev/null +++ b/ext/pkg/control/wait.go @@ -0,0 +1,43 @@ +package control + +import ( + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" + "time" +) + +// WaitNodeSpec defines the configuration for WaitNode, including a delay duration. +type WaitNodeSpec struct { + spec.Meta `map:",inline"` + Interval time.Duration `map:"timeout"` +} + +// WaitNode adds a delay to packet processing, using a specified interval. +type WaitNode struct { + *node.OneToOneNode + interval time.Duration +} + +const KindWait = "wait" + +// NewWaitNodeCodec creates a codec to build WaitNode from WaitNodeSpec. +func NewWaitNodeCodec() scheme.Codec { + return scheme.CodecWithType(func(spec *WaitNodeSpec) (node.Node, error) { + return NewWaitNode(spec.Interval), nil + }) +} + +// NewWaitNode creates a WaitNode with the given delay interval. +func NewWaitNode(interval time.Duration) *WaitNode { + n := &WaitNode{interval: interval} + n.OneToOneNode = node.NewOneToOneNode(n.action) + return n +} + +func (n *WaitNode) action(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { + time.Sleep(n.interval) + return inPck, nil +} diff --git a/ext/pkg/control/wait_test.go b/ext/pkg/control/wait_test.go new file mode 100644 index 00000000..80ba550f --- /dev/null +++ b/ext/pkg/control/wait_test.go @@ -0,0 +1,84 @@ +package control + +import ( + "context" + "github.com/go-faker/faker/v4" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/types" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestWaitNodeCodec_Decode(t *testing.T) { + codec := NewWaitNodeCodec() + + spec := &WaitNodeSpec{ + Interval: 0, + } + + n, err := codec.Compile(spec) + assert.NoError(t, err) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) +} + +func TestNewWaitNode(t *testing.T) { + n := NewWaitNode(0) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) +} + +func TestWaitNode_SendAndReceive(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + n := NewWaitNode(0) + defer n.Close() + + in := port.NewOut() + in.Link(n.In(node.PortIn)) + + proc := process.New() + defer proc.Exit(nil) + + inWriter := in.Open(proc) + + inPayload := types.NewString(faker.Word()) + inPck := packet.New(inPayload) + + inWriter.Write(inPck) + + select { + case outPck := <-inWriter.Receive(): + assert.Equal(t, inPayload, outPck.Payload()) + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + } +} + +func BenchmarkWaitNode_SendAndReceive(b *testing.B) { + n := NewWaitNode(0) + defer n.Close() + + in := port.NewOut() + in.Link(n.In(node.PortIn)) + + proc := process.New() + defer proc.Exit(nil) + + inWriter := in.Open(proc) + + var inPayload types.Value + inPck := packet.New(inPayload) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + inWriter.Write(inPck) + <-inWriter.Receive() + } +}