diff --git a/cmd/pkg/uniflow/main.go b/cmd/pkg/uniflow/main.go index a1fbcefd..dd5f767b 100644 --- a/cmd/pkg/uniflow/main.go +++ b/cmd/pkg/uniflow/main.go @@ -28,23 +28,24 @@ import ( const configFile = ".uniflow.toml" const ( + topicSpecs = "specs" + topicSecrets = "secrets" + topicCharts = "charts" + opCreateSpecs = "specs.create" opReadSpecs = "specs.read" opUpdateSpecs = "specs.update" opDeleteSpecs = "specs.delete" - opWatchSpecs = "specs.watch" opCreateSecrets = "secrets.create" opReadSecrets = "secrets.read" opUpdateSecrets = "secrets.update" opDeleteSecrets = "secrets.delete" - opWatchSecrets = "secrets.watch" opCreateCharts = "charts.create" opReadCharts = "charts.read" opUpdateCharts = "charts.update" opDeleteCharts = "charts.delete" - opWatchCharts = "charts.watch" ) func init() { @@ -100,28 +101,43 @@ func main() { languages.Store(javascript.Language, javascript.NewCompiler()) languages.Store(typescript.Language, typescript.NewCompiler()) - operators := map[string]any{ + signals := map[string]any{ + topicSpecs: system.WatchResource(specStore), + topicSecrets: system.WatchResource(secretStore), + topicCharts: system.WatchResource(chartStore), + } + syscalls := map[string]any{ opCreateSpecs: system.CreateResource(specStore), opReadSpecs: system.ReadResource(specStore), opUpdateSpecs: system.UpdateResource(specStore), opDeleteSpecs: system.DeleteResource(specStore), - opWatchSpecs: system.WatchResource(specStore), opCreateSecrets: system.CreateResource(secretStore), opReadSecrets: system.ReadResource(secretStore), opUpdateSecrets: system.UpdateResource(secretStore), opDeleteSecrets: system.DeleteResource(secretStore), - opWatchSecrets: system.WatchResource(secretStore), opCreateCharts: system.CreateResource(chartStore), opReadCharts: system.ReadResource(chartStore), opUpdateCharts: system.UpdateResource(chartStore), opDeleteCharts: system.DeleteResource(chartStore), - opWatchCharts: system.WatchResource(chartStore), + } + + systemAddToScheme := system.AddToScheme() + + for topic, signal := range signals { + if err := systemAddToScheme.SetSignal(topic, signal); err != nil { + log.Fatal(err) + } + } + for opcode, syscall := range syscalls { + if err := systemAddToScheme.SetSyscall(opcode, syscall); err != nil { + log.Fatal(err) + } } schemeBuilder.Register(control.AddToScheme(languages, cel.Language)) schemeBuilder.Register(io.AddToScheme(io.NewOSFileSystem())) schemeBuilder.Register(network.AddToScheme()) - schemeBuilder.Register(system.AddToScheme(operators)) + schemeBuilder.Register(systemAddToScheme) hookBuilder.Register(network.AddToHook()) hookBuilder.Register(system.AddToHook()) diff --git a/docs/getting_started.md b/docs/getting_started.md index a507b92a..c040b8c0 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -185,10 +185,11 @@ To retrieve charts: ## Integrating HTTP API -To modify node specifications via the HTTP API, set up workflows accordingly. You can use the `native` node provided in the [core extensions](../ext/README.md): +To modify node specifications via the HTTP API, set up workflows accordingly. You can use the `syscall` node provided in +the [core extensions](../ext/README.md): ```yaml -kind: native +kind: syscall opcode: specs.create # or specs.read, specs.update, specs.delete ``` diff --git a/docs/getting_started_kr.md b/docs/getting_started_kr.md index 4c301101..fb523629 100644 --- a/docs/getting_started_kr.md +++ b/docs/getting_started_kr.md @@ -185,10 +185,10 @@ pong# ## HTTP API 통합 -HTTP API를 통해 노드 명세를 수정하려면, 관련 워크플로우를 설정해야 합니다. 이를 위해 [기본 확장](../ext/README_kr.md)에 포함된 `native` 노드를 사용할 수 있습니다: +HTTP API를 통해 노드 명세를 수정하려면, 관련 워크플로우를 설정해야 합니다. 이를 위해 [기본 확장](../ext/README_kr.md)에 포함된 `syscall` 노드를 사용할 수 있습니다: ```yaml -kind: native +kind: syscall opcode: specs.create # 또는 specs.read, specs.update, specs.delete ``` diff --git a/examples/system.yaml b/examples/system.yaml index fe0c1ffc..9dcadb03 100644 --- a/examples/system.yaml +++ b/examples/system.yaml @@ -169,7 +169,7 @@ - kind: snippet language: cel code: 'has(self.body) ? self.body : null' - - kind: native + - kind: syscall opcode: specs.create - kind: snippet language: javascript @@ -192,7 +192,7 @@ values?.map((value) => ({ [key]: value })) || [] ); } - - kind: native + - kind: syscall opcode: specs.read - kind: snippet language: javascript @@ -210,7 +210,7 @@ - kind: snippet language: cel code: 'has(self.body) ? self.body : null' - - kind: native + - kind: syscall opcode: specs.update - kind: snippet language: javascript @@ -233,7 +233,7 @@ values?.map((value) => ({ [key]: value })) || [] ); } - - kind: native + - kind: syscall opcode: specs.delete - kind: snippet language: javascript @@ -250,7 +250,7 @@ - kind: snippet language: cel code: 'has(self.params) ? self.params : null' - - kind: native + - kind: syscall opcode: specs.read - kind: snippet language: javascript @@ -277,7 +277,7 @@ export default function ({ body, params }) { return { ...body, ...params }; } - - kind: native + - kind: syscall opcode: specs.update - kind: snippet language: javascript @@ -301,7 +301,7 @@ - kind: snippet language: cel code: 'has(self.params) ? self.params : null' - - kind: native + - kind: syscall opcode: specs.delete - kind: snippet language: javascript @@ -327,7 +327,7 @@ - kind: signal name: signal - opcode: specs.watch + topic: specs ports: out: - name: session @@ -372,7 +372,7 @@ - kind: snippet language: cel code: 'has(self.body) ? self.body : null' - - kind: native + - kind: syscall opcode: secrets.create - kind: snippet language: javascript @@ -395,7 +395,7 @@ values?.map((value) => ({ [key]: value })) || [] ); } - - kind: native + - kind: syscall opcode: secrets.read - kind: snippet language: javascript @@ -413,7 +413,7 @@ - kind: snippet language: cel code: 'has(self.body) ? self.body : null' - - kind: native + - kind: syscall opcode: secrets.update - kind: snippet language: javascript @@ -436,7 +436,7 @@ values?.map((value) => ({ [key]: value })) || [] ); } - - kind: native + - kind: syscall opcode: secrets.delete - kind: snippet language: javascript @@ -453,7 +453,7 @@ - kind: snippet language: cel code: 'has(self.params) ? self.params : null' - - kind: native + - kind: syscall opcode: secrets.read - kind: snippet language: javascript @@ -480,7 +480,7 @@ export default function ({ body, params }) { return { ...body, ...params }; } - - kind: native + - kind: syscall opcode: secrets.update - kind: snippet language: javascript @@ -507,7 +507,7 @@ export default function ({ body, params }) { return { ...body, ...params }; } - - kind: native + - kind: syscall opcode: secrets.delete - kind: snippet language: javascript @@ -527,7 +527,7 @@ - kind: signal name: signal - opcode: secrets.watch + topic: secrets ports: out: - name: session @@ -572,7 +572,7 @@ - kind: snippet language: cel code: 'has(self.body) ? self.body : null' - - kind: native + - kind: syscall opcode: charts.create - kind: snippet language: javascript @@ -595,7 +595,7 @@ values?.map((value) => ({ [key]: value })) || [] ); } - - kind: native + - kind: syscall opcode: charts.read - kind: snippet language: javascript @@ -613,7 +613,7 @@ - kind: snippet language: cel code: 'has(self.body) ? self.body : null' - - kind: native + - kind: syscall opcode: charts.update - kind: snippet language: javascript @@ -636,7 +636,7 @@ values?.map((value) => ({ [key]: value })) || [] ); } - - kind: native + - kind: syscall opcode: charts.delete - kind: snippet language: javascript @@ -653,7 +653,7 @@ - kind: snippet language: cel code: 'has(self.params) ? self.params : null' - - kind: native + - kind: syscall opcode: charts.read - kind: snippet language: javascript @@ -680,7 +680,7 @@ export default function ({ body, params }) { return { ...body, ...params }; } - - kind: native + - kind: syscall opcode: charts.update - kind: snippet language: javascript @@ -707,7 +707,7 @@ export default function ({ body, params }) { return { ...body, ...params }; } - - kind: native + - kind: syscall opcode: charts.delete - kind: snippet language: javascript @@ -727,7 +727,7 @@ - kind: signal name: signal - opcode: charts.watch + topic: charts ports: out: - name: session diff --git a/ext/README.md b/ext/README.md index 280d9579..533ebe9a 100644 --- a/ext/README.md +++ b/ext/README.md @@ -48,4 +48,5 @@ Facilitates smooth execution of network-related tasks across various protocols. Manages and optimizes system components. -- **[Native Node](./docs/native_node.md)**: Performs function calls within the system and returns results as packets. \ No newline at end of file +- **[Signal Node](./docs/signal_node.md)**: Detects and responds to events occurring within the system. +- **[Syscall Node](./docs/syscall_node.md)**: Executes system-level function calls and delivers the results. diff --git a/ext/README_kr.md b/ext/README_kr.md index e7750598..2b2b11c1 100644 --- a/ext/README_kr.md +++ b/ext/README_kr.md @@ -47,4 +47,5 @@ 시스템 구성 요소를 관리하고 최적화합니다. -- **[Native 노드](./docs/native_node_kr.md)**: 시스템 내부에서 함수 호출을 수행하고 결과를 패킷으로 반환합니다. +- **[Signal 노드](./docs/signal_node_kr.md)**: 시스템 내부에서 발생하는 이벤트를 감지하고 대응합니다. +- **[Syscall 노드](./docs/syscall_node_kr.md)**: 시스템 내부 함수 호출을 수행하여 결과를 제공합니다. \ No newline at end of file diff --git a/ext/docs/signal_node.md b/ext/docs/signal_node.md index 28c86833..d1a5e4e4 100644 --- a/ext/docs/signal_node.md +++ b/ext/docs/signal_node.md @@ -16,7 +16,7 @@ processing real-time events or system-level signals within the workflow. ```yaml - kind: signal - opcode: specs.watch + opcode: specs ports: out: - name: next diff --git a/ext/docs/signal_node_kr.md b/ext/docs/signal_node_kr.md index 7881c037..4b54656d 100644 --- a/ext/docs/signal_node_kr.md +++ b/ext/docs/signal_node_kr.md @@ -15,7 +15,7 @@ ```yaml - kind: signal - opcode: specs.watch + opcode: specs ports: out: - name: next diff --git a/ext/docs/native_node.md b/ext/docs/syscall_node.md similarity index 76% rename from ext/docs/native_node.md rename to ext/docs/syscall_node.md index da4a427c..fc525da6 100644 --- a/ext/docs/native_node.md +++ b/ext/docs/syscall_node.md @@ -1,6 +1,7 @@ -# Native Node +# Syscall Node -**The Native Node** performs function call operations within the system. This node processes system calls based on the `opcode`, passing input packets to the function for execution and returning the result. +**The Syscall Node** performs function call operations within the system. This node processes system calls based on the +`opcode`, passing input packets to the function for execution and returning the result. ## Specification @@ -23,7 +24,7 @@ - name: specs_create port: in -- kind: native +- kind: syscall name: specs_create opcode: specs.create ``` diff --git a/ext/docs/native_node_kr.md b/ext/docs/syscall_node_kr.md similarity index 73% rename from ext/docs/native_node_kr.md rename to ext/docs/syscall_node_kr.md index c17744dd..cae1f2fe 100644 --- a/ext/docs/native_node_kr.md +++ b/ext/docs/syscall_node_kr.md @@ -1,6 +1,6 @@ -# Native 노드 +# Syscall 노드 -**Native 노드**는 시스템 내부에서 함수 호출 작업을 수행하는 노드입니다. 이 노드는 `opcode`를 기반으로 시스템 호출을 처리하며, 입력 패킷을 함수에 전달하여 실행하고, 그 결과를 반환합니다. +**Syscall 노드**는 시스템 내부에서 함수 호출 작업을 수행하는 노드입니다. 이 노드는 `opcode`를 기반으로 시스템 호출을 처리하며, 입력 패킷을 함수에 전달하여 실행하고, 그 결과를 반환합니다. ## 명세 @@ -23,7 +23,7 @@ - name: specs_create port: in -- kind: native +- kind: syscall name: specs_create opcode: specs.create ``` diff --git a/ext/pkg/language/cel/adapter.go b/ext/pkg/language/cel/adapter.go index 331fde44..837cc5a8 100644 --- a/ext/pkg/language/cel/adapter.go +++ b/ext/pkg/language/cel/adapter.go @@ -9,10 +9,10 @@ type adapter struct{} var _ types.Adapter = (*adapter)(nil) -func (*adapter) NativeToValue(value any) ref.Val { +func (*adapter) SyscallToValue(value any) ref.Val { switch v := value.(type) { case error: return &Error{error: v} } - return types.DefaultTypeAdapter.NativeToValue(value) + return types.DefaultTypeAdapter.SyscallToValue(value) } diff --git a/ext/pkg/language/cel/error.go b/ext/pkg/language/cel/error.go index dbfb3db0..6840a804 100644 --- a/ext/pkg/language/cel/error.go +++ b/ext/pkg/language/cel/error.go @@ -18,8 +18,8 @@ var ErrorType = cel.ObjectType("error") var _ types.Error = (*Error)(nil) -// ConvertToNative converts the Error instance to a native Go type as per the provided type descriptor. -func (e *Error) ConvertToNative(typeDesc reflect.Type) (any, error) { +// ConvertToSyscall converts the Error instance to a syscall Go type as per the provided type descriptor. +func (e *Error) ConvertToSyscall(typeDesc reflect.Type) (any, error) { return nil, e.error } diff --git a/ext/pkg/system/builder.go b/ext/pkg/system/builder.go index dc874a06..9016e452 100644 --- a/ext/pkg/system/builder.go +++ b/ext/pkg/system/builder.go @@ -2,6 +2,8 @@ package system import ( "context" + "github.com/pkg/errors" + "github.com/siyul-park/uniflow/pkg/encoding" "github.com/siyul-park/uniflow/pkg/hook" "github.com/siyul-park/uniflow/pkg/node" "github.com/siyul-park/uniflow/pkg/scheme" @@ -12,7 +14,8 @@ import ( ) type SchemeRegister struct { - operators map[string]any + syscalls map[string]func(context.Context, []any) ([]any, error) + signals map[string]func(context.Context) (<-chan any, error) } var _ scheme.Register = (*SchemeRegister)(nil) @@ -37,29 +40,21 @@ func AddToHook() hook.Register { } // AddToScheme returns a function that adds node types and codecs to the provided spec. -func AddToScheme(operators map[string]any) *SchemeRegister { - return &SchemeRegister{operators: operators} +func AddToScheme() *SchemeRegister { + return &SchemeRegister{ + syscalls: make(map[string]func(context.Context, []any) ([]any, error)), + signals: make(map[string]func(context.Context) (<-chan any, error)), + } } func (r *SchemeRegister) AddToScheme(s *scheme.Scheme) error { - functions := make(map[string]func(context.Context, []any) ([]any, error)) - signals := make(map[string]func(context.Context) (<-chan any, error)) - - for opcode := range r.operators { - if signal := r.Signal(opcode); signal != nil { - signals[opcode] = signal - } else if fn := r.Function(opcode); fn != nil { - functions[opcode] = fn - } - } - definitions := []struct { kind string codec scheme.Codec spec spec.Spec }{ - {KindNative, NewNativeNodeCodec(functions), &NativeNodeSpec{}}, - {KindSignal, NewSignalNodeCodec(signals), &SignalNodeSpec{}}, + {KindSyscall, NewSyscallNodeCodec(r.syscalls), &SyscallNodeSpec{}}, + {KindSignal, NewSignalNodeCodec(r.signals), &SignalNodeSpec{}}, } for _, def := range definitions { @@ -70,54 +65,53 @@ func (r *SchemeRegister) AddToScheme(s *scheme.Scheme) error { return nil } -func (r *SchemeRegister) Signal(opcode string) func(context.Context) (<-chan any, error) { - op, ok := r.operators[opcode] - if !ok { - return nil - } - - if signal, ok := op.(func(context.Context) (<-chan any, error)); ok { - return signal - } else if signal, ok := op.(func(context.Context) <-chan any); ok { - return func(ctx context.Context) (<-chan any, error) { - return signal(ctx), nil +func (r *SchemeRegister) SetSignal(topic string, fn any) error { + var signal func(context.Context) (<-chan any, error) + if s, ok := fn.(func(context.Context) (<-chan any, error)); ok { + signal = s + } else if s, ok := fn.(func(context.Context) <-chan any); ok { + signal = func(ctx context.Context) (<-chan any, error) { + return s(ctx), nil } - } else if signal, ok := op.(func() (<-chan any, error)); ok { - return func(_ context.Context) (<-chan any, error) { - return signal() + } else if s, ok := fn.(func() (<-chan any, error)); ok { + signal = func(_ context.Context) (<-chan any, error) { + return s() } - } else if signal, ok := op.(func() <-chan any); ok { - return func(_ context.Context) (<-chan any, error) { - return signal(), nil + } else if s, ok := fn.(func() <-chan any); ok { + signal = func(_ context.Context) (<-chan any, error) { + return s(), nil } - } else { - return nil } + if signal == nil { + return errors.WithStack(encoding.ErrUnsupportedType) + } + + r.signals[topic] = signal + return nil } -func (r *SchemeRegister) Function(opcode string) func(context.Context, []any) ([]any, error) { - op, ok := r.operators[opcode] - if !ok { - return nil - } +func (r *SchemeRegister) Signal(topic string) func(context.Context) (<-chan any, error) { + return r.signals[topic] +} - fn := reflect.ValueOf(op) - if fn.Kind() != reflect.Func { - return nil +func (r *SchemeRegister) SetSyscall(opcode string, fn any) error { + fnValue := reflect.ValueOf(fn) + if fnValue.Kind() != reflect.Func { + return errors.WithStack(encoding.ErrUnsupportedType) } typeContext := reflect.TypeOf((*context.Context)(nil)).Elem() typeError := reflect.TypeOf((*error)(nil)).Elem() - opType := fn.Type() - numIn := opType.NumIn() - numOut := opType.NumOut() + fnType := fnValue.Type() + numIn := fnType.NumIn() + numOut := fnType.NumOut() - return func(ctx context.Context, arguments []any) ([]any, error) { + r.syscalls[opcode] = func(ctx context.Context, arguments []any) ([]any, error) { ins := make([]reflect.Value, numIn) offset := 0 - if numIn > 0 && opType.In(0).Implements(typeContext) { + if numIn > 0 && fnType.In(0).Implements(typeContext) { ins[0] = reflect.ValueOf(ctx) offset++ } @@ -128,19 +122,19 @@ func (r *SchemeRegister) Function(opcode string) func(context.Context, []any) ([ if err != nil { return nil, err } - in := reflect.New(opType.In(i)).Interface() + in := reflect.New(fnType.In(i)).Interface() if err := types.Unmarshal(arg, in); err != nil { return nil, err } ins[i] = reflect.ValueOf(in).Elem() } else { - ins[i] = reflect.Zero(opType.In(i)) + ins[i] = reflect.Zero(fnType.In(i)) } } - outs := fn.Call(ins) + outs := fnValue.Call(ins) - if numOut > 0 && opType.Out(numOut-1).Implements(typeError) { + if numOut > 0 && fnType.Out(numOut-1).Implements(typeError) { if err, ok := outs[numOut-1].Interface().(error); ok && err != nil { return nil, err } @@ -153,4 +147,9 @@ func (r *SchemeRegister) Function(opcode string) func(context.Context, []any) ([ } return returns, nil } + return nil +} + +func (r *SchemeRegister) Syscall(opcode string) func(context.Context, []any) ([]any, error) { + return r.syscalls[opcode] } diff --git a/ext/pkg/system/builder_test.go b/ext/pkg/system/builder_test.go index 5e0f3f86..5a70c666 100644 --- a/ext/pkg/system/builder_test.go +++ b/ext/pkg/system/builder_test.go @@ -37,10 +37,10 @@ func TestAddToHook(t *testing.T) { func TestAddToScheme(t *testing.T) { s := scheme.New() - err := AddToScheme(nil).AddToScheme(s) + err := AddToScheme().AddToScheme(s) assert.NoError(t, err) - tests := []string{KindNative, KindSignal} + tests := []string{KindSyscall, KindSignal} for _, tt := range tests { t.Run(tt, func(t *testing.T) { @@ -55,15 +55,16 @@ func TestSchemeRegister_Signal(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() - opcode := faker.UUIDHyphenated() + topic := faker.UUIDHyphenated() + + register := AddToScheme() - register := AddToScheme(map[string]any{ - opcode: func() <-chan any { - return make(chan any) - }, + err := register.SetSignal(topic, func() <-chan any { + return make(chan any) }) + assert.NoError(t, err) - signal := register.Signal(opcode) + signal := register.Signal(topic) assert.NotNil(t, signal) sig, err := signal(ctx) @@ -75,15 +76,16 @@ func TestSchemeRegister_Signal(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() - opcode := faker.UUIDHyphenated() + topic := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func() (<-chan any, error) { - return make(chan any), nil - }, + register := AddToScheme() + + err := register.SetSignal(topic, func() (<-chan any, error) { + return make(chan any), nil }) + assert.NoError(t, err) - signal := register.Signal(opcode) + signal := register.Signal(topic) assert.NotNil(t, signal) sig, err := signal(ctx) @@ -95,15 +97,16 @@ func TestSchemeRegister_Signal(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() - opcode := faker.UUIDHyphenated() + topic := faker.UUIDHyphenated() + + register := AddToScheme() - register := AddToScheme(map[string]any{ - opcode: func(_ context.Context) <-chan any { - return make(chan any) - }, + err := register.SetSignal(topic, func(_ context.Context) <-chan any { + return make(chan any) }) + assert.NoError(t, err) - signal := register.Signal(opcode) + signal := register.Signal(topic) assert.NotNil(t, signal) sig, err := signal(ctx) @@ -115,15 +118,16 @@ func TestSchemeRegister_Signal(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() - opcode := faker.UUIDHyphenated() + topic := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(_ context.Context) (<-chan any, error) { - return make(chan any), nil - }, + register := AddToScheme() + + err := register.SetSignal(topic, func(_ context.Context) (<-chan any, error) { + return make(chan any), nil }) + assert.NoError(t, err) - signal := register.Signal(opcode) + signal := register.Signal(topic) assert.NotNil(t, signal) sig, err := signal(ctx) @@ -132,18 +136,19 @@ func TestSchemeRegister_Signal(t *testing.T) { }) } -func TestSchemeRegister_Function(t *testing.T) { +func TestSchemeRegister_Syscall(t *testing.T) { t.Run("func() void", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func() {}, - }) + register := AddToScheme() + + err := register.SetSyscall(opcode, func() {}) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) res, err := fn(ctx, nil) @@ -157,16 +162,17 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func() error { - return errors.New(faker.Word()) - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func() error { + return errors.New(faker.Word()) }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) - _, err := fn(ctx, nil) + _, err = fn(ctx, nil) assert.Error(t, err) }) @@ -176,13 +182,14 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(arg string) string { - return arg - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func(arg string) string { + return arg }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) arg := faker.UUIDHyphenated() @@ -199,18 +206,19 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(arg string) (string, error) { - return "", errors.New(faker.Word()) - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func(arg string) (string, error) { + return "", errors.New(faker.Word()) }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) arg := faker.UUIDHyphenated() - _, err := fn(ctx, []any{arg}) + _, err = fn(ctx, []any{arg}) assert.Error(t, err) }) @@ -220,13 +228,14 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(_ context.Context, arg string) string { - return arg - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func(_ context.Context, arg string) string { + return arg }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) arg := faker.UUIDHyphenated() @@ -243,18 +252,19 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(_ context.Context, arg string) (string, error) { - return "", errors.New(faker.Word()) - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func(_ context.Context, arg string) (string, error) { + return "", errors.New(faker.Word()) }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) arg := faker.UUIDHyphenated() - _, err := fn(ctx, []any{arg}) + _, err = fn(ctx, []any{arg}) assert.Error(t, err) }) @@ -264,13 +274,14 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(arg1, arg2 string) (string, string) { - return arg1, arg2 - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func(arg1, arg2 string) (string, string) { + return arg1, arg2 }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) arg := faker.UUIDHyphenated() @@ -288,18 +299,19 @@ func TestSchemeRegister_Function(t *testing.T) { opcode := faker.UUIDHyphenated() - register := AddToScheme(map[string]any{ - opcode: func(arg1, arg2 string) (string, string, error) { - return "", "", errors.New(faker.Word()) - }, + register := AddToScheme() + + err := register.SetSyscall(opcode, func(arg1, arg2 string) (string, string, error) { + return "", "", errors.New(faker.Word()) }) + assert.NoError(t, err) - fn := register.Function(opcode) + fn := register.Syscall(opcode) assert.NotNil(t, fn) arg := faker.UUIDHyphenated() - _, err := fn(ctx, []any{arg, arg}) + _, err = fn(ctx, []any{arg, arg}) assert.Error(t, err) }) } diff --git a/ext/pkg/system/native.go b/ext/pkg/system/native.go deleted file mode 100644 index 37d61165..00000000 --- a/ext/pkg/system/native.go +++ /dev/null @@ -1,83 +0,0 @@ -package system - -import ( - "context" - "github.com/pkg/errors" - "github.com/siyul-park/uniflow/pkg/node" - "github.com/siyul-park/uniflow/pkg/packet" - "github.com/siyul-park/uniflow/pkg/process" - "github.com/siyul-park/uniflow/pkg/scheme" - "github.com/siyul-park/uniflow/pkg/spec" - "github.com/siyul-park/uniflow/pkg/types" -) - -// NativeNodeSpec specifies the creation parameters for a NativeNode. -type NativeNodeSpec struct { - spec.Meta `map:",inline"` - OPCode string `map:"opcode"` -} - -// NativeNode executes synchronized function. -type NativeNode struct { - *node.OneToOneNode - fn func(context.Context, []any) ([]any, error) -} - -const KindNative = "native" - -// NewNativeNodeCodec returns a codec for NativeNodeSpec. -func NewNativeNodeCodec(functions map[string]func(ctx context.Context, arguments []any) ([]any, error)) scheme.Codec { - if functions == nil { - functions = make(map[string]func(ctx context.Context, arguments []any) ([]any, error)) - } - - return scheme.CodecWithType[*NativeNodeSpec](func(spec *NativeNodeSpec) (node.Node, error) { - fn, ok := functions[spec.OPCode] - if !ok { - return nil, errors.WithStack(ErrInvalidOperation) - } - - return NewNativeNode(fn) - }) -} - -// NewNativeNode creates a new NativeNode from a function. -func NewNativeNode(fn func(context.Context, []any) ([]any, error)) (*NativeNode, error) { - n := &NativeNode{fn: fn} - n.OneToOneNode = node.NewOneToOneNode(n.action) - return n, nil -} - -func (n *NativeNode) action(proc *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { - ctx := proc.Context() - inPayload := inPck.Payload() - - var arguments []any - if v, ok := inPayload.(types.Slice); ok { - arguments = v.Slice() - } else { - arguments = append(arguments, types.InterfaceOf(inPayload)) - } - - returns, err := n.fn(ctx, arguments) - if err != nil { - return nil, packet.New(types.NewError(err)) - } - - outPayloads := make([]types.Value, len(returns)) - for i, out := range returns { - outPayload, err := types.Marshal(out) - if err != nil { - return nil, packet.New(types.NewError(err)) - } - outPayloads[i] = outPayload - } - - if len(outPayloads) == 0 { - return packet.New(nil), nil - } - if len(outPayloads) == 1 { - return packet.New(outPayloads[0]), nil - } - return packet.New(types.NewSlice(outPayloads...)), nil -} diff --git a/ext/pkg/system/native_test.go b/ext/pkg/system/native_test.go deleted file mode 100644 index f371cacf..00000000 --- a/ext/pkg/system/native_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package system - -import ( - "context" - "testing" - "time" - - "github.com/go-faker/faker/v4" - "github.com/siyul-park/uniflow/pkg/node" - "github.com/siyul-park/uniflow/pkg/packet" - "github.com/siyul-park/uniflow/pkg/port" - "github.com/siyul-park/uniflow/pkg/process" - "github.com/siyul-park/uniflow/pkg/types" - "github.com/stretchr/testify/assert" -) - -func TestNativeNodeCodec_Compile(t *testing.T) { - opcode := faker.UUIDHyphenated() - - codec := NewNativeNodeCodec(map[string]func(ctx context.Context, arguments []any) ([]any, error){ - opcode: func(ctx context.Context, arguments []any) ([]any, error) { - return nil, nil - }, - }) - - spec := &NativeNodeSpec{ - OPCode: opcode, - } - - n, err := codec.Compile(spec) - assert.NoError(t, err) - assert.NotNil(t, n) - assert.NoError(t, n.Close()) -} - -func TestNewNativeNode(t *testing.T) { - n, err := NewNativeNode(nil) - assert.NoError(t, err) - assert.NotNil(t, n) - assert.NoError(t, n.Close()) -} - -func TestNativeNode_SendAndReceive(t *testing.T) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() - - n, _ := NewNativeNode(func(ctx context.Context, arguments []any) ([]any, error) { - return arguments, nil - }) - defer n.Close() - - in := port.NewOut() - in.Link(n.In(node.PortIn)) - - proc := process.New() - defer proc.Exit(nil) - - inWriter := in.Open(proc) - - inPayload := types.NewString(faker.UUIDHyphenated()) - inPck := packet.New(inPayload) - - inWriter.Write(inPck) - - select { - case outPck := <-inWriter.Receive(): - assert.Equal(t, inPayload, outPck.Payload()) - case <-ctx.Done(): - assert.Fail(t, ctx.Err().Error()) - } -} - -func BenchmarkNativeNode_SendAndReceive(b *testing.B) { - n, _ := NewNativeNode(func(ctx context.Context, arguments []any) ([]any, error) { - return arguments, nil - }) - defer n.Close() - - in := port.NewOut() - in.Link(n.In(node.PortIn)) - - proc := process.New() - defer proc.Exit(nil) - - inWriter := in.Open(proc) - - inPayload := types.NewString(faker.UUIDHyphenated()) - inPck := packet.New(inPayload) - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - inWriter.Write(inPck) - <-inWriter.Receive() - } -} diff --git a/ext/pkg/system/resource.go b/ext/pkg/system/resource.go new file mode 100644 index 00000000..c49d865c --- /dev/null +++ b/ext/pkg/system/resource.go @@ -0,0 +1,113 @@ +package system + +import ( + "context" + "encoding/json" + + jsonpatch "github.com/evanphx/json-patch/v5" + "github.com/gofrs/uuid" + + "github.com/siyul-park/uniflow/pkg/resource" +) + +// WatchResource creates a function to monitor changes in the resource store. +func WatchResource[T resource.Resource](store resource.Store[T]) func(context.Context) (<-chan any, error) { + return func(ctx context.Context) (<-chan any, error) { + stream, err := store.Watch(ctx) + if err != nil { + return nil, err + } + + signal := make(chan any) + + go func() { + defer close(signal) + for event := range stream.Next() { + signal <- event + } + }() + + return signal, nil + } +} + +// CreateResource is a generic function to store and load resources. +func CreateResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { + return func(ctx context.Context, resources []T) ([]T, error) { + if _, err := store.Store(ctx, resources...); err != nil { + return nil, err + } + return store.Load(ctx, resources...) + } +} + +// ReadResource is a generic function to load resources. +func ReadResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { + return func(ctx context.Context, resources []T) ([]T, error) { + return store.Load(ctx, resources...) + } +} + +// UpdateResource is a generic function to swap and load resources. +func UpdateResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { + return func(ctx context.Context, resources []T) ([]T, error) { + exists, err := store.Load(ctx, resources...) + if err != nil { + return nil, err + } + + origins := map[uuid.UUID]T{} + for _, v := range exists { + origins[v.GetID()] = v + } + + for i := 0; i < len(resources); i++ { + patch := resources[i] + origin, ok := origins[patch.GetID()] + if !ok { + resources = append(resources[:i], resources[i+1:]...) + i-- + continue + } + + json1, err := json.Marshal(patch) + if err != nil { + return nil, err + } + json2, err := json.Marshal(origin) + if err != nil { + return nil, err + } + + merge, err := jsonpatch.MergePatch(json1, json2) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(merge, &resources[i]); err != nil { + return nil, err + } + } + + if _, err := store.Swap(ctx, resources...); err != nil { + return nil, err + } + return resources, nil + } +} + +// DeleteResource is a generic function to load and delete resources. +func DeleteResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { + return func(ctx context.Context, resources []T) ([]T, error) { + exists, err := store.Load(ctx, resources...) + if err != nil { + return nil, err + } + if len(exists) > 0 { + if _, err := store.Delete(ctx, exists...); err != nil { + return nil, err + } + } + return exists, nil + } +} diff --git a/ext/pkg/system/resource_test.go b/ext/pkg/system/resource_test.go new file mode 100644 index 00000000..40ab9c6b --- /dev/null +++ b/ext/pkg/system/resource_test.go @@ -0,0 +1,100 @@ +package system + +import ( + "context" + "testing" + "time" + + "github.com/go-faker/faker/v4" + "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/resource" + "github.com/stretchr/testify/assert" +) + +func TestWatchResource(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + st := resource.NewStore[resource.Resource]() + fn := WatchResource(st) + + _, err := fn(ctx) + assert.NoError(t, err) +} + +func TestCreateResource(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + st := resource.NewStore[resource.Resource]() + fn := CreateResource(st) + + meta := &resource.Meta{ + ID: uuid.Must(uuid.NewV7()), + Name: faker.Word(), + } + + res, err := fn(ctx, []resource.Resource{meta}) + assert.NoError(t, err) + assert.Len(t, res, 1) +} + +func TestReadResource(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + st := resource.NewStore[resource.Resource]() + fn := ReadResource(st) + + meta := &resource.Meta{ + ID: uuid.Must(uuid.NewV7()), + Name: faker.Word(), + } + + _, err := st.Store(ctx, meta) + assert.NoError(t, err) + + res, err := fn(ctx, []resource.Resource{meta}) + assert.NoError(t, err) + assert.Len(t, res, 1) +} + +func TestUpdateResource(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + st := resource.NewStore[resource.Resource]() + fn := UpdateResource(st) + + meta := &resource.Meta{ + ID: uuid.Must(uuid.NewV7()), + Name: faker.Word(), + } + + _, err := st.Store(ctx, meta) + assert.NoError(t, err) + + res, err := fn(ctx, []resource.Resource{meta}) + assert.NoError(t, err) + assert.Len(t, res, 1) +} + +func TestDeleteResource(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + st := resource.NewStore[resource.Resource]() + fn := DeleteResource(st) + + meta := &resource.Meta{ + ID: uuid.Must(uuid.NewV7()), + Name: faker.Word(), + } + + _, err := st.Store(ctx, meta) + assert.NoError(t, err) + + res, err := fn(ctx, []resource.Resource{meta}) + assert.NoError(t, err) + assert.Len(t, res, 1) +} diff --git a/ext/pkg/system/signal.go b/ext/pkg/system/signal.go index 95de418b..4bd35564 100644 --- a/ext/pkg/system/signal.go +++ b/ext/pkg/system/signal.go @@ -17,7 +17,7 @@ import ( // SignalNodeSpec defines the specifications for creating a SignalNode. type SignalNodeSpec struct { spec.Meta `map:",inline"` - OPCode string `map:"opcode"` + Topic string `map:"topic"` } // SignalNode listens to a signal channel and forwards signals as packets. @@ -39,7 +39,7 @@ func NewSignalNodeCodec(signals map[string]func(context.Context) (<-chan any, er } return scheme.CodecWithType[*SignalNodeSpec](func(spec *SignalNodeSpec) (node.Node, error) { - fn, ok := signals[spec.OPCode] + fn, ok := signals[spec.Topic] if !ok { return nil, errors.WithStack(ErrInvalidOperation) } diff --git a/ext/pkg/system/signal_test.go b/ext/pkg/system/signal_test.go index af949352..245d40bc 100644 --- a/ext/pkg/system/signal_test.go +++ b/ext/pkg/system/signal_test.go @@ -22,7 +22,7 @@ func TestSignalNodeCodec_Compile(t *testing.T) { }) spec := &SignalNodeSpec{ - OPCode: opcode, + Topic: opcode, } n, err := codec.Compile(spec) diff --git a/ext/pkg/system/syscall.go b/ext/pkg/system/syscall.go index c49d865c..ec61f299 100644 --- a/ext/pkg/system/syscall.go +++ b/ext/pkg/system/syscall.go @@ -2,112 +2,82 @@ package system import ( "context" - "encoding/json" - - jsonpatch "github.com/evanphx/json-patch/v5" - "github.com/gofrs/uuid" - - "github.com/siyul-park/uniflow/pkg/resource" + "github.com/pkg/errors" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" + "github.com/siyul-park/uniflow/pkg/types" ) -// WatchResource creates a function to monitor changes in the resource store. -func WatchResource[T resource.Resource](store resource.Store[T]) func(context.Context) (<-chan any, error) { - return func(ctx context.Context) (<-chan any, error) { - stream, err := store.Watch(ctx) - if err != nil { - return nil, err - } - - signal := make(chan any) - - go func() { - defer close(signal) - for event := range stream.Next() { - signal <- event - } - }() - - return signal, nil - } +// SyscallNodeSpec specifies the creation parameters for a SyscallNode. +type SyscallNodeSpec struct { + spec.Meta `map:",inline"` + OPCode string `map:"opcode"` } -// CreateResource is a generic function to store and load resources. -func CreateResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { - return func(ctx context.Context, resources []T) ([]T, error) { - if _, err := store.Store(ctx, resources...); err != nil { - return nil, err - } - return store.Load(ctx, resources...) - } +// SyscallNode executes synchronized function. +type SyscallNode struct { + *node.OneToOneNode + fn func(context.Context, []any) ([]any, error) } -// ReadResource is a generic function to load resources. -func ReadResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { - return func(ctx context.Context, resources []T) ([]T, error) { - return store.Load(ctx, resources...) - } -} +const KindSyscall = "syscall" -// UpdateResource is a generic function to swap and load resources. -func UpdateResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { - return func(ctx context.Context, resources []T) ([]T, error) { - exists, err := store.Load(ctx, resources...) - if err != nil { - return nil, err - } +// NewSyscallNodeCodec returns a codec for SyscallNodeSpec. +func NewSyscallNodeCodec(functions map[string]func(ctx context.Context, arguments []any) ([]any, error)) scheme.Codec { + if functions == nil { + functions = make(map[string]func(ctx context.Context, arguments []any) ([]any, error)) + } - origins := map[uuid.UUID]T{} - for _, v := range exists { - origins[v.GetID()] = v + return scheme.CodecWithType[*SyscallNodeSpec](func(spec *SyscallNodeSpec) (node.Node, error) { + fn, ok := functions[spec.OPCode] + if !ok { + return nil, errors.WithStack(ErrInvalidOperation) } - for i := 0; i < len(resources); i++ { - patch := resources[i] - origin, ok := origins[patch.GetID()] - if !ok { - resources = append(resources[:i], resources[i+1:]...) - i-- - continue - } + return NewSyscallNode(fn) + }) +} - json1, err := json.Marshal(patch) - if err != nil { - return nil, err - } - json2, err := json.Marshal(origin) - if err != nil { - return nil, err - } +// NewSyscallNode creates a new SyscallNode from a function. +func NewSyscallNode(fn func(context.Context, []any) ([]any, error)) (*SyscallNode, error) { + n := &SyscallNode{fn: fn} + n.OneToOneNode = node.NewOneToOneNode(n.action) + return n, nil +} - merge, err := jsonpatch.MergePatch(json1, json2) - if err != nil { - return nil, err - } +func (n *SyscallNode) action(proc *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { + ctx := proc.Context() + inPayload := inPck.Payload() - if err := json.Unmarshal(merge, &resources[i]); err != nil { - return nil, err - } - } + var arguments []any + if v, ok := inPayload.(types.Slice); ok { + arguments = v.Slice() + } else { + arguments = append(arguments, types.InterfaceOf(inPayload)) + } - if _, err := store.Swap(ctx, resources...); err != nil { - return nil, err - } - return resources, nil + returns, err := n.fn(ctx, arguments) + if err != nil { + return nil, packet.New(types.NewError(err)) } -} -// DeleteResource is a generic function to load and delete resources. -func DeleteResource[T resource.Resource](store resource.Store[T]) func(context.Context, []T) ([]T, error) { - return func(ctx context.Context, resources []T) ([]T, error) { - exists, err := store.Load(ctx, resources...) + outPayloads := make([]types.Value, len(returns)) + for i, out := range returns { + outPayload, err := types.Marshal(out) if err != nil { - return nil, err - } - if len(exists) > 0 { - if _, err := store.Delete(ctx, exists...); err != nil { - return nil, err - } + return nil, packet.New(types.NewError(err)) } - return exists, nil + outPayloads[i] = outPayload + } + + if len(outPayloads) == 0 { + return packet.New(nil), nil + } + if len(outPayloads) == 1 { + return packet.New(outPayloads[0]), nil } + return packet.New(types.NewSlice(outPayloads...)), nil } diff --git a/ext/pkg/system/syscall_test.go b/ext/pkg/system/syscall_test.go index 40ab9c6b..f2369a93 100644 --- a/ext/pkg/system/syscall_test.go +++ b/ext/pkg/system/syscall_test.go @@ -6,95 +6,91 @@ import ( "time" "github.com/go-faker/faker/v4" - "github.com/gofrs/uuid" - "github.com/siyul-park/uniflow/pkg/resource" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/types" "github.com/stretchr/testify/assert" ) -func TestWatchResource(t *testing.T) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() +func TestSyscallNodeCodec_Compile(t *testing.T) { + opcode := faker.UUIDHyphenated() - st := resource.NewStore[resource.Resource]() - fn := WatchResource(st) + codec := NewSyscallNodeCodec(map[string]func(ctx context.Context, arguments []any) ([]any, error){ + opcode: func(ctx context.Context, arguments []any) ([]any, error) { + return nil, nil + }, + }) + + spec := &SyscallNodeSpec{ + OPCode: opcode, + } - _, err := fn(ctx) + n, err := codec.Compile(spec) assert.NoError(t, err) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) } -func TestCreateResource(t *testing.T) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() - - st := resource.NewStore[resource.Resource]() - fn := CreateResource(st) - - meta := &resource.Meta{ - ID: uuid.Must(uuid.NewV7()), - Name: faker.Word(), - } - - res, err := fn(ctx, []resource.Resource{meta}) +func TestNewSyscallNode(t *testing.T) { + n, err := NewSyscallNode(nil) assert.NoError(t, err) - assert.Len(t, res, 1) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) } -func TestReadResource(t *testing.T) { +func TestSyscallNode_SendAndReceive(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() - st := resource.NewStore[resource.Resource]() - fn := ReadResource(st) + n, _ := NewSyscallNode(func(ctx context.Context, arguments []any) ([]any, error) { + return arguments, nil + }) + defer n.Close() - meta := &resource.Meta{ - ID: uuid.Must(uuid.NewV7()), - Name: faker.Word(), - } + in := port.NewOut() + in.Link(n.In(node.PortIn)) - _, err := st.Store(ctx, meta) - assert.NoError(t, err) + proc := process.New() + defer proc.Exit(nil) - res, err := fn(ctx, []resource.Resource{meta}) - assert.NoError(t, err) - assert.Len(t, res, 1) -} + inWriter := in.Open(proc) -func TestUpdateResource(t *testing.T) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() + inPayload := types.NewString(faker.UUIDHyphenated()) + inPck := packet.New(inPayload) - st := resource.NewStore[resource.Resource]() - fn := UpdateResource(st) + inWriter.Write(inPck) - meta := &resource.Meta{ - ID: uuid.Must(uuid.NewV7()), - Name: faker.Word(), + select { + case outPck := <-inWriter.Receive(): + assert.Equal(t, inPayload, outPck.Payload()) + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) } +} - _, err := st.Store(ctx, meta) - assert.NoError(t, err) +func BenchmarkSyscallNode_SendAndReceive(b *testing.B) { + n, _ := NewSyscallNode(func(ctx context.Context, arguments []any) ([]any, error) { + return arguments, nil + }) + defer n.Close() - res, err := fn(ctx, []resource.Resource{meta}) - assert.NoError(t, err) - assert.Len(t, res, 1) -} + in := port.NewOut() + in.Link(n.In(node.PortIn)) -func TestDeleteResource(t *testing.T) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() + proc := process.New() + defer proc.Exit(nil) - st := resource.NewStore[resource.Resource]() - fn := DeleteResource(st) + inWriter := in.Open(proc) - meta := &resource.Meta{ - ID: uuid.Must(uuid.NewV7()), - Name: faker.Word(), - } + inPayload := types.NewString(faker.UUIDHyphenated()) + inPck := packet.New(inPayload) - _, err := st.Store(ctx, meta) - assert.NoError(t, err) + b.ResetTimer() - res, err := fn(ctx, []resource.Resource{meta}) - assert.NoError(t, err) - assert.Len(t, res, 1) + for i := 0; i < b.N; i++ { + inWriter.Write(inPck) + <-inWriter.Receive() + } }