Skip to content

Commit

Permalink
feat: support cache node
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 29, 2024
1 parent 0c8b6c3 commit a1a5ea2
Show file tree
Hide file tree
Showing 15 changed files with 527 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/pkg/cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type GetConfig struct {
func NewGetCommand(config GetConfig) *cobra.Command {
cmd := &cobra.Command{
Use: "get",
Short: "Get resources from the specified namespace",
Short: "Load resources from the specified namespace",
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
ValidArgs: []string{specs, secrets, charts},
RunE: runs(map[string]func(cmd *cobra.Command) error{
Expand Down
1 change: 1 addition & 0 deletions ext/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Built-in extensions enable efficient processing of various tasks and maximize sy
Precisely manage data flow.

- **[Block Node](./docs/block_node.md)**: Combines multiple sub-nodes to manage complex data processing flows.
- **[Cache Node](./docs/cache_node.md)**: Uses an LRU (Least Recently Used) cache to store and retrieve data.
- **[Fork Node](./docs/fork_node.md)**: Asynchronously branches data flows to perform independent tasks in parallel.
- **[If Node](./docs/if_node.md)**: Evaluates conditions to split packets into two paths.
- **[Loop Node](./docs/loop_node.md)**: Divides input packets into multiple sub-packets for repeated processing.
Expand Down
1 change: 1 addition & 0 deletions ext/README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
데이터 흐름을 정밀하게 제어합니다.

- **[Block 노드](./docs/block_node_kr.md)**: 여러 하위 노드를 묶어 복잡한 데이터 처리 흐름을 관리하며, 각 하위 노드는 특정 작업을 수행합니다.
- **[Cache 노드](./docs/cache_node_kr.md)**: LRU(Least Recently Used) 캐시를 사용하여 데이터를 저장하고 조회합니다.
- **[Fork 노드](./docs/fork_node_kr.md)**: 데이터 흐름을 비동기적으로 분기하여 독립적인 작업을 병렬로 수행합니다.
- **[If 노드](./docs/if_node_kr.md)**: 조건을 평가하여 패킷을 두 경로로 분기합니다.
- **[Loop 노드](./docs/loop_node_kr.md)**: 입력 패킷을 여러 하위 패킷으로 나누어 반복 처리합니다.
Expand Down
26 changes: 26 additions & 0 deletions ext/docs/cache_node.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Cache Node

**The Cache Node** implements a caching mechanism using an LRU (Least Recently Used) strategy to store and retrieve
data. This node provides caching capabilities to store results temporarily and reuses them for future requests,
improving performance by reducing the need for repeated processing.

## Specification

- **capacity**: Defines the maximum number of items the cache can hold. When the cache exceeds this capacity, the least
recently used entries are evicted.
- **ttl**: Specifies the time-to-live (TTL) for cache entries. Once an entry expires, it will be removed from the cache.
If not set, the cache does not have a TTL.

## Ports

- **in**: Receives the input packet and performs a cache lookup. If the data is found, it is returned. If not, the data
is processed and added to the cache.
- **out**: Outputs the result of the cache lookup or processed data.

## Example

```yaml
- kind: cache
capacity: 100 # Cache capacity of 100 items
ttl: 1h # Entries will expire after 1 hour
```
22 changes: 22 additions & 0 deletions ext/docs/cache_node_kr.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Cache Node

**Cache 노드**는 LRU(Least Recently Used) 전략을 사용하여 데이터를 캐싱하고 저장 및 조회 기능을 제공합니다. 이 노드는 처리 결과를 일시적으로 저장하고, 이후 요청 시 이미 처리된
데이터를 재사용하여 성능을 향상시키는 역할을 합니다.

## 명세

- **capacity**: 캐시가 보유할 수 있는 최대 항목 수를 정의합니다. 캐시 용량을 초과하면 가장 최근에 사용되지 않은 항목이 삭제됩니다.
- **ttl**: 캐시 항목의 유효 기간(Time-to-Live)을 지정합니다. 설정된 시간이 지나면 항목은 캐시에서 자동으로 삭제됩니다. 설정하지 않으면 TTL이 적용되지 않습니다.

## 포트

- **in**: 입력 패킷을 받아 캐시 조회를 수행합니다. 데이터가 캐시에서 발견되면 반환하고, 그렇지 않으면 데이터를 처리하여 캐시에 추가합니다.
- **out**: 캐시 조회 결과 또는 처리된 데이터를 출력합니다.

## 예시

```yaml
- kind: cache
capacity: 100 # 캐시 용량 100 항목
ttl: 1h # 항목은 1시간 후 만료
```
1 change: 0 additions & 1 deletion ext/docs/retry_node.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

- **in**: Receives the input packet and initiates processing. The packet will be retried until the `limit` is reached if processing fails.
- **out**: Outputs the packet if processing is successful within the retry limit.
- **error**: Routes the packet to the error output if it exceeds the retry limit without success.

## Example

Expand Down
1 change: 0 additions & 1 deletion ext/docs/retry_node_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

- **in**: 입력 패킷을 받아 처리를 시작합니다. 처리 실패 시 `limit`에 도달할 때까지 재시도를 시도합니다.
- **out**: 재시도 횟수 내에 처리에 성공하면 원래의 입력 패킷을 출력합니다.
- **error**: 재시도 횟수를 초과하여 처리에 실패한 패킷을 출력합니다.

## 예시

Expand Down
1 change: 1 addition & 0 deletions ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
spec spec.Spec
}{
{KindBlock, NewBlockNodeCodec(s), &BlockNodeSpec{}},
{KindCache, NewCacheNodeCodec(), &CacheNodeSpec{}},
{KindPipe, NewPipeNodeCodec(), &PipeNodeSpec{}},
{KindFork, NewForkNodeCodec(), &ForkNodeSpec{}},
{KindIf, NewIfNodeCodec(expr), &IfNodeSpec{}},
Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/control/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestAddToScheme(t *testing.T) {
err := AddToScheme(m, text.Language).AddToScheme(s)
assert.NoError(t, err)

tests := []string{KindBlock, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait}
tests := []string{KindBlock, KindCache, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait}

for _, tt := range tests {
t.Run(tt, func(t *testing.T) {
Expand Down
121 changes: 121 additions & 0 deletions ext/pkg/control/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package control

import (
"time"

"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"github.com/siyul-park/uniflow/pkg/types"
)

// CacheNodeSpec represents the specification for a cache node.
type CacheNodeSpec struct {
spec.Meta `map:",inline"`
Capacity int `map:"capacity,omitempty"`
TTL time.Duration `map:"ttl,omitempty"`
}

// CacheNode represents a node in the cache.
type CacheNode struct {
lru *types.LRU
tracer *packet.Tracer
inPort *port.InPort
outPort *port.OutPort
}

const KindCache = "cache"

var _ node.Node = (*CacheNode)(nil)

// NewCacheNodeCodec creates a new codec for CacheNodeSpec.
func NewCacheNodeCodec() scheme.Codec {
return scheme.CodecWithType(func(spec *CacheNodeSpec) (node.Node, error) {
return NewCacheNode(spec.Capacity, spec.TTL), nil
})
}

// NewCacheNode creates a new CacheNode with the given capacity and TTL.
func NewCacheNode(capacity int, ttl time.Duration) *CacheNode {
n := &CacheNode{
lru: types.NewLRU(capacity, ttl),
tracer: packet.NewTracer(),
inPort: port.NewIn(),
outPort: port.NewOut(),
}

n.inPort.AddListener(port.ListenFunc(n.forward))
n.outPort.AddListener(port.ListenFunc(n.backward))

return n
}

// In returns the input port for the given name.
func (n *CacheNode) In(name string) *port.InPort {
switch name {
case node.PortIn:
return n.inPort
default:
return nil
}
}

// Out returns the output port for the given name.
func (n *CacheNode) Out(name string) *port.OutPort {
switch name {
case node.PortOut:
return n.outPort
default:
return nil
}
}

// Close closes the CacheNode and its ports.
func (n *CacheNode) Close() error {
n.inPort.Close()
n.outPort.Close()
n.tracer.Close()
n.lru.Clear()
return nil
}

func (n *CacheNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
var outWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

inPayload := inPck.Payload()
if outPayload, ok := n.lru.Load(inPayload); ok {
outPck := packet.New(outPayload)
n.tracer.Transform(inPck, outPck)
n.tracer.Reduce(outPck)
} else {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) {
if _, ok := backPck.Payload().(types.Error); !ok {
n.lru.Store(inPayload, backPck.Payload())
}
n.tracer.Transform(inPck, backPck)
n.tracer.Reduce(backPck)
}))

n.tracer.Write(outWriter, inPck)
}
}
}

func (n *CacheNode) backward(proc *process.Process) {
outWriter := n.outPort.Open(proc)

for backPck := range outWriter.Receive() {
n.tracer.Receive(outWriter, backPck)
}
}
78 changes: 78 additions & 0 deletions ext/pkg/control/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package control

import (
"context"
"testing"
"time"

"github.com/go-faker/faker/v4"
"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/types"
"github.com/stretchr/testify/assert"
)

func TestCacheNodeCodec_Compile(t *testing.T) {
codec := NewCacheNodeCodec()

spec := &CacheNodeSpec{
Capacity: 1,
TTL: time.Second,
}

n, err := codec.Compile(spec)
assert.NoError(t, err)
assert.NotNil(t, n)
assert.NoError(t, n.Close())
}

func TestNewCacheNode(t *testing.T) {
n := NewCacheNode(0, 0)
assert.NotNil(t, n)
assert.NoError(t, n.Close())
}

func TestOneToOneNode_Port(t *testing.T) {
n := NewCacheNode(0, 0)
defer n.Close()

assert.NotNil(t, n.In(node.PortIn))
assert.NotNil(t, n.Out(node.PortOut))
}

func TestCacheNode_SendAndReceive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

n := NewCacheNode(0, 0)
defer n.Close()

in := port.NewOut()
in.Link(n.In(node.PortIn))

proc := process.New()
defer proc.Exit(nil)

inWriter := in.Open(proc)

inPayload := types.NewString(faker.UUIDHyphenated())
inPck := packet.New(inPayload)

inWriter.Write(inPck)

select {
case <-inWriter.Receive():
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}

inWriter.Write(inPck)

select {
case <-inWriter.Receive():
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
}
27 changes: 4 additions & 23 deletions ext/pkg/control/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type RetryNode struct {
tracer *packet.Tracer
inPort *port.InPort
outPort *port.OutPort
errPort *port.OutPort
}

var _ node.Node = (*RetryNode)(nil)
Expand All @@ -45,12 +44,10 @@ func NewRetryNode(threshold int) *RetryNode {
tracer: packet.NewTracer(),
inPort: port.NewIn(),
outPort: port.NewOut(),
errPort: port.NewOut(),
}

n.inPort.AddListener(port.ListenFunc(n.forward))
n.outPort.AddListener(port.ListenFunc(n.backward))
n.errPort.AddListener(port.ListenFunc(n.catch))

return n
}
Expand All @@ -68,8 +65,6 @@ func (n *RetryNode) Out(name string) *port.OutPort {
switch name {
case node.PortOut:
return n.outPort
case node.PortError:
return n.errPort
default:
return nil
}
Expand All @@ -79,15 +74,13 @@ func (n *RetryNode) Out(name string) *port.OutPort {
func (n *RetryNode) Close() error {
n.inPort.Close()
n.outPort.Close()
n.errPort.Close()
n.tracer.Close()
return nil
}

func (n *RetryNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)

attempts := &sync.Map{}

Expand All @@ -96,19 +89,15 @@ func (n *RetryNode) forward(proc *process.Process) {

var hook packet.Hook
hook = packet.HookFunc(func(backPck *packet.Packet) {
if _, ok := backPck.Payload().(types.Error); !ok {
n.tracer.Transform(inPck, backPck)
n.tracer.Reduce(backPck)
return
}

for {
actual, _ := attempts.LoadOrStore(inPck, 0)
count := actual.(int)

if count == n.threshold {
_, fail := backPck.Payload().(types.Error)
if !fail || count == n.threshold {
attempts.Delete(inPck)
n.tracer.Transform(inPck, backPck)
n.tracer.Write(errWriter, backPck)
n.tracer.Reduce(backPck)
return
}

Expand All @@ -133,11 +122,3 @@ func (n *RetryNode) backward(proc *process.Process) {
n.tracer.Receive(outWriter, backPck)
}
}

func (n *RetryNode) catch(proc *process.Process) {
errWriter := n.errPort.Open(proc)

for backPck := range errWriter.Receive() {
n.tracer.Receive(errWriter, backPck)
}
}
1 change: 0 additions & 1 deletion ext/pkg/control/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestRetryNode_Port(t *testing.T) {

assert.NotNil(t, n.In(node.PortIn))
assert.NotNil(t, n.Out(node.PortOut))
assert.NotNil(t, n.Out(node.PortError))
}

func TestRetryNode_SendAndReceive(t *testing.T) {
Expand Down
Loading

0 comments on commit a1a5ea2

Please sign in to comment.