Skip to content

Commit

Permalink
feat: support default spec
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 18, 2024
1 parent dec83fc commit 4c39a70
Show file tree
Hide file tree
Showing 24 changed files with 447 additions and 224 deletions.
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,10 @@ Try a basic HTTP request handler using [ping.yaml](./examples/ping.yaml):
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
4 changes: 0 additions & 4 deletions README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,10 @@ make build
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
25 changes: 21 additions & 4 deletions cmd/pkg/uniflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ func main() {
secretStore = secret.NewStore()
}

specs := map[string]spec.Spec{
network.KindListener: &spec.Unstructured{
Meta: spec.Meta{
Env: map[string][]spec.Value{
"PORT": {
{
Data: "{{ .PORT }}",
},
},
},
},
Fields: map[string]any{
"port": "{{ .PORT }}",
},
},
}

schemeBuilder := scheme.NewBuilder()
hookBuilder := hook.NewBuilder()

Expand All @@ -146,10 +163,10 @@ func main() {
nativeTable.Store(opUpdateSecrets, system.UpdateResource(secretStore))
nativeTable.Store(opDeleteSecrets, system.DeleteResource(secretStore))

schemeBuilder.Register(control.AddToScheme(langs, cel.Language))
schemeBuilder.Register(io.AddToScheme(io.NewOSFileSystem()))
schemeBuilder.Register(network.AddToScheme())
schemeBuilder.Register(system.AddToScheme(nativeTable))
schemeBuilder.Register(control.AddToScheme(langs, cel.Language, specs))
schemeBuilder.Register(io.AddToScheme(io.NewOSFileSystem(), specs))
schemeBuilder.Register(network.AddToScheme(specs))
schemeBuilder.Register(system.AddToScheme(nativeTable, specs))

hookBuilder.Register(control.AddToHook())
hookBuilder.Register(network.AddToHook())
Expand Down
4 changes: 0 additions & 4 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,13 @@ Users can update node specifications by using a Command-Line Interface (CLI) or
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
error:
- name: catch
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
4 changes: 0 additions & 4 deletions docs/architecture_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,13 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
error:
- name: catch
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
4 changes: 0 additions & 4 deletions examples/httpproxy.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: proxy
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: proxy
name: proxy
Expand Down
4 changes: 0 additions & 4 deletions examples/ping.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
4 changes: 0 additions & 4 deletions examples/system.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
error:
- name: catch
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
4 changes: 0 additions & 4 deletions examples/wsproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
22 changes: 10 additions & 12 deletions ext/pkg/control/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// BlockNodeSpec defines the specification for creating a BlockNode.
type BlockNodeSpec struct {
spec.Meta `map:",inline"`
Specs []*spec.Unstructured `map:"specs"`
Specs []spec.Spec `map:"specs"`
}

// BlockNode systematically manages complex data processing flows and executes multiple sub-nodes sequentially.
Expand All @@ -32,25 +32,23 @@ var _ node.Node = (*BlockNode)(nil)

// NewBlockNodeCodec creates a new codec for BlockNodeSpec.
func NewBlockNodeCodec(s *scheme.Scheme) scheme.Codec {
return scheme.CodecWithType(func(spec *BlockNodeSpec) (node.Node, error) {
symbols := make([]*symbol.Symbol, 0, len(spec.Specs))
for _, sp := range spec.Specs {
decoded, err := s.Decode(sp)
if err != nil {
for _, n := range symbols {
n.Close()
}
return nil, err
return scheme.CodecWithType(func(sp *BlockNodeSpec) (node.Node, error) {
symbols := make([]*symbol.Symbol, 0, len(sp.Specs))
for _, sp := range sp.Specs {
sp, err := s.Build(sp)
for _, n := range symbols {
n.Close()
}
n, err := s.Compile(decoded)

n, err := s.Compile(sp)
if err != nil {
for _, n := range symbols {
n.Close()
}
return nil, err
}
symbols = append(symbols, &symbol.Symbol{
Spec: decoded,
Spec: sp,
Node: n,
})
}
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/control/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestBlockNodeCodec_Decode(t *testing.T) {
codec := NewBlockNodeCodec(s)

spec := &BlockNodeSpec{
Specs: []*spec.Unstructured{
{
Specs: []spec.Spec{
&spec.Unstructured{
Meta: spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: kind,
Expand Down
24 changes: 23 additions & 1 deletion ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/siyul-park/uniflow/ext/pkg/language"
"github.com/siyul-park/uniflow/pkg/hook"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"github.com/siyul-park/uniflow/pkg/symbol"
)

Expand Down Expand Up @@ -33,53 +34,74 @@ func AddToHook() hook.Register {
}

// AddToScheme returns a function that adds node types and codecs to the provided spec.
func AddToScheme(module *language.Module, lang string) scheme.Register {
func AddToScheme(module *language.Module, lang string, specs ...map[string]spec.Spec) scheme.Register {
value := map[string]spec.Spec{}
for _, val := range specs {
for v, k := range val {
value[v] = k
}
}

return scheme.RegisterFunc(func(s *scheme.Scheme) error {
expr, err := module.Load(lang)
if err != nil {
return err
}

s.AddKnownType(KindBlock, &BlockNodeSpec{})
s.AddKnownValue(KindBlock, value[KindBlock])
s.AddCodec(KindBlock, NewBlockNodeCodec(s))

s.AddKnownType(KindPipe, &PipeNodeSpec{})
s.AddKnownValue(KindPipe, value[KindPipe])
s.AddCodec(KindPipe, NewPipeNodeCodec())

s.AddKnownType(KindFork, &ForkNodeSpec{})
s.AddKnownValue(KindFork, value[KindFork])
s.AddCodec(KindFork, NewForkNodeCodec())

s.AddKnownType(KindIf, &IfNodeSpec{})
s.AddKnownValue(KindIf, value[KindIf])
s.AddCodec(KindIf, NewIfNodeCodec(expr))

s.AddKnownType(KindLoop, &LoopNodeSpec{})
s.AddKnownValue(KindLoop, value[KindLoop])
s.AddCodec(KindLoop, NewLoopNodeCodec())

s.AddKnownType(KindMerge, &MergeNodeSpec{})
s.AddKnownValue(KindMerge, value[KindMerge])
s.AddCodec(KindMerge, NewMergeNodeCodec())

s.AddKnownType(KindNOP, &NOPNodeSpec{})
s.AddKnownValue(KindNOP, value[KindNOP])
s.AddCodec(KindNOP, NewNOPNodeCodec())

s.AddKnownType(KindReduce, &ReduceNodeSpec{})
s.AddKnownValue(KindReduce, value[KindReduce])
s.AddCodec(KindReduce, NewReduceNodeCodec(expr))

s.AddKnownType(KindRetry, &RetryNodeSpec{})
s.AddKnownValue(KindRetry, value[KindRetry])
s.AddCodec(KindRetry, NewRetryNodeCodec())

s.AddKnownType(KindSession, &SessionNodeSpec{})
s.AddKnownValue(KindSession, value[KindSession])
s.AddCodec(KindSession, NewSessionNodeCodec())

s.AddKnownType(KindSnippet, &SnippetNodeSpec{})
s.AddKnownValue(KindSnippet, value[KindSnippet])
s.AddCodec(KindSnippet, NewSnippetNodeCodec(module))

s.AddKnownType(KindSplit, &SplitNodeSpec{})
s.AddKnownValue(KindSplit, value[KindSplit])
s.AddCodec(KindSplit, NewSplitNodeCodec())

s.AddKnownType(KindSwitch, &SwitchNodeSpec{})
s.AddKnownValue(KindSwitch, value[KindSwitch])
s.AddCodec(KindSwitch, NewSwitchNodeCodec(expr))

s.AddKnownType(KindWait, &WaitNodeSpec{})
s.AddKnownValue(KindWait, value[KindWait])
s.AddCodec(KindWait, NewWaitNodeCodec())

return nil
Expand Down
17 changes: 15 additions & 2 deletions ext/pkg/io/builder.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
package io

import "github.com/siyul-park/uniflow/pkg/scheme"
import (
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
)

// AddToScheme returns a function that adds node types and codecs to the provided spec.
func AddToScheme(fs FileSystem) scheme.Register {
func AddToScheme(fs FileSystem, specs ...map[string]spec.Spec) scheme.Register {
value := map[string]spec.Spec{}
for _, val := range specs {
for v, k := range val {
value[v] = k
}
}

return scheme.RegisterFunc(func(s *scheme.Scheme) error {
s.AddKnownType(KindSQL, &SQLNodeSpec{})
s.AddKnownValue(KindSQL, value[KindSQL])
s.AddCodec(KindSQL, NewSQLNodeCodec())

s.AddKnownType(KindPrint, &PrintNodeSpec{})
s.AddKnownValue(KindPrint, value[KindPrint])
s.AddCodec(KindPrint, NewPrintNodeCodec(fs))

s.AddKnownType(KindScan, &ScanNodeSpec{})
s.AddKnownValue(KindScan, value[KindScan])
s.AddCodec(KindScan, NewScanNodeCodec(fs))

return nil
Expand Down
16 changes: 15 additions & 1 deletion ext/pkg/network/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"github.com/siyul-park/uniflow/pkg/hook"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"github.com/siyul-park/uniflow/pkg/symbol"
)

Expand All @@ -28,24 +29,37 @@ func AddToHook() hook.Register {
}

// AddToScheme returns a function that adds node types and codecs to the provided spec.
func AddToScheme() scheme.Register {
func AddToScheme(specs ...map[string]spec.Spec) scheme.Register {
value := map[string]spec.Spec{}
for _, val := range specs {
for v, k := range val {
value[v] = k
}
}

return scheme.RegisterFunc(func(s *scheme.Scheme) error {
s.AddKnownType(KindHTTP, &HTTPNodeSpec{})
s.AddKnownValue(KindHTTP, value[KindHTTP])
s.AddCodec(KindHTTP, NewHTTPNodeCodec())

s.AddKnownType(KindListener, &ListenNodeSpec{})
s.AddKnownValue(KindListener, value[KindListener])
s.AddCodec(KindListener, NewListenNodeCodec())

s.AddKnownType(KindProxy, &ProxyNodeSpec{})
s.AddKnownValue(KindProxy, value[KindProxy])
s.AddCodec(KindProxy, NewProxyNodeCodec())

s.AddKnownType(KindRouter, &RouteNodeSpec{})
s.AddKnownValue(KindRouter, value[KindRouter])
s.AddCodec(KindRouter, NewRouteNodeCodec())

s.AddKnownType(KindWebSocket, &WebSocketNodeSpec{})
s.AddKnownValue(KindWebSocket, value[KindWebSocket])
s.AddCodec(KindWebSocket, NewWebSocketNodeCodec())

s.AddKnownType(KindGateway, &GatewayNodeSpec{})
s.AddKnownValue(KindGateway, value[KindGateway])
s.AddCodec(KindGateway, NewGatewayNodeCodec())

return nil
Expand Down
Loading

0 comments on commit 4c39a70

Please sign in to comment.