Skip to content

Commit

Permalink
fix: unload resources using topological sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 3, 2024
1 parent 4cedf4f commit 9c5c0a3
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 56 deletions.
1 change: 1 addition & 0 deletions cmd/pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func runStartCommand(config StartConfig) func(cmd *cobra.Command, args []string)
SecretStore: config.SecretStore,
ChartStore: config.ChartStore,
})
defer r.Close()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand Down
49 changes: 45 additions & 4 deletions pkg/chart/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,49 @@ func (t *Table) Close() error {
t.mu.Lock()
defer t.mu.Unlock()

for id := range t.charts {
if _, err := t.free(id); err != nil {
degree := map[*Chart]int{}
for id, chrt := range t.charts {
degree[chrt] = 0
for _, ports := range t.references[id] {
degree[chrt] += len(ports)
}
}

var queue []*Chart
for chrt, count := range degree {
if count == 0 {
queue = append(queue, chrt)
}
}

charts := make([]*Chart, 0, len(t.charts))
for len(queue) > 0 {
curr := queue[0]
queue = queue[1:]

if slices.Contains(charts, curr) {
continue
}
charts = append(charts, curr)

for _, spec := range curr.GetSpecs() {
id := t.lookup(curr.GetNamespace(), spec.GetKind())
if next, ok := t.charts[id]; ok {
degree[next]--
if degree[next] == 0 {
queue = append(queue, next)
}
}
}
}
for chrt, count := range degree {
if count != 0 {
charts = append(charts, chrt)
}
}

for _, chrt := range charts {
if _, err := t.free(chrt.GetID()); err != nil {
return err
}
}
Expand Down Expand Up @@ -252,8 +293,8 @@ func (t *Table) linked(chrt *Chart) []*Chart {
}
}
}
for curr, d := range degree {
if d != 0 {
for curr, count := range degree {
if count != 0 {
linked = append(linked, curr)
}
}
Expand Down
26 changes: 0 additions & 26 deletions pkg/port/outport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/types"
)

// OutPort represents an output port for sending data.
Expand All @@ -18,31 +17,6 @@ type OutPort struct {
mu sync.RWMutex
}

// Send sends the payload through the OutPort and returns the result or an error.
func Send(out *OutPort, payload types.Value) (types.Value, error) {
var err error

proc := process.New()
defer proc.Exit(err)

writer := out.Open(proc)
defer writer.Close()

outPck := packet.New(payload)
backPck := packet.Send(writer, outPck)

payload = backPck.Payload()

if v, ok := payload.(types.Error); ok {
err = v.Unwrap()
}

if err != nil {
return nil, err
}
return payload, nil
}

// NewOut creates and returns a new OutPort instance.
func NewOut() *OutPort {
return &OutPort{
Expand Down
9 changes: 0 additions & 9 deletions pkg/port/outport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestWrite(t *testing.T) {
out := NewOut()
defer out.Close()

res, err := Send(out, nil)
assert.NoError(t, err)
assert.Nil(t, res)
}

func TestOutPort_Open(t *testing.T) {
proc := process.New()
defer proc.Exit(nil)
Expand Down
96 changes: 79 additions & 17 deletions pkg/symbol/table.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package symbol

import (
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/process"
"slices"
"sync"

Expand Down Expand Up @@ -93,8 +95,56 @@ func (t *Table) Close() error {
t.mu.Lock()
defer t.mu.Unlock()

for id := range t.symbols {
if _, err := t.free(id); err != nil {
degree := map[*Symbol]int{}
for id, sb := range t.symbols {
degree[sb] = 0
for _, ports := range t.references[id] {
degree[sb] += len(ports)
}
}

var queue []*Symbol
for sb, count := range degree {
if count == 0 {
queue = append(queue, sb)
}
}

symbols := make([]*Symbol, 0, len(t.symbols))
for len(queue) > 0 {
curr := queue[0]
queue = queue[1:]

if slices.Contains(symbols, curr) {
continue
}
symbols = append(symbols, curr)

for _, ports := range curr.Ports() {
for _, port := range ports {
id := port.ID
if id == uuid.Nil {
id = t.lookup(curr.Namespace(), port.Name)
}

next, ok := t.symbols[id]
if ok && next.Namespace() == curr.Namespace() {
degree[next]--
if degree[next] == 0 {
queue = append(queue, next)
}
}
}
}
}
for sb, count := range degree {
if count != 0 {
symbols = append(symbols, sb)
}
}

for _, sb := range symbols {
if _, err := t.free(sb.ID()); err != nil {
return err
}
}
Expand Down Expand Up @@ -194,13 +244,13 @@ func (t *Table) links(sb *Symbol) {
out.Link(in)
}

refences := t.references[ref.ID()]
if refences == nil {
refences = make(map[string][]spec.Port)
t.references[ref.ID()] = refences
references := t.references[ref.ID()]
if references == nil {
references = make(map[string][]spec.Port)
t.references[ref.ID()] = references
}

refences[port.Port] = append(refences[port.Port], spec.Port{
references[port.Port] = append(references[port.Port], spec.Port{
ID: sb.ID(),
Name: port.Name,
Port: name,
Expand All @@ -225,13 +275,13 @@ func (t *Table) links(sb *Symbol) {
out.Link(in)
}

refences := t.references[sb.ID()]
if refences == nil {
refences = make(map[string][]spec.Port)
t.references[sb.ID()] = refences
references := t.references[sb.ID()]
if references == nil {
references = make(map[string][]spec.Port)
t.references[sb.ID()] = references
}

refences[port.Port] = append(refences[port.Port], spec.Port{
references[port.Port] = append(references[port.Port], spec.Port{
ID: ref.ID(),
Name: port.Name,
Port: name,
Expand Down Expand Up @@ -334,9 +384,9 @@ func (t *Table) linked(sb *Symbol) []*Symbol {
}
}
}
for curr, d := range degree {
if d != 0 {
linked = append(linked, curr)
for sb, count := range degree {
if count != 0 {
linked = append(linked, sb)
}
}

Expand Down Expand Up @@ -384,7 +434,8 @@ func (t *Table) call(sb *Symbol, name string) error {
id = t.lookup(sb.Namespace(), port.Name)
}

if ref, ok := t.symbols[id]; ok && ref.Namespace() == sb.Namespace() {
ref, ok := t.symbols[id]
if ok && ref.Namespace() == sb.Namespace() {
if in := ref.In(port.Port); in != nil {
out.Link(in)
}
Expand All @@ -396,7 +447,18 @@ func (t *Table) call(sb *Symbol, name string) error {
return err
}

_, err = port.Send(out, payload)
proc := process.New()
defer proc.Exit(err)

writer := out.Open(proc)
defer writer.Close()

outPck := packet.New(payload)
backPck := packet.Send(writer, outPck)

if v, ok := backPck.Payload().(types.Error); ok {
err = v.Unwrap()
}
return err
}

Expand Down

0 comments on commit 9c5c0a3

Please sign in to comment.