Skip to content

Commit

Permalink
feat(boost): add boost wait group interface (#295)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 9, 2024
1 parent 2963873 commit 42d182a
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 29 deletions.
3 changes: 1 addition & 2 deletions boost/base-pool.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package boost

import (
"sync"
"sync/atomic"
)

type (
basePool[I, O any] struct {
wg *sync.WaitGroup
wg WaitGroup
sequence int32
inputDupCh *Duplex[I]
oi *outputInfo[O]
Expand Down
12 changes: 9 additions & 3 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,16 @@ type (
PoolResultStreamR = <-chan *PoolResult
PoolResultStreamW = chan<- *PoolResult

// Next is a sequential unique id generator func type
Next func() string

// OnCancel is the callback required by StartCancellationMonitor
OnCancel func()

// WaitGroup allows the core sync.WaitGroup to be decorated by the client
// for debugging purposes.
WaitGroup interface {
Add(delta int)
Done()
Wait()
}
)

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
Expand Down
5 changes: 2 additions & 3 deletions boost/cancellation-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package boost

import (
"context"
"sync"
)

// StartCancellationMonitor
func StartCancellationMonitor(ctx context.Context,
cancel context.CancelFunc,
wg *sync.WaitGroup,
wg WaitGroup,
cancelCh CancelStreamR,
on OnCancel,
) {
wg.Add(1)
go func(ctx context.Context,
cancel context.CancelFunc,
wg *sync.WaitGroup,
wg WaitGroup,
cancelCh CancelStreamR,
) {
defer wg.Done()
Expand Down
4 changes: 2 additions & 2 deletions boost/examples/mf-all-output-consumed-by-range/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down Expand Up @@ -97,7 +97,7 @@ func produce(ctx context.Context,

func consume(_ context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down
2 changes: 1 addition & 1 deletion boost/examples/mf-err-missing-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down
4 changes: 2 additions & 2 deletions boost/examples/mf-err-timeout-on-send/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand All @@ -80,7 +80,7 @@ func produce(ctx context.Context,

func consume(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down
4 changes: 2 additions & 2 deletions boost/examples/mf-input-injected-via-chan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (

func inject(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand All @@ -82,7 +82,7 @@ func inject(ctx context.Context,

func consume(_ context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down
3 changes: 1 addition & 2 deletions boost/generic-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package boost
import (
"context"
"errors"
"sync"
"time"

"github.com/snivilised/lorax/internal/ants"
Expand Down Expand Up @@ -61,7 +60,7 @@ func (p *taskPool) Waiting() int {
}

func source[I any](ctx context.Context,
wg *sync.WaitGroup, o *ants.Options,
wg WaitGroup, o *ants.Options,
injectable injectable[I],
closable closable,
) *Duplex[I] {
Expand Down
49 changes: 49 additions & 0 deletions boost/wait-group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package boost

import (
"sync"
"sync/atomic"
)

// Tracker
type Tracker func(count int32)

// TrackWaitGroup returns a trackable wait group for the native sync
// wait group specified; useful for debugging purposes.
func TrackWaitGroup(wg *sync.WaitGroup, add, done Tracker) WaitGroup {
return &TrackableWaitGroup{
wg: wg,
add: add,
done: done,
}
}

// TrackableWaitGroup
type TrackableWaitGroup struct {
wg *sync.WaitGroup
counter int32
add, done Tracker
}

// Add
func (t *TrackableWaitGroup) Add(delta int) {
n := atomic.AddInt32(&t.counter, int32(delta))
t.wg.Add(delta)
t.add(n)
}

// Done
func (t *TrackableWaitGroup) Done() {
n := atomic.AddInt32(&t.counter, int32(-1))
t.wg.Done()
t.done(n)
}

// Wait
func (t *TrackableWaitGroup) Wait() {
t.wg.Wait()
}

func (t *TrackableWaitGroup) Count() int32 {
return atomic.LoadInt32(&t.counter)
}
45 changes: 45 additions & 0 deletions boost/wait-group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package boost_test

import (
"sync"
"time"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok

"github.com/snivilised/lorax/boost"
)

var _ = Describe("boost.WaitGroup", func() {
Context("given: a sync.WaitGroup", func() {
It("should: track invocations", func() {
var wg sync.WaitGroup
tracker := boost.TrackWaitGroup(&wg,
func(count int32) {
GinkgoWriter.Printf("---> 🔴 Add (%v)\n", count)
},
func(count int32) {
GinkgoWriter.Printf("---> 🟢 Done (%v)\n", count)
},
)

for range 10 {
tracker.Add(1)
go func(tracker boost.WaitGroup) {
defer tracker.Done()

const delay = time.Millisecond * 100
time.Sleep(delay)
}(tracker)
}

tracker.Wait()

if wg, ok := tracker.(*boost.TrackableWaitGroup); ok {
Expect(wg.Count()).To(BeEquivalentTo(0))
} else {
Fail("tracker should be *boost.TrackableWaitGroup")
}
})
})
})
7 changes: 3 additions & 4 deletions boost/worker-pool-func-manifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package boost

import (
"context"
"sync"
"time"

"github.com/snivilised/lorax/internal/ants"
Expand All @@ -28,7 +27,7 @@ type ManifoldFuncPool[I, O any] struct {
// NewManifoldFuncPool creates a new manifold function based worker pool.
func NewManifoldFuncPool[I, O any](ctx context.Context,
mf ManifoldFunc[I, O],
wg *sync.WaitGroup,
wg WaitGroup,
options ...Option,
) (*ManifoldFuncPool[I, O], error) {
var (
Expand Down Expand Up @@ -74,7 +73,7 @@ func (p *ManifoldFuncPool[I, O]) Post(ctx context.Context, input I) error {
// mutually exclusive; that is to say, if Source is called, then Post
// must not be called; any such invocations will be ignored.
func (p *ManifoldFuncPool[I, O]) Source(ctx context.Context,
wg *sync.WaitGroup,
wg WaitGroup,
) SourceStreamW[I] {
o := p.pool.GetOptions()

Expand Down Expand Up @@ -107,7 +106,7 @@ func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context) {
p.wg.Add(1)
go func(ctx context.Context,
pool *ManifoldFuncPool[I, O],
wg *sync.WaitGroup,
wg WaitGroup,
interval time.Duration,
) {
defer wg.Done()
Expand Down
8 changes: 4 additions & 4 deletions boost/worker-pool-func-manifold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand All @@ -25,7 +25,7 @@ func produce(ctx context.Context,

func inject(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand All @@ -39,7 +39,7 @@ func inject(ctx context.Context,

func consume(_ context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down Expand Up @@ -165,7 +165,7 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
wg.Add(1)
go func(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
wg boost.WaitGroup,
) {
defer wg.Done()

Expand Down
3 changes: 1 addition & 2 deletions boost/worker-pool-func.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package boost

import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
)
Expand All @@ -17,7 +16,7 @@ type FuncPool[I, O any] struct {
// new jobs are submitted with Submit(task TaskFunc)
func NewFuncPool[I, O any](ctx context.Context,
pf ants.PoolFunc,
wg *sync.WaitGroup,
wg WaitGroup,
options ...Option,
) (*FuncPool[I, O], error) {
// TODO: the automatic invocation of Add/Done might not
Expand Down
3 changes: 1 addition & 2 deletions boost/worker-pool-task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package boost
//
import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
)
Expand All @@ -38,7 +37,7 @@ type TaskPool[I, O any] struct {
// NewTaskPool creates a new worker pool using the native ants interface; ie
// new jobs are submitted with Submit(task TaskFunc)
func NewTaskPool[I, O any](ctx context.Context,
wg *sync.WaitGroup,
wg WaitGroup,
options ...Option,
) (*TaskPool[I, O], error) {
pool, err := ants.NewPool(ctx, withDefaults(options...)...)
Expand Down

0 comments on commit 42d182a

Please sign in to comment.