From 4c39a70254bfb4cf60ec3cf8939fba4affb79af1 Mon Sep 17 00:00:00 2001 From: siyual-park Date: Mon, 18 Nov 2024 20:35:25 +0900 Subject: [PATCH] feat: support default spec --- README.md | 4 - README_kr.md | 4 - cmd/pkg/uniflow/main.go | 25 +++- docs/architecture.md | 4 - docs/architecture_kr.md | 4 - examples/httpproxy.yaml | 4 - examples/ping.yaml | 4 - examples/system.yaml | 4 - examples/wsproxy.yaml | 4 - ext/pkg/control/block.go | 22 ++-- ext/pkg/control/block_test.go | 4 +- ext/pkg/control/builder.go | 24 +++- ext/pkg/io/builder.go | 17 ++- ext/pkg/network/builder.go | 16 ++- ext/pkg/system/builder.go | 15 ++- pkg/chart/chart.go | 9 +- pkg/chart/linker.go | 6 + pkg/process/local.go | 5 +- pkg/runtime/runtime.go | 2 +- pkg/scheme/scheme.go | 230 +++++++++++++++++++++++++++++++--- pkg/scheme/scheme_test.go | 116 +++++++++++++++++ pkg/spec/spec.go | 89 ------------- pkg/spec/spec_test.go | 51 -------- pkg/symbol/loader.go | 8 +- 24 files changed, 447 insertions(+), 224 deletions(-) diff --git a/README.md b/README.md index 186cd593..3e8d7b8e 100644 --- a/README.md +++ b/README.md @@ -45,14 +45,10 @@ Try a basic HTTP request handler using [ping.yaml](./examples/ping.yaml): - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/README_kr.md b/README_kr.md index 1fabeb30..f831134a 100644 --- a/README_kr.md +++ b/README_kr.md @@ -45,14 +45,10 @@ make build - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/cmd/pkg/uniflow/main.go b/cmd/pkg/uniflow/main.go index 5578dd62..79eb3f35 100644 --- a/cmd/pkg/uniflow/main.go +++ b/cmd/pkg/uniflow/main.go @@ -121,6 +121,23 @@ func main() { secretStore = secret.NewStore() } + specs := map[string]spec.Spec{ + network.KindListener: &spec.Unstructured{ + Meta: spec.Meta{ + Env: map[string][]spec.Value{ + "PORT": { + { + Data: "{{ .PORT }}", + }, + }, + }, + }, + Fields: map[string]any{ + "port": "{{ .PORT }}", + }, + }, + } + schemeBuilder := scheme.NewBuilder() hookBuilder := hook.NewBuilder() @@ -146,10 +163,10 @@ func main() { nativeTable.Store(opUpdateSecrets, system.UpdateResource(secretStore)) nativeTable.Store(opDeleteSecrets, system.DeleteResource(secretStore)) - schemeBuilder.Register(control.AddToScheme(langs, cel.Language)) - schemeBuilder.Register(io.AddToScheme(io.NewOSFileSystem())) - schemeBuilder.Register(network.AddToScheme()) - schemeBuilder.Register(system.AddToScheme(nativeTable)) + schemeBuilder.Register(control.AddToScheme(langs, cel.Language, specs)) + schemeBuilder.Register(io.AddToScheme(io.NewOSFileSystem(), specs)) + schemeBuilder.Register(network.AddToScheme(specs)) + schemeBuilder.Register(system.AddToScheme(nativeTable, specs)) hookBuilder.Register(control.AddToHook()) hookBuilder.Register(network.AddToHook()) diff --git a/docs/architecture.md b/docs/architecture.md index 53807f20..83d3cc2f 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -38,7 +38,6 @@ Users can update node specifications by using a Command-Line Interface (CLI) or - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router @@ -46,9 +45,6 @@ Users can update node specifications by using a Command-Line Interface (CLI) or error: - name: catch port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/docs/architecture_kr.md b/docs/architecture_kr.md index 8821c9ca..e985296a 100644 --- a/docs/architecture_kr.md +++ b/docs/architecture_kr.md @@ -36,7 +36,6 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router @@ -44,9 +43,6 @@ error: - name: catch port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/examples/httpproxy.yaml b/examples/httpproxy.yaml index ce24c70f..584e7f93 100644 --- a/examples/httpproxy.yaml +++ b/examples/httpproxy.yaml @@ -1,14 +1,10 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: proxy port: in - env: - PORT: - data: '{{ .PORT }}' - kind: proxy name: proxy diff --git a/examples/ping.yaml b/examples/ping.yaml index 506ddb35..36e1ae81 100644 --- a/examples/ping.yaml +++ b/examples/ping.yaml @@ -1,14 +1,10 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/examples/system.yaml b/examples/system.yaml index 36599633..f72df155 100644 --- a/examples/system.yaml +++ b/examples/system.yaml @@ -1,7 +1,6 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router @@ -9,9 +8,6 @@ error: - name: catch port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/examples/wsproxy.yaml b/examples/wsproxy.yaml index 72e0b465..5884f1ef 100644 --- a/examples/wsproxy.yaml +++ b/examples/wsproxy.yaml @@ -10,14 +10,10 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/ext/pkg/control/block.go b/ext/pkg/control/block.go index 1b8407e9..2bba36b8 100644 --- a/ext/pkg/control/block.go +++ b/ext/pkg/control/block.go @@ -13,7 +13,7 @@ import ( // BlockNodeSpec defines the specification for creating a BlockNode. type BlockNodeSpec struct { spec.Meta `map:",inline"` - Specs []*spec.Unstructured `map:"specs"` + Specs []spec.Spec `map:"specs"` } // BlockNode systematically manages complex data processing flows and executes multiple sub-nodes sequentially. @@ -32,17 +32,15 @@ var _ node.Node = (*BlockNode)(nil) // NewBlockNodeCodec creates a new codec for BlockNodeSpec. func NewBlockNodeCodec(s *scheme.Scheme) scheme.Codec { - return scheme.CodecWithType(func(spec *BlockNodeSpec) (node.Node, error) { - symbols := make([]*symbol.Symbol, 0, len(spec.Specs)) - for _, sp := range spec.Specs { - decoded, err := s.Decode(sp) - if err != nil { - for _, n := range symbols { - n.Close() - } - return nil, err + return scheme.CodecWithType(func(sp *BlockNodeSpec) (node.Node, error) { + symbols := make([]*symbol.Symbol, 0, len(sp.Specs)) + for _, sp := range sp.Specs { + sp, err := s.Build(sp) + for _, n := range symbols { + n.Close() } - n, err := s.Compile(decoded) + + n, err := s.Compile(sp) if err != nil { for _, n := range symbols { n.Close() @@ -50,7 +48,7 @@ func NewBlockNodeCodec(s *scheme.Scheme) scheme.Codec { return nil, err } symbols = append(symbols, &symbol.Symbol{ - Spec: decoded, + Spec: sp, Node: n, }) } diff --git a/ext/pkg/control/block_test.go b/ext/pkg/control/block_test.go index 6cca679c..6ee9b071 100644 --- a/ext/pkg/control/block_test.go +++ b/ext/pkg/control/block_test.go @@ -31,8 +31,8 @@ func TestBlockNodeCodec_Decode(t *testing.T) { codec := NewBlockNodeCodec(s) spec := &BlockNodeSpec{ - Specs: []*spec.Unstructured{ - { + Specs: []spec.Spec{ + &spec.Unstructured{ Meta: spec.Meta{ ID: uuid.Must(uuid.NewV7()), Kind: kind, diff --git a/ext/pkg/control/builder.go b/ext/pkg/control/builder.go index 91cb9298..1733f791 100644 --- a/ext/pkg/control/builder.go +++ b/ext/pkg/control/builder.go @@ -4,6 +4,7 @@ import ( "github.com/siyul-park/uniflow/ext/pkg/language" "github.com/siyul-park/uniflow/pkg/hook" "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" "github.com/siyul-park/uniflow/pkg/symbol" ) @@ -33,7 +34,14 @@ func AddToHook() hook.Register { } // AddToScheme returns a function that adds node types and codecs to the provided spec. -func AddToScheme(module *language.Module, lang string) scheme.Register { +func AddToScheme(module *language.Module, lang string, specs ...map[string]spec.Spec) scheme.Register { + value := map[string]spec.Spec{} + for _, val := range specs { + for v, k := range val { + value[v] = k + } + } + return scheme.RegisterFunc(func(s *scheme.Scheme) error { expr, err := module.Load(lang) if err != nil { @@ -41,45 +49,59 @@ func AddToScheme(module *language.Module, lang string) scheme.Register { } s.AddKnownType(KindBlock, &BlockNodeSpec{}) + s.AddKnownValue(KindBlock, value[KindBlock]) s.AddCodec(KindBlock, NewBlockNodeCodec(s)) s.AddKnownType(KindPipe, &PipeNodeSpec{}) + s.AddKnownValue(KindPipe, value[KindPipe]) s.AddCodec(KindPipe, NewPipeNodeCodec()) s.AddKnownType(KindFork, &ForkNodeSpec{}) + s.AddKnownValue(KindFork, value[KindFork]) s.AddCodec(KindFork, NewForkNodeCodec()) s.AddKnownType(KindIf, &IfNodeSpec{}) + s.AddKnownValue(KindIf, value[KindIf]) s.AddCodec(KindIf, NewIfNodeCodec(expr)) s.AddKnownType(KindLoop, &LoopNodeSpec{}) + s.AddKnownValue(KindLoop, value[KindLoop]) s.AddCodec(KindLoop, NewLoopNodeCodec()) s.AddKnownType(KindMerge, &MergeNodeSpec{}) + s.AddKnownValue(KindMerge, value[KindMerge]) s.AddCodec(KindMerge, NewMergeNodeCodec()) s.AddKnownType(KindNOP, &NOPNodeSpec{}) + s.AddKnownValue(KindNOP, value[KindNOP]) s.AddCodec(KindNOP, NewNOPNodeCodec()) s.AddKnownType(KindReduce, &ReduceNodeSpec{}) + s.AddKnownValue(KindReduce, value[KindReduce]) s.AddCodec(KindReduce, NewReduceNodeCodec(expr)) s.AddKnownType(KindRetry, &RetryNodeSpec{}) + s.AddKnownValue(KindRetry, value[KindRetry]) s.AddCodec(KindRetry, NewRetryNodeCodec()) s.AddKnownType(KindSession, &SessionNodeSpec{}) + s.AddKnownValue(KindSession, value[KindSession]) s.AddCodec(KindSession, NewSessionNodeCodec()) s.AddKnownType(KindSnippet, &SnippetNodeSpec{}) + s.AddKnownValue(KindSnippet, value[KindSnippet]) s.AddCodec(KindSnippet, NewSnippetNodeCodec(module)) s.AddKnownType(KindSplit, &SplitNodeSpec{}) + s.AddKnownValue(KindSplit, value[KindSplit]) s.AddCodec(KindSplit, NewSplitNodeCodec()) s.AddKnownType(KindSwitch, &SwitchNodeSpec{}) + s.AddKnownValue(KindSwitch, value[KindSwitch]) s.AddCodec(KindSwitch, NewSwitchNodeCodec(expr)) s.AddKnownType(KindWait, &WaitNodeSpec{}) + s.AddKnownValue(KindWait, value[KindWait]) s.AddCodec(KindWait, NewWaitNodeCodec()) return nil diff --git a/ext/pkg/io/builder.go b/ext/pkg/io/builder.go index fe20d9c7..3b55e2dc 100644 --- a/ext/pkg/io/builder.go +++ b/ext/pkg/io/builder.go @@ -1,17 +1,30 @@ package io -import "github.com/siyul-park/uniflow/pkg/scheme" +import ( + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" +) // AddToScheme returns a function that adds node types and codecs to the provided spec. -func AddToScheme(fs FileSystem) scheme.Register { +func AddToScheme(fs FileSystem, specs ...map[string]spec.Spec) scheme.Register { + value := map[string]spec.Spec{} + for _, val := range specs { + for v, k := range val { + value[v] = k + } + } + return scheme.RegisterFunc(func(s *scheme.Scheme) error { s.AddKnownType(KindSQL, &SQLNodeSpec{}) + s.AddKnownValue(KindSQL, value[KindSQL]) s.AddCodec(KindSQL, NewSQLNodeCodec()) s.AddKnownType(KindPrint, &PrintNodeSpec{}) + s.AddKnownValue(KindPrint, value[KindPrint]) s.AddCodec(KindPrint, NewPrintNodeCodec(fs)) s.AddKnownType(KindScan, &ScanNodeSpec{}) + s.AddKnownValue(KindScan, value[KindScan]) s.AddCodec(KindScan, NewScanNodeCodec(fs)) return nil diff --git a/ext/pkg/network/builder.go b/ext/pkg/network/builder.go index 1e0c9414..7ffee876 100644 --- a/ext/pkg/network/builder.go +++ b/ext/pkg/network/builder.go @@ -3,6 +3,7 @@ package network import ( "github.com/siyul-park/uniflow/pkg/hook" "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" "github.com/siyul-park/uniflow/pkg/symbol" ) @@ -28,24 +29,37 @@ func AddToHook() hook.Register { } // AddToScheme returns a function that adds node types and codecs to the provided spec. -func AddToScheme() scheme.Register { +func AddToScheme(specs ...map[string]spec.Spec) scheme.Register { + value := map[string]spec.Spec{} + for _, val := range specs { + for v, k := range val { + value[v] = k + } + } + return scheme.RegisterFunc(func(s *scheme.Scheme) error { s.AddKnownType(KindHTTP, &HTTPNodeSpec{}) + s.AddKnownValue(KindHTTP, value[KindHTTP]) s.AddCodec(KindHTTP, NewHTTPNodeCodec()) s.AddKnownType(KindListener, &ListenNodeSpec{}) + s.AddKnownValue(KindListener, value[KindListener]) s.AddCodec(KindListener, NewListenNodeCodec()) s.AddKnownType(KindProxy, &ProxyNodeSpec{}) + s.AddKnownValue(KindProxy, value[KindProxy]) s.AddCodec(KindProxy, NewProxyNodeCodec()) s.AddKnownType(KindRouter, &RouteNodeSpec{}) + s.AddKnownValue(KindRouter, value[KindRouter]) s.AddCodec(KindRouter, NewRouteNodeCodec()) s.AddKnownType(KindWebSocket, &WebSocketNodeSpec{}) + s.AddKnownValue(KindWebSocket, value[KindWebSocket]) s.AddCodec(KindWebSocket, NewWebSocketNodeCodec()) s.AddKnownType(KindGateway, &GatewayNodeSpec{}) + s.AddKnownValue(KindGateway, value[KindGateway]) s.AddCodec(KindGateway, NewGatewayNodeCodec()) return nil diff --git a/ext/pkg/system/builder.go b/ext/pkg/system/builder.go index 15415e22..9a8cb8e9 100644 --- a/ext/pkg/system/builder.go +++ b/ext/pkg/system/builder.go @@ -1,11 +1,22 @@ package system -import "github.com/siyul-park/uniflow/pkg/scheme" +import ( + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" +) // AddToScheme returns a function that adds node types and codecs to the provided spec. -func AddToScheme(table *NativeTable) scheme.Register { +func AddToScheme(table *NativeTable, specs ...map[string]spec.Spec) scheme.Register { + value := map[string]spec.Spec{} + for _, val := range specs { + for v, k := range val { + value[v] = k + } + } + return scheme.RegisterFunc(func(s *scheme.Scheme) error { s.AddKnownType(KindNative, &NativeNodeSpec{}) + s.AddKnownValue(KindNative, value[KindNative]) s.AddCodec(KindNative, NewNativeNodeCodec(table)) return nil diff --git a/pkg/chart/chart.go b/pkg/chart/chart.go index 2a93741e..1047756b 100644 --- a/pkg/chart/chart.go +++ b/pkg/chart/chart.go @@ -162,14 +162,11 @@ func (c *Chart) Build(sp spec.Spec) ([]spec.Spec, error) { return nil, err } - unstructured.SetEnv(env) - - bind, err := spec.Bind(unstructured) - if err != nil { - return nil, err + if len(env) > 0 { + unstructured.SetEnv(env) } - specs = append(specs, bind) + specs = append(specs, unstructured) } return specs, nil } diff --git a/pkg/chart/linker.go b/pkg/chart/linker.go index 819ca0aa..85fc4b00 100644 --- a/pkg/chart/linker.go +++ b/pkg/chart/linker.go @@ -58,6 +58,12 @@ func (l *Linker) Link(chrt *Chart) error { symbols := make([]*symbol.Symbol, 0, len(specs)) for _, sp := range specs { + if build, err := l.scheme.Build(sp); err != nil { + return nil, err + } else { + sp = build + } + n, err := l.scheme.Compile(sp) if err != nil { for _, sb := range symbols { diff --git a/pkg/process/local.go b/pkg/process/local.go index bce6a323..186b0306 100644 --- a/pkg/process/local.go +++ b/pkg/process/local.go @@ -22,10 +22,11 @@ func (l *Local[T]) AddStoreHook(proc *Process, hook StoreHook[T]) bool { l.mu.Lock() defer l.mu.Unlock() - if _, ok := l.data[proc]; ok { + if val, ok := l.data[proc]; ok { l.mu.Unlock() defer l.mu.Lock() - hook.Store(l.data[proc]) + + hook.Store(val) return true } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index cf91e907..8a2890a5 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -212,7 +212,7 @@ func (r *Runtime) Reconcile(ctx context.Context) error { var specs []spec.Spec for _, id := range r.symbolTable.Keys() { sb := r.symbolTable.Lookup(id) - if sb != nil && spec.IsBound(sb.Spec, secrets...) { + if sb != nil && r.scheme.IsBound(sb.Spec, secrets...) { specs = append(specs, sb.Spec) } } diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 3233f7c5..6d1f573a 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -2,6 +2,9 @@ package scheme import ( "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/resource" + "github.com/siyul-park/uniflow/pkg/secret" + "github.com/siyul-park/uniflow/pkg/template" "reflect" "slices" "sync" @@ -16,6 +19,7 @@ import ( // Scheme manages type information and decodes spec implementations into node objects within a workflow environment. type Scheme struct { types map[string]reflect.Type + values map[string]reflect.Value codecs map[string]Codec mu sync.RWMutex } @@ -26,6 +30,7 @@ var _ Codec = (*Scheme)(nil) func New() *Scheme { return &Scheme{ types: make(map[string]reflect.Type), + values: make(map[string]reflect.Value), codecs: make(map[string]Codec), } } @@ -52,6 +57,9 @@ func (s *Scheme) AddKnownType(kind string, sp spec.Spec) bool { s.mu.Lock() defer s.mu.Unlock() + if sp == nil { + return false + } if _, ok := s.types[kind]; ok { return false } @@ -79,6 +87,56 @@ func (s *Scheme) KnownType(kind string) reflect.Type { return s.types[kind] } +// AddKnownValue associates a default value with a kind and returns true if successful. +func (s *Scheme) AddKnownValue(kind string, sp spec.Spec) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if sp == nil { + return false + } + if _, ok := s.values[kind]; ok { + return false + } + s.values[kind] = reflect.ValueOf(sp) + return true +} + +// KnownValue retrieves a structured spec based on the default value associated with the kind of the input spec. +func (s *Scheme) KnownValue(sp spec.Spec) (spec.Spec, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + val, ok := s.values[sp.GetKind()] + if !ok { + return sp, nil + } + + structured, ok := s.clone(val).Interface().(spec.Spec) + if !ok { + return sp, nil + } + + if doc, err := types.Marshal(sp); err != nil { + return nil, err + } else if err := types.Unmarshal(doc, structured); err != nil { + return nil, err + } + return structured, nil +} + +// RemoveKnownValue removes the default value associated with a kind. +func (s *Scheme) RemoveKnownValue(kind string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.values[kind]; ok { + delete(s.values, kind) + return true + } + return false +} + // AddCodec associates a codec with a specific kind and returns true if successful. func (s *Scheme) AddCodec(kind string, codec Codec) bool { s.mu.Lock() @@ -111,28 +169,108 @@ func (s *Scheme) Codec(kind string) Codec { return s.codecs[kind] } -// Compile decodes the given spec into node using the associated codec. -func (s *Scheme) Compile(sp spec.Spec) (node.Node, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - codec := s.Codec(sp.GetKind()) - if codec == nil { - return nil, errors.WithStack(encoding.ErrUnsupportedType) +// Build resolves the spec's default values, binds secrets, and decodes the spec, returning the final processed spec or an error. +func (s *Scheme) Build(sp spec.Spec, secrets ...*secret.Secret) (spec.Spec, error) { + sp, err := s.KnownValue(sp) + if err != nil { + return nil, err } - return codec.Compile(sp) + sp, err = s.Bind(sp, secrets...) + if err != nil { + return nil, err + } + return s.Decode(sp) } -// Decode converts the provided spec.Spec into a structured representation using reflection and encoding utilities. -func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { - s.mu.RLock() - defer s.mu.RUnlock() +// IsBound checks if the spec is bound to any of the provided secrets. +func (s *Scheme) IsBound(sp spec.Spec, secrets ...*secret.Secret) bool { + for _, values := range sp.GetEnv() { + for _, val := range values { + examples := make([]*secret.Secret, 0, 2) + if val.ID != uuid.Nil { + examples = append(examples, &secret.Secret{ID: val.ID}) + } + if val.Name != "" { + examples = append(examples, &secret.Secret{Namespace: sp.GetNamespace(), Name: val.Name}) + } + for _, scrt := range secrets { + if len(resource.Match(scrt, examples...)) > 0 { + return true + } + } + } + } + return false +} + +// Bind processes the environment variables in the spec using the provided secrets. +func (s *Scheme) Bind(sp spec.Spec, secrets ...*secret.Secret) (spec.Spec, error) { doc, err := types.Marshal(sp) if err != nil { return nil, err } + unstructured := &spec.Unstructured{} + if err := types.Unmarshal(doc, unstructured); err != nil { + return nil, err + } + + env := map[string]any{} + for key, values := range unstructured.GetEnv() { + for i, val := range values { + example := &secret.Secret{ + ID: val.ID, + Namespace: unstructured.GetNamespace(), + Name: val.Name, + } + + var scrt *secret.Secret + for _, s := range secrets { + if (!s.IsIdentified() && !val.IsIdentified()) || len(resource.Match(s, example)) > 0 { + scrt = s + break + } + } + + if scrt != nil { + v, err := template.Execute(val.Data, scrt.Data) + if err != nil { + return nil, err + } + + val.ID = scrt.GetID() + val.Name = scrt.GetName() + val.Data = v + values[i] = val + } + + if !val.IsIdentified() || scrt != nil { + env[key] = val.Data + } + } + + if _, ok := env[key]; !ok { + return nil, errors.WithStack(encoding.ErrUnsupportedValue) + } + } + + if len(env) > 0 { + fields, err := template.Execute(unstructured.Fields, env) + if err != nil { + return nil, err + } + unstructured.Fields = fields.(map[string]any) + } + + return unstructured, nil +} + +// Decode converts the provided spec.Spec into a structured representation using reflection and encoding utilities. +func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { + s.mu.RLock() + defer s.mu.RUnlock() + typ, ok := s.types[sp.GetKind()] if !ok { return sp, nil @@ -148,7 +286,9 @@ func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { return sp, nil } - if err := types.Unmarshal(doc, structured); err != nil { + if doc, err := types.Marshal(sp); err != nil { + return nil, err + } else if err := types.Unmarshal(doc, structured); err != nil { return nil, err } @@ -157,3 +297,65 @@ func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { } return structured, nil } + +// Compile decodes the given spec into a node using the associated codec. +func (s *Scheme) Compile(sp spec.Spec) (node.Node, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + codec := s.Codec(sp.GetKind()) + if codec == nil { + return nil, errors.WithStack(encoding.ErrUnsupportedType) + } + return codec.Compile(sp) +} + +func (s *Scheme) clone(value reflect.Value) reflect.Value { + if !value.IsValid() { + return reflect.Zero(value.Type()) + } + + switch value.Kind() { + case reflect.Pointer: + if value.IsNil() { + return reflect.Zero(value.Type()) + } + ptr := reflect.New(value.Type().Elem()) + ptr.Elem().Set(s.clone(value.Elem())) + return ptr + case reflect.Struct: + clone := reflect.New(value.Type()).Elem() + for i := 0; i < value.NumField(); i++ { + if value.Field(i).CanSet() { + clone.Field(i).Set(s.clone(value.Field(i))) + } + } + return clone + case reflect.Slice: + if value.IsNil() { + return reflect.Zero(value.Type()) + } + clone := reflect.MakeSlice(value.Type(), value.Len(), value.Cap()) + for i := 0; i < value.Len(); i++ { + clone.Index(i).Set(s.clone(value.Index(i))) + } + return clone + case reflect.Map: + if value.IsNil() { + return reflect.Zero(value.Type()) + } + clone := reflect.MakeMapWithSize(value.Type(), value.Len()) + for _, key := range value.MapKeys() { + clone.SetMapIndex(key, s.clone(value.MapIndex(key))) + } + return clone + case reflect.Array: + clone := reflect.New(value.Type()).Elem() + for i := 0; i < value.Len(); i++ { + clone.Index(i).Set(s.clone(value.Index(i))) + } + return clone + default: + return reflect.ValueOf(value.Interface()) + } +} diff --git a/pkg/scheme/scheme_test.go b/pkg/scheme/scheme_test.go index edc4460e..200f4d99 100644 --- a/pkg/scheme/scheme_test.go +++ b/pkg/scheme/scheme_test.go @@ -1,6 +1,7 @@ package scheme import ( + "github.com/siyul-park/uniflow/pkg/secret" "testing" "github.com/go-faker/faker/v4" @@ -44,6 +45,31 @@ func TestScheme_KnownType(t *testing.T) { assert.False(t, ok) } +func TestScheme_AddKnownValue(t *testing.T) { + s := New() + kind := faker.UUIDHyphenated() + + meta := &spec.Meta{ + Kind: kind, + ID: uuid.Must(uuid.NewV7()), + } + ok := s.AddKnownValue(kind, meta) + assert.True(t, ok) + + ok = s.AddKnownValue(kind, meta) + assert.False(t, ok) + + val, err := s.KnownValue(&spec.Meta{Kind: kind}) + assert.NoError(t, err) + assert.Equal(t, meta, val) + + ok = s.RemoveKnownValue(kind) + assert.True(t, ok) + + ok = s.RemoveKnownValue(kind) + assert.False(t, ok) +} + func TestScheme_Codec(t *testing.T) { s := New() kind := faker.UUIDHyphenated() @@ -67,6 +93,96 @@ func TestScheme_Codec(t *testing.T) { assert.False(t, ok) } +func TestScheme_Build(t *testing.T) { + s := New() + kind := faker.UUIDHyphenated() + + scrt := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + Data: "foo", + } + + meta := &spec.Meta{ + Kind: kind, + Env: map[string][]spec.Value{ + "FOO": { + { + ID: scrt.ID, + Data: "{{ . }}", + }, + }, + }, + } + + s.AddKnownType(kind, meta) + s.AddKnownValue(kind, meta) + s.AddCodec(kind, CodecFunc(func(spec spec.Spec) (node.Node, error) { + return node.NewOneToOneNode(nil), nil + })) + + build, err := s.Build(&spec.Meta{Kind: kind}, scrt) + assert.NoError(t, err) + assert.Equal(t, "foo", build.GetEnv()["FOO"][0].Data) +} + +func TestScheme_IsBound(t *testing.T) { + s := New() + + sec1 := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + } + sec2 := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + } + + meta := &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.UUIDHyphenated(), + Env: map[string][]spec.Value{ + "FOO": { + { + ID: sec1.ID, + Data: "foo", + }, + }, + }, + } + + assert.True(t, s.IsBound(meta, sec1)) + assert.False(t, s.IsBound(meta, sec2)) +} + +func TestScheme_Bind(t *testing.T) { + s := New() + kind := faker.UUIDHyphenated() + + s.AddKnownType(kind, &spec.Meta{}) + s.AddCodec(kind, CodecFunc(func(spec spec.Spec) (node.Node, error) { + return node.NewOneToOneNode(nil), nil + })) + + scrt := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + Data: "foo", + } + + meta := &spec.Meta{ + Kind: kind, + Env: map[string][]spec.Value{ + "FOO": { + { + ID: scrt.ID, + Data: "{{ . }}", + }, + }, + }, + } + + bind, err := s.Bind(meta, scrt) + assert.NoError(t, err) + assert.Equal(t, "foo", bind.GetEnv()["FOO"][0].Data) +} + func TestScheme_Decode(t *testing.T) { s := New() kind := faker.UUIDHyphenated() diff --git a/pkg/spec/spec.go b/pkg/spec/spec.go index ae38591a..98d5fae9 100644 --- a/pkg/spec/spec.go +++ b/pkg/spec/spec.go @@ -2,12 +2,7 @@ package spec import ( "github.com/gofrs/uuid" - "github.com/pkg/errors" - "github.com/siyul-park/uniflow/pkg/encoding" "github.com/siyul-park/uniflow/pkg/resource" - "github.com/siyul-park/uniflow/pkg/secret" - "github.com/siyul-park/uniflow/pkg/template" - "github.com/siyul-park/uniflow/pkg/types" ) // Spec defines the behavior and connections of each node. @@ -88,90 +83,6 @@ func New() Spec { return &Meta{} } -// IsBound checks if the spec is bound to any of the provided secrets. -func IsBound(sp Spec, secrets ...*secret.Secret) bool { - for _, vals := range sp.GetEnv() { - for _, val := range vals { - examples := make([]*secret.Secret, 0, 2) - if val.ID != uuid.Nil { - examples = append(examples, &secret.Secret{ID: val.ID}) - } - if val.Name != "" { - examples = append(examples, &secret.Secret{Namespace: sp.GetNamespace(), Name: val.Name}) - } - - for _, scrt := range secrets { - if len(resource.Match(scrt, examples...)) > 0 { - return true - } - } - } - } - return false -} - -// Bind processes the environment variables in the spec using the provided secrets. -func Bind(sp Spec, secrets ...*secret.Secret) (Spec, error) { - doc, err := types.Marshal(sp) - if err != nil { - return nil, err - } - - unstructured := &Unstructured{} - if err := types.Unmarshal(doc, unstructured); err != nil { - return nil, err - } - - env := map[string]any{} - for key, vals := range unstructured.GetEnv() { - for i, val := range vals { - example := &secret.Secret{ - ID: val.ID, - Namespace: unstructured.GetNamespace(), - Name: val.Name, - } - - var scrt *secret.Secret - for _, s := range secrets { - if (!s.IsIdentified() && !val.IsIdentified()) || len(resource.Match(s, example)) > 0 { - scrt = s - break - } - } - - if scrt != nil { - v, err := template.Execute(val.Data, scrt.Data) - if err != nil { - return nil, err - } - - val.ID = scrt.GetID() - val.Name = scrt.GetName() - val.Data = v - vals[i] = val - } - - if !val.IsIdentified() || scrt != nil { - env[key] = val.Data - } - } - - if _, ok := env[key]; !ok { - return nil, errors.WithStack(encoding.ErrUnsupportedValue) - } - } - - if len(env) > 0 { - fields, err := template.Execute(unstructured.Fields, env) - if err != nil { - return nil, err - } - unstructured.Fields = fields.(map[string]any) - } - - return unstructured, nil -} - // GetID returns the node's unique identifier. func (m *Meta) GetID() uuid.UUID { return m.ID diff --git a/pkg/spec/spec_test.go b/pkg/spec/spec_test.go index 9f668b04..d71111c4 100644 --- a/pkg/spec/spec_test.go +++ b/pkg/spec/spec_test.go @@ -5,60 +5,9 @@ import ( "github.com/go-faker/faker/v4" "github.com/gofrs/uuid" - "github.com/siyul-park/uniflow/pkg/secret" "github.com/stretchr/testify/assert" ) -func TestIsBound(t *testing.T) { - sec1 := &secret.Secret{ - ID: uuid.Must(uuid.NewV7()), - } - sec2 := &secret.Secret{ - ID: uuid.Must(uuid.NewV7()), - } - - meta := &Meta{ - ID: uuid.Must(uuid.NewV7()), - Kind: faker.UUIDHyphenated(), - Env: map[string][]Value{ - "FOO": { - { - ID: sec1.ID, - Data: "foo", - }, - }, - }, - } - - assert.True(t, IsBound(meta, sec1)) - assert.False(t, IsBound(meta, sec2)) -} - -func TestBind(t *testing.T) { - scrt := &secret.Secret{ - ID: uuid.Must(uuid.NewV7()), - Data: "foo", - } - - meta := &Meta{ - ID: uuid.Must(uuid.NewV7()), - Kind: faker.UUIDHyphenated(), - Env: map[string][]Value{ - "FOO": { - { - ID: scrt.ID, - Data: "{{ . }}", - }, - }, - }, - } - - bind, err := Bind(meta, scrt) - assert.NoError(t, err) - assert.Equal(t, "foo", bind.GetEnv()["FOO"][0].Data) - assert.True(t, IsBound(bind, scrt)) -} - func TestMeta_Get(t *testing.T) { meta := &Meta{ ID: uuid.Must(uuid.NewV7()), diff --git a/pkg/symbol/loader.go b/pkg/symbol/loader.go index e57c8472..e310e8e5 100644 --- a/pkg/symbol/loader.go +++ b/pkg/symbol/loader.go @@ -80,12 +80,10 @@ func (l *Loader) Load(ctx context.Context, specs ...spec.Spec) error { var symbols []*Symbol var errs []error for _, sp := range specs { - if bind, err := spec.Bind(sp, secrets...); err != nil { + if build, err := l.scheme.Build(sp, secrets...); err != nil { errs = append(errs, err) - } else if decode, err := l.scheme.Decode(bind); err != nil { - errs = append(errs, err) - } else if decode != nil { - sp = decode + } else { + sp = build } sb := l.table.Lookup(sp.GetID())