-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0c8b6c3
commit 96a12c8
Showing
15 changed files
with
506 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Cache Node | ||
|
||
**The Cache Node** implements a caching mechanism using an LRU (Least Recently Used) strategy to store and retrieve | ||
data. This node provides caching capabilities to store results temporarily and reuses them for future requests, | ||
improving performance by reducing the need for repeated processing. | ||
|
||
## Specification | ||
|
||
- **capacity**: Defines the maximum number of items the cache can hold. When the cache exceeds this capacity, the least | ||
recently used entries are evicted. | ||
- **ttl**: Specifies the time-to-live (TTL) for cache entries. Once an entry expires, it will be removed from the cache. | ||
If not set, the cache does not have a TTL. | ||
|
||
## Ports | ||
|
||
- **in**: Receives the input packet and performs a cache lookup. If the data is found, it is returned. If not, the data | ||
is processed and added to the cache. | ||
- **out**: Outputs the result of the cache lookup or processed data. | ||
|
||
## Example | ||
|
||
```yaml | ||
- kind: cache | ||
capacity: 100 # Cache capacity of 100 items | ||
ttl: 1h # Entries will expire after 1 hour | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Cache Node | ||
|
||
**Cache 노드**는 LRU(Least Recently Used) 전략을 사용하여 데이터를 캐싱하고 저장 및 조회 기능을 제공합니다. 이 노드는 처리 결과를 일시적으로 저장하고, 이후 요청 시 이미 처리된 | ||
데이터를 재사용하여 성능을 향상시키는 역할을 합니다. | ||
|
||
## 명세 | ||
|
||
- **capacity**: 캐시가 보유할 수 있는 최대 항목 수를 정의합니다. 캐시 용량을 초과하면 가장 최근에 사용되지 않은 항목이 삭제됩니다. | ||
- **ttl**: 캐시 항목의 유효 기간(Time-to-Live)을 지정합니다. 설정된 시간이 지나면 항목은 캐시에서 자동으로 삭제됩니다. 설정하지 않으면 TTL이 적용되지 않습니다. | ||
|
||
## 포트 | ||
|
||
- **in**: 입력 패킷을 받아 캐시 조회를 수행합니다. 데이터가 캐시에서 발견되면 반환하고, 그렇지 않으면 데이터를 처리하여 캐시에 추가합니다. | ||
- **out**: 캐시 조회 결과 또는 처리된 데이터를 출력합니다. | ||
|
||
## 예시 | ||
|
||
```yaml | ||
- kind: cache | ||
capacity: 100 # 캐시 용량 100 항목 | ||
ttl: 1h # 항목은 1시간 후 만료 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package control | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/siyul-park/uniflow/pkg/node" | ||
"github.com/siyul-park/uniflow/pkg/packet" | ||
"github.com/siyul-park/uniflow/pkg/port" | ||
"github.com/siyul-park/uniflow/pkg/process" | ||
"github.com/siyul-park/uniflow/pkg/scheme" | ||
"github.com/siyul-park/uniflow/pkg/spec" | ||
"github.com/siyul-park/uniflow/pkg/types" | ||
) | ||
|
||
// CacheNodeSpec represents the specification for a cache node. | ||
type CacheNodeSpec struct { | ||
spec.Meta `map:",inline"` | ||
Capacity int `map:"capacity,omitempty"` | ||
TTL time.Duration `map:"ttl,omitempty"` | ||
} | ||
|
||
// CacheNode represents a node in the cache. | ||
type CacheNode struct { | ||
lru *types.LRU | ||
tracer *packet.Tracer | ||
inPort *port.InPort | ||
outPort *port.OutPort | ||
} | ||
|
||
const KindCache = "cache" | ||
|
||
var _ node.Node = (*CacheNode)(nil) | ||
|
||
// NewCacheNodeCodec creates a new codec for CacheNodeSpec. | ||
func NewCacheNodeCodec() scheme.Codec { | ||
return scheme.CodecWithType(func(spec *CacheNodeSpec) (node.Node, error) { | ||
return NewCacheNode(spec.Capacity, spec.TTL), nil | ||
}) | ||
} | ||
|
||
// NewCacheNode creates a new CacheNode with the given capacity and TTL. | ||
func NewCacheNode(capacity int, ttl time.Duration) *CacheNode { | ||
n := &CacheNode{ | ||
lru: types.NewLRU(capacity, ttl), | ||
tracer: packet.NewTracer(), | ||
inPort: port.NewIn(), | ||
outPort: port.NewOut(), | ||
} | ||
|
||
n.inPort.AddListener(port.ListenFunc(n.forward)) | ||
n.outPort.AddListener(port.ListenFunc(n.backward)) | ||
|
||
return n | ||
} | ||
|
||
// In returns the input port for the given name. | ||
func (n *CacheNode) In(name string) *port.InPort { | ||
switch name { | ||
case node.PortIn: | ||
return n.inPort | ||
default: | ||
return nil | ||
} | ||
} | ||
|
||
// Out returns the output port for the given name. | ||
func (n *CacheNode) Out(name string) *port.OutPort { | ||
switch name { | ||
case node.PortOut: | ||
return n.outPort | ||
default: | ||
return nil | ||
} | ||
} | ||
|
||
// Close closes the CacheNode and its ports. | ||
func (n *CacheNode) Close() error { | ||
n.inPort.Close() | ||
n.outPort.Close() | ||
n.tracer.Close() | ||
n.lru.Clear() | ||
return nil | ||
} | ||
|
||
func (n *CacheNode) forward(proc *process.Process) { | ||
inReader := n.inPort.Open(proc) | ||
var outWriter *packet.Writer | ||
|
||
for inPck := range inReader.Read() { | ||
n.tracer.Read(inReader, inPck) | ||
|
||
inPayload := inPck.Payload() | ||
if outPayload, ok := n.lru.Load(inPayload); ok { | ||
outPck := packet.New(outPayload) | ||
n.tracer.Transform(inPck, outPck) | ||
n.tracer.Reduce(outPck) | ||
} else { | ||
if outWriter == nil { | ||
outWriter = n.outPort.Open(proc) | ||
} | ||
|
||
n.tracer.AddHook(inPck, packet.HookFunc(func(backPck *packet.Packet) { | ||
if _, ok := backPck.Payload().(types.Error); !ok { | ||
n.lru.Store(inPayload, backPck.Payload()) | ||
} | ||
n.tracer.Transform(inPck, backPck) | ||
n.tracer.Reduce(backPck) | ||
})) | ||
|
||
n.tracer.Write(outWriter, inPck) | ||
} | ||
} | ||
} | ||
|
||
func (n *CacheNode) backward(proc *process.Process) { | ||
outWriter := n.outPort.Open(proc) | ||
|
||
for backPck := range outWriter.Receive() { | ||
n.tracer.Receive(outWriter, backPck) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package control | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/go-faker/faker/v4" | ||
"github.com/siyul-park/uniflow/pkg/node" | ||
"github.com/siyul-park/uniflow/pkg/packet" | ||
"github.com/siyul-park/uniflow/pkg/port" | ||
"github.com/siyul-park/uniflow/pkg/process" | ||
"github.com/siyul-park/uniflow/pkg/types" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestCacheNodeCodec_Compile(t *testing.T) { | ||
codec := NewCacheNodeCodec() | ||
|
||
spec := &CacheNodeSpec{ | ||
Capacity: 1, | ||
TTL: time.Second, | ||
} | ||
|
||
n, err := codec.Compile(spec) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, n) | ||
assert.NoError(t, n.Close()) | ||
} | ||
|
||
func TestNewCacheNode(t *testing.T) { | ||
n := NewCacheNode(0, 0) | ||
assert.NotNil(t, n) | ||
assert.NoError(t, n.Close()) | ||
} | ||
|
||
func TestOneToOneNode_Port(t *testing.T) { | ||
n := NewCacheNode(0, 0) | ||
defer n.Close() | ||
|
||
assert.NotNil(t, n.In(node.PortIn)) | ||
assert.NotNil(t, n.Out(node.PortOut)) | ||
} | ||
|
||
func TestCacheNode_SendAndReceive(t *testing.T) { | ||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second) | ||
defer cancel() | ||
|
||
n := NewCacheNode(0, 0) | ||
defer n.Close() | ||
|
||
in := port.NewOut() | ||
in.Link(n.In(node.PortIn)) | ||
|
||
proc := process.New() | ||
defer proc.Exit(nil) | ||
|
||
inWriter := in.Open(proc) | ||
|
||
inPayload := types.NewString(faker.UUIDHyphenated()) | ||
inPck := packet.New(inPayload) | ||
|
||
inWriter.Write(inPck) | ||
|
||
select { | ||
case <-inWriter.Receive(): | ||
case <-ctx.Done(): | ||
assert.Fail(t, ctx.Err().Error()) | ||
} | ||
|
||
inWriter.Write(inPck) | ||
|
||
select { | ||
case <-inWriter.Receive(): | ||
case <-ctx.Done(): | ||
assert.Fail(t, ctx.Err().Error()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.