From 9bff565abf84d59a46e8d8b49f486ee84ab20229 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Thu, 27 Jul 2023 10:22:17 -0400 Subject: [PATCH] Implement `TranslateMapParallel()` as generalized concurrent map iterator. Integrate `TranslateMapParallel()` into datamodel for `Paths` to replace specialized async logic. --- datamodel/high/v3/paths.go | 37 +++++----- datamodel/translate.go | 78 +++++++++++++++++++++- datamodel/translate_test.go | 130 ++++++++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+), 22 deletions(-) diff --git a/datamodel/high/v3/paths.go b/datamodel/high/v3/paths.go index 7927049d..c68e1530 100644 --- a/datamodel/high/v3/paths.go +++ b/datamodel/high/v3/paths.go @@ -6,8 +6,10 @@ package v3 import ( "sort" + "github.com/pb33f/libopenapi/datamodel" "github.com/pb33f/libopenapi/datamodel/high" - low "github.com/pb33f/libopenapi/datamodel/low/v3" + "github.com/pb33f/libopenapi/datamodel/low" + v3low "github.com/pb33f/libopenapi/datamodel/low/v3" "github.com/pb33f/libopenapi/utils" "gopkg.in/yaml.v3" ) @@ -21,42 +23,37 @@ import ( type Paths struct { PathItems map[string]*PathItem `json:"-" yaml:"-"` Extensions map[string]any `json:"-" yaml:"-"` - low *low.Paths + low *v3low.Paths } // NewPaths creates a new high-level instance of Paths from a low-level one. -func NewPaths(paths *low.Paths) *Paths { +func NewPaths(paths *v3low.Paths) *Paths { p := new(Paths) p.low = paths p.Extensions = high.ExtractExtensions(paths.Extensions) items := make(map[string]*PathItem) - // build paths async for speed. type pRes struct { - k string - v *PathItem + key string + value *PathItem } - var buildPathItem = func(key string, item *low.PathItem, c chan<- pRes) { - c <- pRes{key, NewPathItem(item)} - } - rChan := make(chan pRes) - for k := range paths.PathItems { - go buildPathItem(k.Value, paths.PathItems[k].Value, rChan) + + translateFunc := func(key low.KeyReference[string], value low.ValueReference[*v3low.PathItem]) (pRes, error) { + return pRes{key: key.Value, value: NewPathItem(value.Value)}, nil } - pathsBuilt := 0 - for pathsBuilt < len(paths.PathItems) { - select { - case r := <-rChan: - pathsBuilt++ - items[r.k] = r.v - } + resultFunc := func(value pRes) error { + items[value.key] = value.value + return nil } + _ = datamodel.TranslateMapParallel[low.KeyReference[string], low.ValueReference[*v3low.PathItem], pRes]( + paths.PathItems, translateFunc, resultFunc, + ) p.PathItems = items return p } // GoLow returns the low-level Paths instance used to create the high-level one. -func (p *Paths) GoLow() *low.Paths { +func (p *Paths) GoLow() *v3low.Paths { return p.low } diff --git a/datamodel/translate.go b/datamodel/translate.go index 258db13d..7b9b4d1b 100644 --- a/datamodel/translate.go +++ b/datamodel/translate.go @@ -2,6 +2,7 @@ package datamodel import ( "context" + "errors" "io" "runtime" "sync" @@ -10,13 +11,13 @@ import ( type ActionFunc[T any] func(T) error type TranslateFunc[IN any, OUT any] func(IN) (OUT, error) type TranslateSliceFunc[IN any, OUT any] func(int, IN) (OUT, error) -type ResultFunc[V any] func(V) error +type TranslateMapFunc[K any, V any, OUT any] func(K, V) (OUT, error) type continueError struct { error } -var Continue = &continueError{} +var Continue = &continueError{error: errors.New("Continue")} // TranslateSliceParallel iterates a slice in parallel and calls translate() // asynchronously. @@ -110,6 +111,79 @@ JOBLOOP: return reterr } +// TranslateMapParallel iterates a map in parallel and calls translate() +// asynchronously. +// translate() or result() may return `io.EOF` to break iteration. +// Results are provided sequentially to result(). Result order is +// nondeterministic. +func TranslateMapParallel[K comparable, V any, OUT any](m map[K]V, translate TranslateMapFunc[K, V, OUT], result ActionFunc[OUT]) error { + if len(m) == 0 { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + concurrency := runtime.NumCPU() + resultChan := make(chan OUT, concurrency) + var reterr error + var mu sync.Mutex + var wg sync.WaitGroup + + // Fan out input translation. + wg.Add(1) + go func() { + defer wg.Done() + for k, v := range m { + if ctx.Err() != nil { + return + } + wg.Add(1) + go func(k K, v V) { + defer wg.Done() + value, err := translate(k, v) + if err == Continue { + return + } + if err != nil { + mu.Lock() + if reterr == nil { + reterr = err + } + mu.Unlock() + cancel() + return + } + select { + case resultChan <- value: + case <-ctx.Done(): + } + }(k, v) + } + }() + + go func() { + // Indicate EOF after all translate goroutines finish. + wg.Wait() + close(resultChan) + }() + + // Iterate results. + for value := range resultChan { + err := result(value) + if err != nil { + cancel() + wg.Wait() + reterr = err + break + } + } + + if reterr == io.EOF { + return nil + } + return reterr +} + // TranslatePipeline processes input sequentially through predicate(), sends to // translate() in parallel, then outputs in stable order. // translate() may return `datamodel.Continue` to continue iteration. diff --git a/datamodel/translate_test.go b/datamodel/translate_test.go index 59927d99..497af39a 100644 --- a/datamodel/translate_test.go +++ b/datamodel/translate_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sort" "strconv" "sync" "sync/atomic" @@ -157,6 +158,135 @@ func TestTranslateSliceParallel(t *testing.T) { } } +func TestTranslateMapParallel(t *testing.T) { + const mapSize = 1000 + + t.Run("Happy path", func(t *testing.T) { + var expectedResults []string + m := make(map[string]int) + for i := 0; i < mapSize; i++ { + m[fmt.Sprintf("key%d", i)] = i + 1000 + expectedResults = append(expectedResults, fmt.Sprintf("foobar %d", i+1000)) + } + + var translateCounter int64 + translateFunc := func(_ string, value int) (string, error) { + result := fmt.Sprintf("foobar %d", value) + atomic.AddInt64(&translateCounter, 1) + return result, nil + } + var results []string + resultFunc := func(value string) error { + results = append(results, value) + return nil + } + err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Equal(t, int64(mapSize), translateCounter) + assert.Equal(t, mapSize, len(results)) + sort.Strings(results) + assert.Equal(t, expectedResults, results) + }) + + t.Run("Error in translate", func(t *testing.T) { + m := make(map[string]int) + for i := 0; i < mapSize; i++ { + m[fmt.Sprintf("key%d", i)] = i + 1000 + } + + var translateCounter int64 + translateFunc := func(_ string, _ int) (string, error) { + atomic.AddInt64(&translateCounter, 1) + return "", errors.New("Foobar") + } + resultFunc := func(_ string) error { + t.Fatal("Expected no call to resultFunc()") + return nil + } + err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.ErrorContains(t, err, "Foobar") + assert.Less(t, translateCounter, int64(mapSize)) + }) + + t.Run("Error in result", func(t *testing.T) { + m := make(map[string]int) + for i := 0; i < mapSize; i++ { + m[fmt.Sprintf("key%d", i)] = i + 1000 + } + + translateFunc := func(_ string, value int) (string, error) { + return "", nil + } + var resultCounter int + resultFunc := func(_ string) error { + resultCounter++ + return errors.New("Foobar") + } + err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.ErrorContains(t, err, "Foobar") + assert.Less(t, resultCounter, mapSize) + }) + + t.Run("EOF in translate", func(t *testing.T) { + m := make(map[string]int) + for i := 0; i < mapSize; i++ { + m[fmt.Sprintf("key%d", i)] = i + 1000 + } + + var translateCounter int64 + translateFunc := func(_ string, _ int) (string, error) { + atomic.AddInt64(&translateCounter, 1) + return "", io.EOF + } + resultFunc := func(_ string) error { + t.Fatal("Expected no call to resultFunc()") + return nil + } + err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Less(t, translateCounter, int64(mapSize)) + }) + + t.Run("EOF in result", func(t *testing.T) { + m := make(map[string]int) + for i := 0; i < mapSize; i++ { + m[fmt.Sprintf("key%d", i)] = i + 1000 + } + + translateFunc := func(_ string, value int) (string, error) { + return "", nil + } + var resultCounter int + resultFunc := func(_ string) error { + resultCounter++ + return io.EOF + } + err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Less(t, resultCounter, mapSize) + }) + + t.Run("Continue in translate", func(t *testing.T) { + m := make(map[string]int) + for i := 0; i < mapSize; i++ { + m[fmt.Sprintf("key%d", i)] = i + 1000 + } + + var translateCounter int64 + translateFunc := func(_ string, _ int) (string, error) { + atomic.AddInt64(&translateCounter, 1) + return "", datamodel.Continue + } + resultFunc := func(_ string) error { + t.Fatal("Expected no call to resultFunc()") + return nil + } + err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Equal(t, int64(mapSize), translateCounter) + }) +} + func TestTranslatePipeline(t *testing.T) { testCases := []struct { ItemCount int