diff --git a/cmd/pkg/cli/get.go b/cmd/pkg/cli/get.go index c4de251a..e884ae3a 100644 --- a/cmd/pkg/cli/get.go +++ b/cmd/pkg/cli/get.go @@ -20,7 +20,7 @@ type GetConfig struct { func NewGetCommand(config GetConfig) *cobra.Command { cmd := &cobra.Command{ Use: "get", - Short: "Get resources from the specified namespace", + Short: "Load resources from the specified namespace", Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs), ValidArgs: []string{specs, secrets, charts}, RunE: runs(map[string]func(cmd *cobra.Command) error{ diff --git a/ext/README.md b/ext/README.md index 533ebe9a..b9e7fd6c 100644 --- a/ext/README.md +++ b/ext/README.md @@ -9,6 +9,7 @@ Built-in extensions enable efficient processing of various tasks and maximize sy Precisely manage data flow. - **[Block Node](./docs/block_node.md)**: Combines multiple sub-nodes to manage complex data processing flows. +- **[Cache Node](./docs/cache_node.md)**: Uses an LRU (Least Recently Used) cache to store and retrieve data. - **[Fork Node](./docs/fork_node.md)**: Asynchronously branches data flows to perform independent tasks in parallel. - **[If Node](./docs/if_node.md)**: Evaluates conditions to split packets into two paths. - **[Loop Node](./docs/loop_node.md)**: Divides input packets into multiple sub-packets for repeated processing. diff --git a/ext/README_kr.md b/ext/README_kr.md index 2b2b11c1..4434ca19 100644 --- a/ext/README_kr.md +++ b/ext/README_kr.md @@ -9,6 +9,7 @@ 데이터 흐름을 정밀하게 제어합니다. - **[Block 노드](./docs/block_node_kr.md)**: 여러 하위 노드를 묶어 복잡한 데이터 처리 흐름을 관리하며, 각 하위 노드는 특정 작업을 수행합니다. +- **[Cache 노드](./docs/cache_node_kr.md)**: LRU(Least Recently Used) 캐시를 사용하여 데이터를 저장하고 조회합니다. - **[Fork 노드](./docs/fork_node_kr.md)**: 데이터 흐름을 비동기적으로 분기하여 독립적인 작업을 병렬로 수행합니다. - **[If 노드](./docs/if_node_kr.md)**: 조건을 평가하여 패킷을 두 경로로 분기합니다. - **[Loop 노드](./docs/loop_node_kr.md)**: 입력 패킷을 여러 하위 패킷으로 나누어 반복 처리합니다. diff --git a/ext/docs/cache_node.md b/ext/docs/cache_node.md new file mode 100644 index 00000000..3811f6b6 --- /dev/null +++ b/ext/docs/cache_node.md @@ -0,0 +1,26 @@ +# Cache Node + +**The Cache Node** implements a caching mechanism using an LRU (Least Recently Used) strategy to store and retrieve +data. This node provides caching capabilities to store results temporarily and reuses them for future requests, +improving performance by reducing the need for repeated processing. + +## Specification + +- **capacity**: Defines the maximum number of items the cache can hold. When the cache exceeds this capacity, the least + recently used entries are evicted. +- **ttl**: Specifies the time-to-live (TTL) for cache entries. Once an entry expires, it will be removed from the cache. + If not set, the cache does not have a TTL. + +## Ports + +- **in**: Receives the input packet and performs a cache lookup. If the data is found, it is returned. If not, the data + is processed and added to the cache. +- **out**: Outputs the result of the cache lookup or processed data. + +## Example + +```yaml +- kind: cache + capacity: 100 # Cache capacity of 100 items + ttl: 1h # Entries will expire after 1 hour +``` diff --git a/ext/docs/cache_node_kr.md b/ext/docs/cache_node_kr.md new file mode 100644 index 00000000..b284503b --- /dev/null +++ b/ext/docs/cache_node_kr.md @@ -0,0 +1,22 @@ +# Cache Node + +**Cache 노드**는 LRU(Least Recently Used) 전략을 사용하여 데이터를 캐싱하고 저장 및 조회 기능을 제공합니다. 이 노드는 처리 결과를 일시적으로 저장하고, 이후 요청 시 이미 처리된 +데이터를 재사용하여 성능을 향상시키는 역할을 합니다. + +## 명세 + +- **capacity**: 캐시가 보유할 수 있는 최대 항목 수를 정의합니다. 캐시 용량을 초과하면 가장 최근에 사용되지 않은 항목이 삭제됩니다. +- **ttl**: 캐시 항목의 유효 기간(Time-to-Live)을 지정합니다. 설정된 시간이 지나면 항목은 캐시에서 자동으로 삭제됩니다. 설정하지 않으면 TTL이 적용되지 않습니다. + +## 포트 + +- **in**: 입력 패킷을 받아 캐시 조회를 수행합니다. 데이터가 캐시에서 발견되면 반환하고, 그렇지 않으면 데이터를 처리하여 캐시에 추가합니다. +- **out**: 캐시 조회 결과 또는 처리된 데이터를 출력합니다. + +## 예시 + +```yaml +- kind: cache + capacity: 100 # 캐시 용량 100 항목 + ttl: 1h # 항목은 1시간 후 만료 +``` diff --git a/ext/docs/retry_node.md b/ext/docs/retry_node.md index f9bfe8d5..6c9265ae 100644 --- a/ext/docs/retry_node.md +++ b/ext/docs/retry_node.md @@ -10,7 +10,6 @@ - **in**: Receives the input packet and initiates processing. The packet will be retried until the `limit` is reached if processing fails. - **out**: Outputs the packet if processing is successful within the retry limit. -- **error**: Routes the packet to the error output if it exceeds the retry limit without success. ## Example diff --git a/ext/docs/retry_node_kr.md b/ext/docs/retry_node_kr.md index 440a6feb..476567e5 100644 --- a/ext/docs/retry_node_kr.md +++ b/ext/docs/retry_node_kr.md @@ -10,7 +10,6 @@ - **in**: 입력 패킷을 받아 처리를 시작합니다. 처리 실패 시 `limit`에 도달할 때까지 재시도를 시도합니다. - **out**: 재시도 횟수 내에 처리에 성공하면 원래의 입력 패킷을 출력합니다. -- **error**: 재시도 횟수를 초과하여 처리에 실패한 패킷을 출력합니다. ## 예시 diff --git a/ext/pkg/control/builder.go b/ext/pkg/control/builder.go index a405c1fa..6948d22b 100644 --- a/ext/pkg/control/builder.go +++ b/ext/pkg/control/builder.go @@ -20,6 +20,7 @@ func AddToScheme(module *language.Module, lang string) scheme.Register { spec spec.Spec }{ {KindBlock, NewBlockNodeCodec(s), &BlockNodeSpec{}}, + {KindCache, NewCacheNodeCodec(), &CacheNodeSpec{}}, {KindPipe, NewPipeNodeCodec(), &PipeNodeSpec{}}, {KindFork, NewForkNodeCodec(), &ForkNodeSpec{}}, {KindIf, NewIfNodeCodec(expr), &IfNodeSpec{}}, diff --git a/ext/pkg/control/builder_test.go b/ext/pkg/control/builder_test.go index 35ca4076..114e8744 100644 --- a/ext/pkg/control/builder_test.go +++ b/ext/pkg/control/builder_test.go @@ -18,7 +18,7 @@ func TestAddToScheme(t *testing.T) { err := AddToScheme(m, text.Language).AddToScheme(s) assert.NoError(t, err) - tests := []string{KindBlock, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait} + tests := []string{KindBlock, KindCache, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait} for _, tt := range tests { t.Run(tt, func(t *testing.T) { diff --git a/ext/pkg/control/cache.go b/ext/pkg/control/cache.go new file mode 100644 index 00000000..c4b8a318 --- /dev/null +++ b/ext/pkg/control/cache.go @@ -0,0 +1,121 @@ +package control + +import ( + "time" + + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" + "github.com/siyul-park/uniflow/pkg/types" +) + +// CacheNodeSpec represents the specification for a cache node. +type CacheNodeSpec struct { + spec.Meta `map:",inline"` + Capacity int `map:"capacity,omitempty"` + TTL time.Duration `map:"ttl,omitempty"` +} + +// CacheNode represents a node in the cache. +type CacheNode struct { + lru *types.LRU + tracer *packet.Tracer + inPort *port.InPort + outPort *port.OutPort +} + +const KindCache = "cache" + +var _ node.Node = (*CacheNode)(nil) + +// NewCacheNodeCodec creates a new codec for CacheNodeSpec. +func NewCacheNodeCodec() scheme.Codec { + return scheme.CodecWithType(func(spec *CacheNodeSpec) (node.Node, error) { + return NewCacheNode(spec.Capacity, spec.TTL), nil + }) +} + +// NewCacheNode creates a new CacheNode with the given capacity and TTL. +func NewCacheNode(capacity int, ttl time.Duration) *CacheNode { + n := &CacheNode{ + lru: types.NewLRU(capacity, ttl), + tracer: packet.NewTracer(), + inPort: port.NewIn(), + outPort: port.NewOut(), + } + + n.inPort.AddListener(port.ListenFunc(n.forward)) + n.outPort.AddListener(port.ListenFunc(n.backward)) + + return n +} + +// In returns the input port for the given name. +func (n *CacheNode) In(name string) *port.InPort { + switch name { + case node.PortIn: + return n.inPort + default: + return nil + } +} + +// Out returns the output port for the given name. +func (n *CacheNode) Out(name string) *port.OutPort { + switch name { + case node.PortOut: + return n.outPort + default: + return nil + } +} + +// Close closes the CacheNode and its ports. +func (n *CacheNode) Close() error { + n.inPort.Close() + n.outPort.Close() + n.tracer.Close() + n.lru.Clear() + return nil +} + +func (n *CacheNode) forward(proc *process.Process) { + inReader := n.inPort.Open(proc) + var outWriter *packet.Writer + + for inPck := range inReader.Read() { + n.tracer.Read(inReader, inPck) + + inPayload := inPck.Payload() + if outPayload, ok := n.lru.Load(inPayload); ok { + outPck := packet.New(outPayload) + n.tracer.Transform(inPck, outPck) + n.tracer.Reduce(outPck) + } else { + if outWriter == nil { + outWriter = n.outPort.Open(proc) + } + + n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { + if _, ok := backPck.Payload().(types.Error); !ok { + n.lru.Store(inPayload, backPck.Payload()) + } + n.tracer.Transform(inPck, backPck) + n.tracer.Reduce(backPck) + })) + + n.tracer.Write(outWriter, inPck) + } + } +} + +func (n *CacheNode) backward(proc *process.Process) { + outWriter := n.outPort.Open(proc) + + for backPck := range outWriter.Receive() { + n.tracer.Receive(outWriter, backPck) + } +} diff --git a/ext/pkg/control/cache_test.go b/ext/pkg/control/cache_test.go new file mode 100644 index 00000000..8415234a --- /dev/null +++ b/ext/pkg/control/cache_test.go @@ -0,0 +1,78 @@ +package control + +import ( + "context" + "testing" + "time" + + "github.com/go-faker/faker/v4" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/types" + "github.com/stretchr/testify/assert" +) + +func TestCacheNodeCodec_Compile(t *testing.T) { + codec := NewCacheNodeCodec() + + spec := &CacheNodeSpec{ + Capacity: 1, + TTL: time.Second, + } + + n, err := codec.Compile(spec) + assert.NoError(t, err) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) +} + +func TestNewCacheNode(t *testing.T) { + n := NewCacheNode(0, 0) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) +} + +func TestOneToOneNode_Port(t *testing.T) { + n := NewCacheNode(0, 0) + defer n.Close() + + assert.NotNil(t, n.In(node.PortIn)) + assert.NotNil(t, n.Out(node.PortOut)) +} + +func TestCacheNode_SendAndReceive(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + n := NewCacheNode(0, 0) + defer n.Close() + + in := port.NewOut() + in.Link(n.In(node.PortIn)) + + proc := process.New() + defer proc.Exit(nil) + + inWriter := in.Open(proc) + + inPayload := types.NewString(faker.UUIDHyphenated()) + inPck := packet.New(inPayload) + + inWriter.Write(inPck) + + select { + case <-inWriter.Receive(): + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + } + + inWriter.Write(inPck) + + select { + case <-inWriter.Receive(): + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + } +} diff --git a/ext/pkg/control/retry.go b/ext/pkg/control/retry.go index 68e6bf76..e104697e 100644 --- a/ext/pkg/control/retry.go +++ b/ext/pkg/control/retry.go @@ -24,7 +24,6 @@ type RetryNode struct { tracer *packet.Tracer inPort *port.InPort outPort *port.OutPort - errPort *port.OutPort } var _ node.Node = (*RetryNode)(nil) @@ -45,12 +44,10 @@ func NewRetryNode(threshold int) *RetryNode { tracer: packet.NewTracer(), inPort: port.NewIn(), outPort: port.NewOut(), - errPort: port.NewOut(), } n.inPort.AddListener(port.ListenFunc(n.forward)) n.outPort.AddListener(port.ListenFunc(n.backward)) - n.errPort.AddListener(port.ListenFunc(n.catch)) return n } @@ -68,8 +65,6 @@ func (n *RetryNode) Out(name string) *port.OutPort { switch name { case node.PortOut: return n.outPort - case node.PortError: - return n.errPort default: return nil } @@ -79,7 +74,6 @@ func (n *RetryNode) Out(name string) *port.OutPort { func (n *RetryNode) Close() error { n.inPort.Close() n.outPort.Close() - n.errPort.Close() n.tracer.Close() return nil } @@ -87,7 +81,6 @@ func (n *RetryNode) Close() error { func (n *RetryNode) forward(proc *process.Process) { inReader := n.inPort.Open(proc) outWriter := n.outPort.Open(proc) - errWriter := n.errPort.Open(proc) attempts := &sync.Map{} @@ -96,19 +89,15 @@ func (n *RetryNode) forward(proc *process.Process) { var hook packet.Hook hook = packet.HookFunc(func(backPck *packet.Packet) { - if _, ok := backPck.Payload().(types.Error); !ok { - n.tracer.Transform(inPck, backPck) - n.tracer.Reduce(backPck) - return - } - for { actual, _ := attempts.LoadOrStore(inPck, 0) count := actual.(int) - if count == n.threshold { + _, fail := backPck.Payload().(types.Error) + if !fail || count == n.threshold { + attempts.Delete(inPck) n.tracer.Transform(inPck, backPck) - n.tracer.Write(errWriter, backPck) + n.tracer.Reduce(backPck) return } @@ -133,11 +122,3 @@ func (n *RetryNode) backward(proc *process.Process) { n.tracer.Receive(outWriter, backPck) } } - -func (n *RetryNode) catch(proc *process.Process) { - errWriter := n.errPort.Open(proc) - - for backPck := range errWriter.Receive() { - n.tracer.Receive(errWriter, backPck) - } -} diff --git a/ext/pkg/control/retry_test.go b/ext/pkg/control/retry_test.go index eceab341..ca0419f9 100644 --- a/ext/pkg/control/retry_test.go +++ b/ext/pkg/control/retry_test.go @@ -40,7 +40,6 @@ func TestRetryNode_Port(t *testing.T) { assert.NotNil(t, n.In(node.PortIn)) assert.NotNil(t, n.Out(node.PortOut)) - assert.NotNil(t, n.Out(node.PortError)) } func TestRetryNode_SendAndReceive(t *testing.T) { diff --git a/pkg/types/lru.go b/pkg/types/lru.go new file mode 100644 index 00000000..60f46a92 --- /dev/null +++ b/pkg/types/lru.go @@ -0,0 +1,154 @@ +package types + +import ( + "container/list" + "sync" + "time" +) + +// LRU represents an LRU (Least Recently Used) cache. +type LRU struct { + capacity int + entries map[uint64][]*list.Element + order *list.List + ttl time.Duration + mu sync.Mutex +} + +// NewLRU creates a new LRU with the given capacity and TTL. +func NewLRU(capacity int, ttl time.Duration) *LRU { + return &LRU{ + capacity: capacity, + entries: make(map[uint64][]*list.Element), + order: list.New(), + ttl: ttl, + } +} + +// Load retrieves the value associated with the key. +func (c *LRU) Load(key Value) (Value, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + hash := HashOf(key) + for _, elem := range c.entries[hash] { + pair := elem.Value.([3]any) + if Equal(pair[0].(Value), key) { + if c.ttl > 0 && time.Now().After(pair[2].(time.Time)) { + c.remove(elem) + return nil, false + } + c.order.MoveToFront(elem) + return pair[1].(Value), true + } + } + return nil, false +} + +// Store adds or updates a key-value pair in the cache. +func (c *LRU) Store(key, value Value) { + c.mu.Lock() + defer c.mu.Unlock() + + hash := HashOf(key) + for _, elem := range c.entries[hash] { + pair := elem.Value.([3]any) + if Equal(pair[0].(Value), key) { + if c.ttl > 0 { + elem.Value = [3]any{key, value, time.Now().Add(c.ttl)} + } else { + elem.Value = [3]any{key, value, nil} + } + c.order.MoveToFront(elem) + return + } + } + + if c.capacity > 0 && c.order.Len() >= c.capacity { + c.expire() + c.evict() + } + + var elem *list.Element + if c.ttl > 0 { + elem = c.order.PushFront([3]any{key, value, time.Now().Add(c.ttl)}) + } else { + elem = c.order.PushFront([3]any{key, value, nil}) + } + c.entries[hash] = append(c.entries[hash], elem) +} + +// Delete removes a key-value pair from the cache. +func (c *LRU) Delete(key Value) { + c.mu.Lock() + defer c.mu.Unlock() + + hash := HashOf(key) + for _, elem := range c.entries[hash] { + pair := elem.Value.([3]any) + if Equal(pair[0].(Value), key) { + c.remove(elem) + break + } + } +} + +// Len returns the current number of items in the cache. +func (c *LRU) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + + return c.order.Len() +} + +// Clear removes all items from the cache. +func (c *LRU) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + + c.entries = make(map[uint64][]*list.Element) + c.order.Init() +} + +func (c *LRU) expire() { + if c.ttl <= 0 { + return + } + + now := time.Now() + for elem := c.order.Back(); elem != nil; elem = c.order.Back() { + pair := elem.Value.([3]any) + if now.After(pair[2].(time.Time)) { + c.remove(elem) + } else { + break + } + } +} + +func (c *LRU) evict() { + for c.capacity > 0 && c.order.Len() >= c.capacity { + elem := c.order.Back() + if elem == nil { + return + } + c.remove(elem) + } +} + +func (c *LRU) remove(elem *list.Element) { + pair := elem.Value.([3]any) + hash := HashOf(pair[0].(Value)) + + c.order.Remove(elem) + + for i, e := range c.entries[hash] { + if e == elem { + c.entries[hash] = append(c.entries[hash][:i], c.entries[hash][i+1:]...) + if len(c.entries[hash]) == 0 { + delete(c.entries, hash) + } + break + } + } +} diff --git a/pkg/types/lru_test.go b/pkg/types/lru_test.go new file mode 100644 index 00000000..853eb0ff --- /dev/null +++ b/pkg/types/lru_test.go @@ -0,0 +1,117 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLRU_Load(t *testing.T) { + cache := NewLRU(2, 0) + key1, value1 := NewBinary([]byte{1}), NewBinary([]byte{10}) + key2, value2 := NewBinary([]byte{2}), NewBinary([]byte{20}) + + cache.Store(key1, value1) + cache.Store(key2, value2) + + v, ok := cache.Load(key1) + assert.True(t, ok) + assert.Equal(t, value1, v) + + v, ok = cache.Load(key2) + assert.True(t, ok) + assert.Equal(t, value2, v) +} + +func TestLRU_Store(t *testing.T) { + cache := NewLRU(2, 0) + key1, value1 := NewBinary([]byte{1}), NewBinary([]byte{10}) + key2, value2 := NewBinary([]byte{2}), NewBinary([]byte{20}) + + cache.Store(key1, value1) + cache.Store(key2, value2) + + v, ok := cache.Load(key1) + assert.True(t, ok) + assert.Equal(t, value1, v) + + v, ok = cache.Load(key2) + assert.True(t, ok) + assert.Equal(t, value2, v) + + cache.Store(key1, value2) + + v, ok = cache.Load(key1) + assert.True(t, ok) + assert.Equal(t, value2, v) +} + +func TestLRU_Delete(t *testing.T) { + cache := NewLRU(2, 0) + key1, value1 := NewBinary([]byte{1}), NewBinary([]byte{10}) + key2, value2 := NewBinary([]byte{2}), NewBinary([]byte{20}) + + cache.Store(key1, value1) + cache.Store(key2, value2) + + cache.Delete(key1) + v, ok := cache.Load(key1) + assert.False(t, ok) + assert.Nil(t, v) + + v, ok = cache.Load(key2) + assert.True(t, ok) + assert.Equal(t, value2, v) +} + +func TestLRU_Evict(t *testing.T) { + cache := NewLRU(2, 0) + key1, value1 := NewBinary([]byte{1}), NewBinary([]byte{10}) + key2, value2 := NewBinary([]byte{2}), NewBinary([]byte{20}) + key3, value3 := NewBinary([]byte{3}), NewBinary([]byte{30}) + + cache.Store(key1, value1) + cache.Store(key2, value2) + cache.Store(key3, value3) + + v, ok := cache.Load(key1) + assert.False(t, ok) + assert.Nil(t, v) + + v, ok = cache.Load(key2) + assert.True(t, ok) + assert.Equal(t, value2, v) + + v, ok = cache.Load(key3) + assert.True(t, ok) + assert.Equal(t, value3, v) +} + +func TestLRU_Len(t *testing.T) { + cache := NewLRU(2, 0) + assert.Equal(t, 0, cache.Len()) + + key1, value1 := NewBinary([]byte{1}), NewBinary([]byte{10}) + cache.Store(key1, value1) + assert.Equal(t, 1, cache.Len()) + + key2, value2 := NewBinary([]byte{2}), NewBinary([]byte{20}) + cache.Store(key2, value2) + assert.Equal(t, 2, cache.Len()) + + key3, value3 := NewBinary([]byte{3}), NewBinary([]byte{30}) + cache.Store(key3, value3) + assert.Equal(t, 2, cache.Len()) +} + +func TestLRU_Clear(t *testing.T) { + cache := NewLRU(2, 0) + assert.Equal(t, 0, cache.Len()) + + key1, value1 := NewBinary([]byte{1}), NewBinary([]byte{10}) + cache.Store(key1, value1) + assert.Equal(t, 1, cache.Len()) + + cache.Clear() + assert.Equal(t, 0, cache.Len()) +}