Skip to content

Commit

Permalink
fix: failure tolerance when first load
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 17, 2024
1 parent ef8a617 commit dec83fc
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 77 deletions.
8 changes: 2 additions & 6 deletions cmd/pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ func runStartCommand(config StartConfig) func(cmd *cobra.Command, args []string)
if err := r.Watch(ctx); err != nil {
return err
}
if err := r.Load(ctx); err != nil {
return err
}
r.Load(ctx)
go r.Reconcile(ctx)
return d.Run()
}
Expand All @@ -150,9 +148,7 @@ func runStartCommand(config StartConfig) func(cmd *cobra.Command, args []string)
if err := r.Watch(ctx); err != nil {
return err
}
if err := r.Load(ctx); err != nil {
return err
}
r.Load(ctx)
return r.Reconcile(ctx)
}
}
4 changes: 0 additions & 4 deletions pkg/chart/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ func (l *Loader) Load(ctx context.Context, charts ...*Chart) error {
}
}

if len(errs) > 0 {
loaded = nil
}

for _, id := range l.table.Keys() {
chrt := l.table.Lookup(id)
if chrt != nil && len(resource.Match(chrt, examples...)) > 0 {
Expand Down
56 changes: 13 additions & 43 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runtime

import (
"context"
"errors"
"sync"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -114,10 +115,10 @@ func New(config Config) *Runtime {

// Load loads symbols from the spec store into the symbol table.
func (r *Runtime) Load(ctx context.Context) error {
if err := r.chartLoader.Load(ctx, &chart.Chart{Namespace: r.namespace}); err != nil {
return err
}
return r.symbolLoader.Load(ctx, &spec.Meta{Namespace: r.namespace})
var errs []error
errs = append(errs, r.chartLoader.Load(ctx, &chart.Chart{Namespace: r.namespace}))
errs = append(errs, r.symbolLoader.Load(ctx, &spec.Meta{Namespace: r.namespace}))
return errors.Join(errs...)
}

// Watch sets up watchers for specification and secret changes.
Expand Down Expand Up @@ -175,8 +176,6 @@ func (r *Runtime) Reconcile(ctx context.Context) error {
return nil
}

unloaded := make(map[uuid.UUID]spec.Spec)

for {
select {
case <-ctx.Done():
Expand All @@ -196,13 +195,7 @@ func (r *Runtime) Reconcile(ctx context.Context) error {
}
}

for _, sp := range specs {
if err := r.symbolLoader.Load(ctx, sp); err != nil {
unloaded[sp.GetID()] = sp
} else {
delete(unloaded, sp.GetID())
}
}
r.symbolLoader.Load(ctx, specs...)
case event, ok := <-secretStream.Next():
if !ok {
return nil
Expand All @@ -216,26 +209,15 @@ func (r *Runtime) Reconcile(ctx context.Context) error {
secrets = append(secrets, &secret.Secret{ID: event.ID})
}

bounded := make(map[uuid.UUID]spec.Spec)
var specs []spec.Spec
for _, id := range r.symbolTable.Keys() {
sb := r.symbolTable.Lookup(id)
if sb != nil && spec.IsBound(sb.Spec, secrets...) {
bounded[sb.ID()] = sb.Spec
}
}
for _, sp := range unloaded {
if spec.IsBound(sp, secrets...) {
bounded[sp.GetID()] = sp
specs = append(specs, sb.Spec)
}
}

for _, sp := range bounded {
if err := r.symbolLoader.Load(ctx, sp); err != nil {
unloaded[sp.GetID()] = sp
} else {
delete(unloaded, sp.GetID())
}
}
r.symbolLoader.Load(ctx, specs...)
case event, ok := <-chartStream.Next():
if !ok {
return nil
Expand All @@ -255,32 +237,20 @@ func (r *Runtime) Reconcile(ctx context.Context) error {
kinds = append(kinds, chrt.GetName())
}

bounded := make(map[uuid.UUID]spec.Spec)
var specs []spec.Spec
for _, id := range r.symbolTable.Keys() {
sb := r.symbolTable.Lookup(id)
if sb != nil && slices.Contains(kinds, sb.Kind()) {
bounded[sb.ID()] = sb.Spec
}
}
for _, sp := range unloaded {
if slices.Contains(kinds, sp.GetKind()) {
bounded[sp.GetID()] = sp
specs = append(specs, sb.Spec)
}
}

for _, sp := range bounded {
for _, sp := range specs {
r.symbolTable.Free(sp.GetID())
}

r.chartLoader.Load(ctx, &chart.Chart{ID: event.ID})

for _, sp := range bounded {
if err := r.symbolLoader.Load(ctx, sp); err != nil {
unloaded[sp.GetID()] = sp
} else {
delete(unloaded, sp.GetID())
}
}
r.symbolLoader.Load(ctx, specs...)
}
}
}
Expand Down
29 changes: 8 additions & 21 deletions pkg/symbol/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,43 +80,30 @@ func (l *Loader) Load(ctx context.Context, specs ...spec.Spec) error {
var symbols []*Symbol
var errs []error
for _, sp := range specs {
bind, err := spec.Bind(sp, secrets...)
if err != nil {
if bind, err := spec.Bind(sp, secrets...); err != nil {
errs = append(errs, err)
continue
}

decode, err := l.scheme.Decode(bind)
if err != nil {
} else if decode, err := l.scheme.Decode(bind); err != nil {
errs = append(errs, err)
continue
} else if decode != nil {
sp = decode
}

sb := l.table.Lookup(decode.GetID())
if sb == nil || !reflect.DeepEqual(sb.Spec, decode) {
n, err := l.scheme.Compile(decode)
sb := l.table.Lookup(sp.GetID())
if sb == nil || !reflect.DeepEqual(sb.Spec, sp) {
n, err := l.scheme.Compile(sp)
if err != nil {
errs = append(errs, err)
continue
}

sb = &Symbol{Spec: decode, Node: n}
sb = &Symbol{Spec: sp, Node: n}
if err := l.table.Insert(sb); err != nil {
errs = append(errs, err)
continue
}
}

symbols = append(symbols, sb)
}

if len(errs) > 0 {
for _, sb := range symbols {
sb.Close()
}
symbols = nil
}

for _, id := range l.table.Keys() {
sb := l.table.Lookup(id)
if sb != nil && len(resource.Match(sb.Spec, examples...)) > 0 {
Expand Down
1 change: 0 additions & 1 deletion pkg/symbol/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,5 @@ func TestLoader_Load(t *testing.T) {

err := loader.Load(ctx, meta)
assert.Error(t, err)
assert.Nil(t, table.Lookup(meta.GetID()))
})
}
7 changes: 5 additions & 2 deletions pkg/symbol/symbol.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *Symbol) In(name string) *port.InPort {
}

p, ok := s.ins[name]
if !ok {
if !ok && s.Node != nil {
if p = s.Node.In(name); p != nil {
s.ins[name] = p
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *Symbol) Out(name string) *port.OutPort {
}

p, ok := s.outs[name]
if !ok {
if !ok && s.Node != nil {
if p = s.Node.Out(name); p != nil {
s.outs[name] = p
}
Expand All @@ -123,5 +123,8 @@ func (s *Symbol) Close() error {
s.ins = nil
s.outs = nil

if s.Node == nil {
return nil
}
return s.Node.Close()
}
4 changes: 4 additions & 0 deletions pkg/symbol/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ func (t *Table) active(sb *Symbol) bool {
}
visited[curr] = struct{}{}

if curr.Node == nil {
return false
}

for _, ports := range curr.Ports() {
for _, port := range ports {
id := port.ID
Expand Down

0 comments on commit dec83fc

Please sign in to comment.