From 42d182a24d3dc02f8cab3e8936bfe8276a367412 Mon Sep 17 00:00:00 2001 From: plastikfan Date: Sun, 9 Jun 2024 14:22:37 +0100 Subject: [PATCH] feat(boost): add boost wait group interface (#295) --- boost/base-pool.go | 3 +- boost/boost-public-api.go | 12 +++-- boost/cancellation-monitor.go | 5 +- .../mf-all-output-consumed-by-range/main.go | 4 +- .../examples/mf-err-missing-consumer/main.go | 2 +- boost/examples/mf-err-timeout-on-send/main.go | 4 +- .../mf-input-injected-via-chan/main.go | 4 +- boost/generic-pool.go | 3 +- boost/wait-group.go | 49 +++++++++++++++++++ boost/wait-group_test.go | 45 +++++++++++++++++ boost/worker-pool-func-manifold.go | 7 ++- boost/worker-pool-func-manifold_test.go | 8 +-- boost/worker-pool-func.go | 3 +- boost/worker-pool-task.go | 3 +- 14 files changed, 123 insertions(+), 29 deletions(-) create mode 100644 boost/wait-group.go create mode 100644 boost/wait-group_test.go diff --git a/boost/base-pool.go b/boost/base-pool.go index b064815..086b3e2 100644 --- a/boost/base-pool.go +++ b/boost/base-pool.go @@ -1,13 +1,12 @@ package boost import ( - "sync" "sync/atomic" ) type ( basePool[I, O any] struct { - wg *sync.WaitGroup + wg WaitGroup sequence int32 inputDupCh *Duplex[I] oi *outputInfo[O] diff --git a/boost/boost-public-api.go b/boost/boost-public-api.go index 3abd373..2a18c24 100644 --- a/boost/boost-public-api.go +++ b/boost/boost-public-api.go @@ -54,10 +54,16 @@ type ( PoolResultStreamR = <-chan *PoolResult PoolResultStreamW = chan<- *PoolResult - // Next is a sequential unique id generator func type - Next func() string - + // OnCancel is the callback required by StartCancellationMonitor OnCancel func() + + // WaitGroup allows the core sync.WaitGroup to be decorated by the client + // for debugging purposes. + WaitGroup interface { + Add(delta int) + Done() + Wait() + } ) type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error) diff --git a/boost/cancellation-monitor.go b/boost/cancellation-monitor.go index c451e1c..a01c760 100644 --- a/boost/cancellation-monitor.go +++ b/boost/cancellation-monitor.go @@ -2,20 +2,19 @@ package boost import ( "context" - "sync" ) // StartCancellationMonitor func StartCancellationMonitor(ctx context.Context, cancel context.CancelFunc, - wg *sync.WaitGroup, + wg WaitGroup, cancelCh CancelStreamR, on OnCancel, ) { wg.Add(1) go func(ctx context.Context, cancel context.CancelFunc, - wg *sync.WaitGroup, + wg WaitGroup, cancelCh CancelStreamR, ) { defer wg.Done() diff --git a/boost/examples/mf-all-output-consumed-by-range/main.go b/boost/examples/mf-all-output-consumed-by-range/main.go index de29159..0f1968d 100644 --- a/boost/examples/mf-all-output-consumed-by-range/main.go +++ b/boost/examples/mf-all-output-consumed-by-range/main.go @@ -62,7 +62,7 @@ const ( func produce(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() @@ -97,7 +97,7 @@ func produce(ctx context.Context, func consume(_ context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() diff --git a/boost/examples/mf-err-missing-consumer/main.go b/boost/examples/mf-err-missing-consumer/main.go index 9d2efac..ff633ef 100644 --- a/boost/examples/mf-err-missing-consumer/main.go +++ b/boost/examples/mf-err-missing-consumer/main.go @@ -60,7 +60,7 @@ const ( func produce(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() diff --git a/boost/examples/mf-err-timeout-on-send/main.go b/boost/examples/mf-err-timeout-on-send/main.go index 65a91fd..464d872 100644 --- a/boost/examples/mf-err-timeout-on-send/main.go +++ b/boost/examples/mf-err-timeout-on-send/main.go @@ -64,7 +64,7 @@ const ( func produce(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() @@ -80,7 +80,7 @@ func produce(ctx context.Context, func consume(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() diff --git a/boost/examples/mf-input-injected-via-chan/main.go b/boost/examples/mf-input-injected-via-chan/main.go index 6b33f4a..e270bef 100644 --- a/boost/examples/mf-input-injected-via-chan/main.go +++ b/boost/examples/mf-input-injected-via-chan/main.go @@ -64,7 +64,7 @@ const ( func inject(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() @@ -82,7 +82,7 @@ func inject(ctx context.Context, func consume(_ context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() diff --git a/boost/generic-pool.go b/boost/generic-pool.go index 63305da..1c11bfe 100644 --- a/boost/generic-pool.go +++ b/boost/generic-pool.go @@ -3,7 +3,6 @@ package boost import ( "context" "errors" - "sync" "time" "github.com/snivilised/lorax/internal/ants" @@ -61,7 +60,7 @@ func (p *taskPool) Waiting() int { } func source[I any](ctx context.Context, - wg *sync.WaitGroup, o *ants.Options, + wg WaitGroup, o *ants.Options, injectable injectable[I], closable closable, ) *Duplex[I] { diff --git a/boost/wait-group.go b/boost/wait-group.go new file mode 100644 index 0000000..dbbf7d1 --- /dev/null +++ b/boost/wait-group.go @@ -0,0 +1,49 @@ +package boost + +import ( + "sync" + "sync/atomic" +) + +// Tracker +type Tracker func(count int32) + +// TrackWaitGroup returns a trackable wait group for the native sync +// wait group specified; useful for debugging purposes. +func TrackWaitGroup(wg *sync.WaitGroup, add, done Tracker) WaitGroup { + return &TrackableWaitGroup{ + wg: wg, + add: add, + done: done, + } +} + +// TrackableWaitGroup +type TrackableWaitGroup struct { + wg *sync.WaitGroup + counter int32 + add, done Tracker +} + +// Add +func (t *TrackableWaitGroup) Add(delta int) { + n := atomic.AddInt32(&t.counter, int32(delta)) + t.wg.Add(delta) + t.add(n) +} + +// Done +func (t *TrackableWaitGroup) Done() { + n := atomic.AddInt32(&t.counter, int32(-1)) + t.wg.Done() + t.done(n) +} + +// Wait +func (t *TrackableWaitGroup) Wait() { + t.wg.Wait() +} + +func (t *TrackableWaitGroup) Count() int32 { + return atomic.LoadInt32(&t.counter) +} diff --git a/boost/wait-group_test.go b/boost/wait-group_test.go new file mode 100644 index 0000000..5e53d23 --- /dev/null +++ b/boost/wait-group_test.go @@ -0,0 +1,45 @@ +package boost_test + +import ( + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok + + "github.com/snivilised/lorax/boost" +) + +var _ = Describe("boost.WaitGroup", func() { + Context("given: a sync.WaitGroup", func() { + It("should: track invocations", func() { + var wg sync.WaitGroup + tracker := boost.TrackWaitGroup(&wg, + func(count int32) { + GinkgoWriter.Printf("---> 🔴 Add (%v)\n", count) + }, + func(count int32) { + GinkgoWriter.Printf("---> 🟢 Done (%v)\n", count) + }, + ) + + for range 10 { + tracker.Add(1) + go func(tracker boost.WaitGroup) { + defer tracker.Done() + + const delay = time.Millisecond * 100 + time.Sleep(delay) + }(tracker) + } + + tracker.Wait() + + if wg, ok := tracker.(*boost.TrackableWaitGroup); ok { + Expect(wg.Count()).To(BeEquivalentTo(0)) + } else { + Fail("tracker should be *boost.TrackableWaitGroup") + } + }) + }) +}) diff --git a/boost/worker-pool-func-manifold.go b/boost/worker-pool-func-manifold.go index b5bc5bb..1513646 100644 --- a/boost/worker-pool-func-manifold.go +++ b/boost/worker-pool-func-manifold.go @@ -2,7 +2,6 @@ package boost import ( "context" - "sync" "time" "github.com/snivilised/lorax/internal/ants" @@ -28,7 +27,7 @@ type ManifoldFuncPool[I, O any] struct { // NewManifoldFuncPool creates a new manifold function based worker pool. func NewManifoldFuncPool[I, O any](ctx context.Context, mf ManifoldFunc[I, O], - wg *sync.WaitGroup, + wg WaitGroup, options ...Option, ) (*ManifoldFuncPool[I, O], error) { var ( @@ -74,7 +73,7 @@ func (p *ManifoldFuncPool[I, O]) Post(ctx context.Context, input I) error { // mutually exclusive; that is to say, if Source is called, then Post // must not be called; any such invocations will be ignored. func (p *ManifoldFuncPool[I, O]) Source(ctx context.Context, - wg *sync.WaitGroup, + wg WaitGroup, ) SourceStreamW[I] { o := p.pool.GetOptions() @@ -107,7 +106,7 @@ func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context) { p.wg.Add(1) go func(ctx context.Context, pool *ManifoldFuncPool[I, O], - wg *sync.WaitGroup, + wg WaitGroup, interval time.Duration, ) { defer wg.Done() diff --git a/boost/worker-pool-func-manifold_test.go b/boost/worker-pool-func-manifold_test.go index 404f3f7..5b6b98d 100644 --- a/boost/worker-pool-func-manifold_test.go +++ b/boost/worker-pool-func-manifold_test.go @@ -12,7 +12,7 @@ import ( func produce(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() @@ -25,7 +25,7 @@ func produce(ctx context.Context, func inject(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() @@ -39,7 +39,7 @@ func inject(ctx context.Context, func consume(_ context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() @@ -165,7 +165,7 @@ var _ = Describe("WorkerPoolFuncManifold", func() { wg.Add(1) go func(ctx context.Context, pool *boost.ManifoldFuncPool[int, int], - wg *sync.WaitGroup, + wg boost.WaitGroup, ) { defer wg.Done() diff --git a/boost/worker-pool-func.go b/boost/worker-pool-func.go index ebdbbcd..601a44c 100644 --- a/boost/worker-pool-func.go +++ b/boost/worker-pool-func.go @@ -2,7 +2,6 @@ package boost import ( "context" - "sync" "github.com/snivilised/lorax/internal/ants" ) @@ -17,7 +16,7 @@ type FuncPool[I, O any] struct { // new jobs are submitted with Submit(task TaskFunc) func NewFuncPool[I, O any](ctx context.Context, pf ants.PoolFunc, - wg *sync.WaitGroup, + wg WaitGroup, options ...Option, ) (*FuncPool[I, O], error) { // TODO: the automatic invocation of Add/Done might not diff --git a/boost/worker-pool-task.go b/boost/worker-pool-task.go index 6437c5e..74af8b8 100644 --- a/boost/worker-pool-task.go +++ b/boost/worker-pool-task.go @@ -24,7 +24,6 @@ package boost // import ( "context" - "sync" "github.com/snivilised/lorax/internal/ants" ) @@ -38,7 +37,7 @@ type TaskPool[I, O any] struct { // NewTaskPool creates a new worker pool using the native ants interface; ie // new jobs are submitted with Submit(task TaskFunc) func NewTaskPool[I, O any](ctx context.Context, - wg *sync.WaitGroup, + wg WaitGroup, options ...Option, ) (*TaskPool[I, O], error) { pool, err := ants.NewPool(ctx, withDefaults(options...)...)