Skip to content

Commit

Permalink
refactor: remove unload in loader and migrate
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 26, 2023
1 parent 06cb2d6 commit f429093
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 532 deletions.
260 changes: 99 additions & 161 deletions pkg/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"sync"

"github.com/oklog/ulid/v2"
"github.com/siyul-park/uniflow/pkg/database/memdb"
"github.com/samber/lo"
"github.com/siyul-park/uniflow/pkg/database"
"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/storage"
Expand All @@ -16,219 +17,156 @@ import (
type (
// Config is a config for for the Loader.
Config struct {
Table *symbol.Table
Scheme *scheme.Scheme
Storage *storage.Storage
Namespace string
Table *symbol.Table
Scheme *scheme.Scheme
Storage *storage.Storage
}

// Loader loads scheme.Spec into symbol.Table.
Loader struct {
scheme *scheme.Scheme
table *symbol.Table
remote *storage.Storage
local *storage.Storage
mu sync.RWMutex
namespace string
scheme *scheme.Scheme
table *symbol.Table
storage *storage.Storage
mu sync.RWMutex
}
)

// New returns a new Loader.
func New(ctx context.Context, config Config) (*Loader, error) {
func New(config Config) *Loader {
namespace := config.Namespace
table := config.Table
scheme := config.Scheme
remote := config.Storage

local, err := storage.New(ctx, storage.Config{
Scheme: scheme,
Database: memdb.New(""),
})
if err != nil {
return nil, err
}
storage := config.Storage

return &Loader{
scheme: scheme,
table: table,
remote: remote,
local: local,
}, nil
namespace: namespace,
scheme: scheme,
table: table,
storage: storage,
}
}

// LoadOne loads a single scheme.Spec from the storage.Storage
func (ld *Loader) LoadOne(ctx context.Context, filter *storage.Filter) (node.Node, error) {
func (ld *Loader) LoadOne(ctx context.Context, id ulid.ULID) (node.Node, error) {
ld.mu.Lock()
defer ld.mu.Unlock()

return ld.loadOne(ctx, filter)
}

// LoadMany loads multiple scheme.Spec from the storage.Storage
func (ld *Loader) LoadMany(ctx context.Context, filter *storage.Filter) ([]node.Node, error) {
ld.mu.Lock()
defer ld.mu.Unlock()
namespace := ld.namespace

return ld.loadMany(ctx, filter)
}
queue := []any{id}
for len(queue) > 0 {
prev := queue
queue = nil

// UnloadOne unloads a single scheme.Spec from the storage.Storage
func (ld *Loader) UnloadOne(ctx context.Context, filter *storage.Filter) (bool, error) {
ld.mu.Lock()
defer ld.mu.Unlock()
exists := map[any]bool{}

return ld.unloadOne(ctx, filter)
}

// UnloadMany unloads multiple scheme.Spec from the storage.Storage
func (ld *Loader) UnloadMany(ctx context.Context, filter *storage.Filter) (int, error) {
ld.mu.Lock()
defer ld.mu.Unlock()

return ld.unloadMany(ctx, filter)
}

func (ld *Loader) loadOne(ctx context.Context, filter *storage.Filter) (node.Node, error) {
remote, err := ld.remote.FindOne(ctx, filter)
if err != nil {
return nil, err
}
local, err := ld.local.FindOne(ctx, filter)
if err != nil {
return nil, err
}

if remote != nil {
if reflect.DeepEqual(remote, local) {
if n, ok := ld.table.Lookup(remote.GetID()); ok {
return n, nil
var filter *storage.Filter
for _, key := range prev {
if k, ok := key.(ulid.ULID); ok {
exists[k] = false
filter = filter.Or(storage.Where[ulid.ULID](scheme.KeyID).EQ(k))
} else if k, ok := key.(string); ok {
exists[k] = false
filter = filter.Or(storage.Where[string](scheme.KeyName).EQ(k))
}
}
} else {
if local != nil {
_, err := ld.unloadOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(local.GetID()))
return nil, err
if namespace != "" {
filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(namespace))
}
return nil, nil
}

// load child

if n, err := ld.scheme.Decode(remote); err != nil {
return nil, err
} else {
err := ld.table.Insert(n, remote)
specs, err := ld.storage.FindMany(ctx, filter, &database.FindOptions{Limit: lo.ToPtr(len(prev))})
if err != nil {
return nil, err
}

if local == nil {
if _, err := ld.local.InsertOne(ctx, remote); err != nil {
return nil, err
for _, spec := range specs {
exists[spec.GetID()] = true
if spec.GetName() != "" {
exists[spec.GetName()] = true
}
} else {
if _, err := ld.local.UpdateOne(ctx, remote); err != nil {
return nil, err
}
}

return n, nil
}
}
if namespace == "" {
namespace = spec.GetNamespace()
}

func (ld *Loader) loadMany(ctx context.Context, filter *storage.Filter) ([]node.Node, error) {
remotes, err := ld.remote.FindMany(ctx, filter)
if err != nil {
return nil, err
}
locals, err := ld.local.FindMany(ctx, filter)
if err != nil {
return nil, err
}
if sym, ok := ld.table.LookupByID(spec.GetID()); ok {
if reflect.DeepEqual(sym.Spec, spec) {
continue
}
}

idToLocal := map[ulid.ULID]scheme.Spec{}
idToRemote := map[ulid.ULID]scheme.Spec{}
for _, spec := range locals {
idToLocal[spec.GetID()] = spec
}
for _, spec := range remotes {
idToRemote[spec.GetID()] = spec
}
if n, err := ld.scheme.Decode(spec); err != nil {
return nil, err
} else if err := ld.table.Insert(&symbol.Symbol{Node: n, Spec: spec}); err != nil {
return nil, err
}

var removeIds []ulid.ULID
for id := range idToLocal {
if _, ok := idToRemote[id]; !ok {
removeIds = append(removeIds, id)
}
}
if len(removeIds) > 0 {
if _, err := ld.unloadMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(removeIds...)); err != nil {
return nil, err
for _, locations := range spec.GetLinks() {
for _, location := range locations {
if location.ID != (ulid.ULID{}) {
queue = append(queue, location.ID)
} else if location.Name != "" {
queue = append(queue, location.Name)
}
}
}
}
}

var nodes []node.Node
for id, remote := range idToRemote {
local := idToLocal[id]
if reflect.DeepEqual(remote, local) {
if n, ok := ld.table.Lookup(id); ok {
nodes = append(nodes, n)
for key, exist := range exists {
if exist {
continue
}
}

// load child

if n, err := ld.scheme.Decode(remote); err != nil {
return nil, err
} else {
if err := ld.table.Insert(n, remote); err != nil {
return nil, err
} else {
nodes = append(nodes, n)
}
if local == nil {
if _, err := ld.local.InsertOne(ctx, remote); err != nil {
return nil, err
id, ok := key.(ulid.ULID)
if !ok {
if name, ok := key.(string); ok {
if sym, ok := ld.table.LookupByName(namespace, name); ok {
id = sym.ID()
}
}
} else {
if _, err := ld.local.UpdateOne(ctx, remote); err != nil {
}

if id != (ulid.ULID{}) {
if _, err := ld.table.Free(id); err != nil {
return nil, err
}
}
}
}

return nodes, nil
}

func (ld *Loader) unloadOne(ctx context.Context, filter *storage.Filter) (bool, error) {
local, err := ld.local.FindOne(ctx, filter)
if err != nil {
return false, err
}
if local == nil {
return false, nil
}

if _, err := ld.table.Free(local.GetID()); err != nil {
return false, err
if sym, ok := ld.table.LookupByID(id); !ok {
return nil, nil
} else {
return sym.Node, nil
}
return ld.local.DeleteOne(ctx, storage.Where[ulid.ULID](scheme.KeyID).EQ(local.GetID()))
}

func (ld *Loader) unloadMany(ctx context.Context, filter *storage.Filter) (int, error) {
locals, err := ld.local.FindMany(ctx, filter)
// LoadAll loads all scheme.Spec from the storage.Storage
func (ld *Loader) LoadAll(ctx context.Context) ([]node.Node, error) {
specs, err := ld.storage.FindMany(ctx, nil)
if err != nil {
return 0, err
return nil, err
}

for _, local := range locals {
if _, err := ld.table.Free(local.GetID()); err != nil {
return 0, err
var nodes []node.Node
for _, spec := range specs {
if sym, ok := ld.table.LookupByID(spec.GetID()); ok {
if reflect.DeepEqual(sym.Spec, spec) {
nodes = append(nodes, sym.Node)
continue
}
}
}

var ids []ulid.ULID
for _, local := range locals {
ids = append(ids, local.GetID())
if n, err := ld.scheme.Decode(spec); err != nil {
return nil, err
} else if err := ld.table.Insert(&symbol.Symbol{Node: n, Spec: spec}); err != nil {
return nil, err
} else {
nodes = append(nodes, n)
}
}
return ld.local.DeleteMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...))

return nodes, nil
}
Loading

0 comments on commit f429093

Please sign in to comment.