From f4290934242a441ea0dfc6ef8e2eef0a3b82c6a2 Mon Sep 17 00:00:00 2001 From: siyul-park Date: Sat, 25 Nov 2023 22:42:20 -0500 Subject: [PATCH] refactor: remove unload in loader and migrate --- pkg/loader/loader.go | 260 ++++++++++-------------- pkg/loader/loader_test.go | 360 +++------------------------------- pkg/loader/reconciler.go | 4 +- pkg/loader/reconciler_test.go | 4 +- pkg/runtime/runtime.go | 30 +-- pkg/symbol/symbol.go | 3 +- pkg/symbol/table.go | 30 ++- pkg/symbol/table_test.go | 4 +- 8 files changed, 163 insertions(+), 532 deletions(-) diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 17b08364..f51eb04c 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -6,7 +6,8 @@ import ( "sync" "github.com/oklog/ulid/v2" - "github.com/siyul-park/uniflow/pkg/database/memdb" + "github.com/samber/lo" + "github.com/siyul-park/uniflow/pkg/database" "github.com/siyul-park/uniflow/pkg/node" "github.com/siyul-park/uniflow/pkg/scheme" "github.com/siyul-park/uniflow/pkg/storage" @@ -16,219 +17,156 @@ import ( type ( // Config is a config for for the Loader. Config struct { - Table *symbol.Table - Scheme *scheme.Scheme - Storage *storage.Storage + Namespace string + Table *symbol.Table + Scheme *scheme.Scheme + Storage *storage.Storage } // Loader loads scheme.Spec into symbol.Table. Loader struct { - scheme *scheme.Scheme - table *symbol.Table - remote *storage.Storage - local *storage.Storage - mu sync.RWMutex + namespace string + scheme *scheme.Scheme + table *symbol.Table + storage *storage.Storage + mu sync.RWMutex } ) // New returns a new Loader. -func New(ctx context.Context, config Config) (*Loader, error) { +func New(config Config) *Loader { + namespace := config.Namespace table := config.Table scheme := config.Scheme - remote := config.Storage - - local, err := storage.New(ctx, storage.Config{ - Scheme: scheme, - Database: memdb.New(""), - }) - if err != nil { - return nil, err - } + storage := config.Storage return &Loader{ - scheme: scheme, - table: table, - remote: remote, - local: local, - }, nil + namespace: namespace, + scheme: scheme, + table: table, + storage: storage, + } } // LoadOne loads a single scheme.Spec from the storage.Storage -func (ld *Loader) LoadOne(ctx context.Context, filter *storage.Filter) (node.Node, error) { +func (ld *Loader) LoadOne(ctx context.Context, id ulid.ULID) (node.Node, error) { ld.mu.Lock() defer ld.mu.Unlock() - return ld.loadOne(ctx, filter) -} - -// LoadMany loads multiple scheme.Spec from the storage.Storage -func (ld *Loader) LoadMany(ctx context.Context, filter *storage.Filter) ([]node.Node, error) { - ld.mu.Lock() - defer ld.mu.Unlock() + namespace := ld.namespace - return ld.loadMany(ctx, filter) -} + queue := []any{id} + for len(queue) > 0 { + prev := queue + queue = nil -// UnloadOne unloads a single scheme.Spec from the storage.Storage -func (ld *Loader) UnloadOne(ctx context.Context, filter *storage.Filter) (bool, error) { - ld.mu.Lock() - defer ld.mu.Unlock() + exists := map[any]bool{} - return ld.unloadOne(ctx, filter) -} - -// UnloadMany unloads multiple scheme.Spec from the storage.Storage -func (ld *Loader) UnloadMany(ctx context.Context, filter *storage.Filter) (int, error) { - ld.mu.Lock() - defer ld.mu.Unlock() - - return ld.unloadMany(ctx, filter) -} - -func (ld *Loader) loadOne(ctx context.Context, filter *storage.Filter) (node.Node, error) { - remote, err := ld.remote.FindOne(ctx, filter) - if err != nil { - return nil, err - } - local, err := ld.local.FindOne(ctx, filter) - if err != nil { - return nil, err - } - - if remote != nil { - if reflect.DeepEqual(remote, local) { - if n, ok := ld.table.Lookup(remote.GetID()); ok { - return n, nil + var filter *storage.Filter + for _, key := range prev { + if k, ok := key.(ulid.ULID); ok { + exists[k] = false + filter = filter.Or(storage.Where[ulid.ULID](scheme.KeyID).EQ(k)) + } else if k, ok := key.(string); ok { + exists[k] = false + filter = filter.Or(storage.Where[string](scheme.KeyName).EQ(k)) } } - } else { - if local != nil { - _, err := ld.unloadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(local.GetID())) - return nil, err + if namespace != "" { + filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(namespace)) } - return nil, nil - } - - // load child - if n, err := ld.scheme.Decode(remote); err != nil { - return nil, err - } else { - err := ld.table.Insert(n, remote) + specs, err := ld.storage.FindMany(ctx, filter, &database.FindOptions{Limit: lo.ToPtr(len(prev))}) if err != nil { return nil, err } - if local == nil { - if _, err := ld.local.InsertOne(ctx, remote); err != nil { - return nil, err + for _, spec := range specs { + exists[spec.GetID()] = true + if spec.GetName() != "" { + exists[spec.GetName()] = true } - } else { - if _, err := ld.local.UpdateOne(ctx, remote); err != nil { - return nil, err - } - } - return n, nil - } -} + if namespace == "" { + namespace = spec.GetNamespace() + } -func (ld *Loader) loadMany(ctx context.Context, filter *storage.Filter) ([]node.Node, error) { - remotes, err := ld.remote.FindMany(ctx, filter) - if err != nil { - return nil, err - } - locals, err := ld.local.FindMany(ctx, filter) - if err != nil { - return nil, err - } + if sym, ok := ld.table.LookupByID(spec.GetID()); ok { + if reflect.DeepEqual(sym.Spec, spec) { + continue + } + } - idToLocal := map[ulid.ULID]scheme.Spec{} - idToRemote := map[ulid.ULID]scheme.Spec{} - for _, spec := range locals { - idToLocal[spec.GetID()] = spec - } - for _, spec := range remotes { - idToRemote[spec.GetID()] = spec - } + if n, err := ld.scheme.Decode(spec); err != nil { + return nil, err + } else if err := ld.table.Insert(&symbol.Symbol{Node: n, Spec: spec}); err != nil { + return nil, err + } - var removeIds []ulid.ULID - for id := range idToLocal { - if _, ok := idToRemote[id]; !ok { - removeIds = append(removeIds, id) - } - } - if len(removeIds) > 0 { - if _, err := ld.unloadMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(removeIds...)); err != nil { - return nil, err + for _, locations := range spec.GetLinks() { + for _, location := range locations { + if location.ID != (ulid.ULID{}) { + queue = append(queue, location.ID) + } else if location.Name != "" { + queue = append(queue, location.Name) + } + } + } } - } - var nodes []node.Node - for id, remote := range idToRemote { - local := idToLocal[id] - if reflect.DeepEqual(remote, local) { - if n, ok := ld.table.Lookup(id); ok { - nodes = append(nodes, n) + for key, exist := range exists { + if exist { continue } - } - - // load child - if n, err := ld.scheme.Decode(remote); err != nil { - return nil, err - } else { - if err := ld.table.Insert(n, remote); err != nil { - return nil, err - } else { - nodes = append(nodes, n) - } - if local == nil { - if _, err := ld.local.InsertOne(ctx, remote); err != nil { - return nil, err + id, ok := key.(ulid.ULID) + if !ok { + if name, ok := key.(string); ok { + if sym, ok := ld.table.LookupByName(namespace, name); ok { + id = sym.ID() + } } - } else { - if _, err := ld.local.UpdateOne(ctx, remote); err != nil { + } + + if id != (ulid.ULID{}) { + if _, err := ld.table.Free(id); err != nil { return nil, err } } } } - return nodes, nil -} - -func (ld *Loader) unloadOne(ctx context.Context, filter *storage.Filter) (bool, error) { - local, err := ld.local.FindOne(ctx, filter) - if err != nil { - return false, err - } - if local == nil { - return false, nil - } - - if _, err := ld.table.Free(local.GetID()); err != nil { - return false, err + if sym, ok := ld.table.LookupByID(id); !ok { + return nil, nil + } else { + return sym.Node, nil } - return ld.local.DeleteOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(local.GetID())) } -func (ld *Loader) unloadMany(ctx context.Context, filter *storage.Filter) (int, error) { - locals, err := ld.local.FindMany(ctx, filter) +// LoadAll loads all scheme.Spec from the storage.Storage +func (ld *Loader) LoadAll(ctx context.Context) ([]node.Node, error) { + specs, err := ld.storage.FindMany(ctx, nil) if err != nil { - return 0, err + return nil, err } - for _, local := range locals { - if _, err := ld.table.Free(local.GetID()); err != nil { - return 0, err + var nodes []node.Node + for _, spec := range specs { + if sym, ok := ld.table.LookupByID(spec.GetID()); ok { + if reflect.DeepEqual(sym.Spec, spec) { + nodes = append(nodes, sym.Node) + continue + } } - } - var ids []ulid.ULID - for _, local := range locals { - ids = append(ids, local.GetID()) + if n, err := ld.scheme.Decode(spec); err != nil { + return nil, err + } else if err := ld.table.Insert(&symbol.Symbol{Node: n, Spec: spec}); err != nil { + return nil, err + } else { + nodes = append(nodes, n) + } } - return ld.local.DeleteMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...)) + + return nodes, nil } diff --git a/pkg/loader/loader_test.go b/pkg/loader/loader_test.go index c9192b49..22c6dabd 100644 --- a/pkg/loader/loader_test.go +++ b/pkg/loader/loader_test.go @@ -15,285 +15,7 @@ import ( ) func TestLoader_LoadOne(t *testing.T) { - t.Run("linked all", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - ID: spec1.GetID(), - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec1) - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.Equal(t, p2.Links(), 1) - }) - - t.Run("linked all with name", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Name: faker.Word(), - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Name: faker.Word(), - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - Name: spec1.Name, - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec1) - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.Equal(t, p2.Links(), 1) - }) - - t.Run("unlinked any", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - ID: spec1.GetID(), - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - st.InsertOne(context.Background(), spec1) - - r1, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec1.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r1) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.Equal(t, p2.Links(), 1) - }) - - t.Run("relink any", func(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec1 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - } - spec2 := &scheme.SpecMeta{ - ID: ulid.Make(), - Kind: kind, - Namespace: scheme.NamespaceDefault, - Links: map[string][]scheme.PortLocation{ - node.PortIO: { - { - ID: spec1.GetID(), - Port: node.PortIO, - }, - }, - }, - } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec1) - st.InsertOne(context.Background(), spec2) - - r2, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec2.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r2) - - ok, err := ld.UnloadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec1.GetID())) - assert.NoError(t, err) - assert.True(t, ok) - - r1, err := ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec1.GetID())) - assert.NoError(t, err) - assert.NotNil(t, r1) - - n1, ok := tb.Lookup(spec1.GetID()) - assert.True(t, ok) - - n2, ok := tb.Lookup(spec2.GetID()) - assert.True(t, ok) - - p1, _ := n1.Port(node.PortIO) - p2, _ := n2.Port(node.PortIO) - - assert.Equal(t, p1.Links(), 1) - assert.GreaterOrEqual(t, p2.Links(), 1) - }) -} - -func TestLoader_LoadMany(t *testing.T) { s := scheme.New() - st, _ := storage.New(context.Background(), storage.Config{ Scheme: s, Database: memdb.New(faker.Word()), @@ -302,7 +24,7 @@ func TestLoader_LoadMany(t *testing.T) { tb := symbol.NewTable() defer func() { _ = tb.Close() }() - ld, _ := New(context.Background(), Config{ + ld := New(Config{ Scheme: s, Storage: st, Table: tb, @@ -339,20 +61,19 @@ func TestLoader_LoadMany(t *testing.T) { st.InsertOne(context.Background(), spec1) st.InsertOne(context.Background(), spec2) - r, err := ld.LoadMany(context.Background(), nil) + r2, err := ld.LoadOne(context.Background(), spec2.GetID()) assert.NoError(t, err) - assert.Len(t, r, 2) + assert.NotNil(t, r2) - _, ok := tb.Lookup(spec1.GetID()) + _, ok := tb.LookupByID(spec1.GetID()) assert.True(t, ok) - _, ok = tb.Lookup(spec2.GetID()) + _, ok = tb.LookupByID(spec2.GetID()) assert.True(t, ok) } -func TestLoader_UnloadOne(t *testing.T) { +func TestLoader_LoadAll(t *testing.T) { s := scheme.New() - st, _ := storage.New(context.Background(), storage.Config{ Scheme: s, Database: memdb.New(faker.Word()), @@ -361,7 +82,7 @@ func TestLoader_UnloadOne(t *testing.T) { tb := symbol.NewTable() defer func() { _ = tb.Close() }() - ld, _ := New(context.Background(), Config{ + ld := New(Config{ Scheme: s, Storage: st, Table: tb, @@ -369,54 +90,23 @@ func TestLoader_UnloadOne(t *testing.T) { kind := faker.Word() - spec := &scheme.SpecMeta{ + spec1 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, Namespace: scheme.NamespaceDefault, } - - codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { - return node.NewOneToOneNode(node.OneToOneNodeConfig{ID: spec.GetID()}), nil - }) - - s.AddKnownType(kind, &scheme.SpecMeta{}) - s.AddCodec(kind, codec) - - st.InsertOne(context.Background(), spec) - - _, _ = ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) - - ok, err := ld.UnloadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) - assert.NoError(t, err) - assert.True(t, ok) - - _, ok = tb.Lookup(spec.GetID()) - assert.False(t, ok) -} - -func TestLoader_UnloadMany(t *testing.T) { - s := scheme.New() - - st, _ := storage.New(context.Background(), storage.Config{ - Scheme: s, - Database: memdb.New(faker.Word()), - }) - - tb := symbol.NewTable() - defer func() { _ = tb.Close() }() - - ld, _ := New(context.Background(), Config{ - Scheme: s, - Storage: st, - Table: tb, - }) - - kind := faker.Word() - - spec := &scheme.SpecMeta{ + spec2 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, Namespace: scheme.NamespaceDefault, + Links: map[string][]scheme.PortLocation{ + node.PortIO: { + { + ID: spec1.GetID(), + Port: node.PortIO, + }, + }, + }, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { @@ -426,14 +116,16 @@ func TestLoader_UnloadMany(t *testing.T) { s.AddKnownType(kind, &scheme.SpecMeta{}) s.AddCodec(kind, codec) - st.InsertOne(context.Background(), spec) - - _, _ = ld.LoadOne(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) + st.InsertOne(context.Background(), spec1) + st.InsertOne(context.Background(), spec2) - count, err := ld.UnloadMany(context.Background(), storage.Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) + r, err := ld.LoadAll(context.Background()) assert.NoError(t, err) - assert.Equal(t, 1, count) + assert.Len(t, r, 2) - _, ok := tb.Lookup(spec.GetID()) - assert.False(t, ok) + _, ok := tb.LookupByID(spec1.GetID()) + assert.True(t, ok) + + _, ok = tb.LookupByID(spec2.GetID()) + assert.True(t, ok) } diff --git a/pkg/loader/reconciler.go b/pkg/loader/reconciler.go index bc97a1cb..d1f928ac 100644 --- a/pkg/loader/reconciler.go +++ b/pkg/loader/reconciler.go @@ -4,8 +4,6 @@ import ( "context" "sync" - "github.com/oklog/ulid/v2" - "github.com/siyul-park/uniflow/pkg/scheme" "github.com/siyul-park/uniflow/pkg/storage" ) @@ -71,7 +69,7 @@ func (r *Reconciler) Reconcile(ctx context.Context) error { return nil } - if _, err := r.loader.LoadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(event.NodeID)); err != nil { + if _, err := r.loader.LoadOne(ctx, event.NodeID); err != nil { return err } } diff --git a/pkg/loader/reconciler_test.go b/pkg/loader/reconciler_test.go index 14d84a5b..b79255a5 100644 --- a/pkg/loader/reconciler_test.go +++ b/pkg/loader/reconciler_test.go @@ -26,7 +26,7 @@ func TestReconciler_Reconcile(t *testing.T) { tb := symbol.NewTable() defer func() { _ = tb.Close() }() - ld, _ := New(context.Background(), Config{ + ld := New(Config{ Scheme: s, Storage: st, Table: tb, @@ -72,7 +72,7 @@ func TestReconciler_Reconcile(t *testing.T) { assert.Fail(t, "timeout") return default: - if _, ok := tb.Lookup(m.GetID()); ok { + if _, ok := tb.LookupByID(m.GetID()); ok { return } } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 4514faa4..8aa6ec48 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -65,10 +65,11 @@ func New(ctx context.Context, config Config) (*Runtime, error) { UnloadHooks: []symbol.UnloadHook{hk}, }) - ld, err := loader.New(ctx, loader.Config{ - Scheme: sc, - Storage: st, - Table: tb, + ld := loader.New(loader.Config{ + Namespace: ns, + Scheme: sc, + Storage: st, + Table: tb, }) if err != nil { return nil, err @@ -97,20 +98,16 @@ func New(ctx context.Context, config Config) (*Runtime, error) { // Lookup lookup node.Node in symbol.Table, and if it not exist load it from storage.Storage. func (r *Runtime) Lookup(ctx context.Context, id ulid.ULID) (node.Node, error) { - filter := storage.Where[ulid.ULID](scheme.KeyID).EQ(id) - if r.namespace != "" { - filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(r.namespace)) - } - if s, ok := r.table.Lookup(id); !ok { - return r.loader.LoadOne(ctx, filter) + if s, ok := r.table.LookupByID(id); !ok { + return r.loader.LoadOne(ctx, id) } else { return s, nil } } // Free unload node.Node from symbol.Table. -func (r *Runtime) Free(ctx context.Context, id ulid.ULID) (bool, error) { - return r.loader.UnloadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(id)) +func (r *Runtime) Free(_ context.Context, id ulid.ULID) (bool, error) { + return r.table.Free(id) } // Start starts the Runtime. @@ -120,11 +117,7 @@ func (r *Runtime) Start(ctx context.Context) error { if err := r.reconciler.Watch(ctx); err != nil { return err } - var filter *storage.Filter - if r.namespace != "" { - filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(r.namespace)) - } - if _, err := r.loader.LoadMany(ctx, filter); err != nil { + if _, err := r.loader.LoadAll(ctx); err != nil { return err } return r.reconciler.Reconcile(ctx) @@ -135,8 +128,5 @@ func (r *Runtime) Close(ctx context.Context) error { if err := r.reconciler.Close(); err != nil { return err } - if _, err := r.loader.UnloadMany(ctx, nil); err != nil { - return err - } return r.table.Close() } diff --git a/pkg/symbol/symbol.go b/pkg/symbol/symbol.go index 69ab3474..4e418929 100644 --- a/pkg/symbol/symbol.go +++ b/pkg/symbol/symbol.go @@ -17,7 +17,7 @@ type ( var _ node.Node = (*Symbol)(nil) func (s *Symbol) ID() ulid.ULID { - return s.Spec.GetID() + return s.Node.ID() } func (s *Symbol) Kind() string { @@ -38,7 +38,6 @@ func (s *Symbol) Links() map[string][]scheme.PortLocation { func (s *Symbol) Port(name string) (*port.Port, bool) { return s.Node.Port(name) - } func (s *Symbol) Close() error { diff --git a/pkg/symbol/table.go b/pkg/symbol/table.go index f7a13fec..a9514c59 100644 --- a/pkg/symbol/table.go +++ b/pkg/symbol/table.go @@ -118,7 +118,7 @@ func (t *Table) Insert(sym *Symbol) error { } for name, locations := range additions { - p1, ok := prev.Port(name) + p1, ok := sym.Port(name) if !ok { unlinks[name] = locations continue @@ -144,7 +144,7 @@ func (t *Table) Insert(sym *Symbol) error { continue } - if p2, ok := ref.Node.Port(location.Port); ok { + if p2, ok := ref.Port(location.Port); ok { p1.Link(p2) linked := t.linked[ref.ID()] @@ -174,13 +174,13 @@ func (t *Table) Insert(sym *Symbol) error { } for name, locations := range t.linked[sym.ID()] { - p1, ok := prev.Port(name) + p1, ok := sym.Port(name) if !ok { continue } for _, location := range locations { ref := t.symbols[location.ID] - if p2, ok := ref.Node.Port(location.Port); ok { + if p2, ok := ref.Port(location.Port); ok { p1.Link(p2) } } @@ -194,14 +194,14 @@ func (t *Table) Insert(sym *Symbol) error { } for name, locations := range unlinks { - p1, ok := ref.Node.Port(name) + p1, ok := ref.Port(name) if !ok { continue } for i, location := range locations { if (location.ID == sym.ID()) || (location.Name != "" && location.Name == sym.Name()) { - if p2, ok := prev.Port(location.Port); ok { + if p2, ok := sym.Port(location.Port); ok { p1.Link(p2) linked := t.linked[sym.ID()] @@ -280,8 +280,8 @@ func (t *Table) Free(id ulid.ULID) (bool, error) { return false, nil } -// Lookup returns a Symbol. -func (t *Table) Lookup(id ulid.ULID) (*Symbol, bool) { +// LookupByID returns a Symbol. +func (t *Table) LookupByID(id ulid.ULID) (*Symbol, bool) { t.mu.RLock() defer t.mu.RUnlock() @@ -289,6 +289,20 @@ func (t *Table) Lookup(id ulid.ULID) (*Symbol, bool) { return sym, ok } +// LookupByID returns a Symbol. +func (t *Table) LookupByName(namespace, name string) (*Symbol, bool) { + t.mu.RLock() + defer t.mu.RUnlock() + + if namespace, ok := t.index[namespace]; ok { + if id, ok := namespace[name]; ok { + sym, ok := t.symbols[id] + return sym, ok + } + } + return nil, false +} + // Close closes the SymbolTable. func (t *Table) Close() error { t.mu.Lock() diff --git a/pkg/symbol/table_test.go b/pkg/symbol/table_test.go index 9eb79821..5da48a8d 100644 --- a/pkg/symbol/table_test.go +++ b/pkg/symbol/table_test.go @@ -250,7 +250,7 @@ func TestTable_Free(t *testing.T) { assert.Equal(t, 0, p3.Links()) } -func TestTable_Lookup(t *testing.T) { +func TestTable_LookupByID(t *testing.T) { tb := NewTable() defer tb.Close() @@ -263,7 +263,7 @@ func TestTable_Lookup(t *testing.T) { _ = tb.Insert(sym) - r, ok := tb.Lookup(n.ID()) + r, ok := tb.LookupByID(n.ID()) assert.True(t, ok) assert.Equal(t, sym, r) }