Skip to content

Commit

Permalink
Merge pull request #15 from koss-null/1.0.7
Browse files Browse the repository at this point in the history
1.0.7
  • Loading branch information
koss-null authored Jan 27, 2024
2 parents 532c8e3 + 5363a8d commit 61f1527
Show file tree
Hide file tree
Showing 13 changed files with 580 additions and 103 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ The following functions can be used to create a new `Pipe` (this is how I call t
- :frog: `Sort(less func(x, y *T) bool) Pipe`: sorts the elements of the `Pipe` using the provided `less` function as the comparison function.

#### Retrieve a single element or perform a boolean check
- :frog: `Any(fn func(x T) bool) bool`: returns `true` if any element of the `Pipe` satisfies the predicate `fn`, and `false` otherwise. *Available for unknown length.*
- :frog: `Any() T`: returns a random element existing in the pipe. *Available for unknown length.*
- :frog: `First() T`: returns the first element of the `Pipe`, or `nil` if the `Pipe` is empty. *Available for unknown length.*
- :frog: `Count() int`: returns the number of elements in the `Pipe`. It does not allocate memory for the elements, but instead simply returns the number of elements in the `Pipe`.

Expand Down
92 changes: 62 additions & 30 deletions internal/internalpipe/any.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package internalpipe

import "sync"
import (
"sync"
"time"
)

const infiniteLenStep = 1 << 15
const hugeLenStep = 1 << 15

func anySingleThread[T any](lenSet bool, limit int, fn GeneratorFn[T]) *T {
func anySingleThread[T any](limit int, fn GeneratorFn[T]) *T {
var obj *T
var skipped bool
for i := 0; (!lenSet && i >= 0) || (i < limit); i++ {

for i := 0; i < limit; i++ {
if obj, skipped = fn(i); !skipped {
return obj
}
Expand All @@ -17,64 +21,91 @@ func anySingleThread[T any](lenSet bool, limit int, fn GeneratorFn[T]) *T {

// Any returns a pointer to a random element in the pipe or nil if none left.
func (p Pipe[T]) Any() *T {
const mutexUpdateCoef = 18

limit := p.limit()
lenSet := p.lenSet()
if p.GoroutinesCnt == 1 {
return anySingleThread(lenSet, limit, p.Fn)
return anySingleThread(limit, p.Fn)
}

step := infiniteLenStep
lenSet := p.lenSet()
step := hugeLenStep
if lenSet {
step = max(divUp(limit, p.GoroutinesCnt), 1)
}

var (
res = make(chan *T)
// if p.len is not set, we need tickets to control the amount of goroutines
tickets = genTickets(p.GoroutinesCnt)
resSet bool
resCh = make(chan *T, 1)
mx sync.Mutex

done = make(chan struct{})
wg sync.WaitGroup
tickets = genTickets(p.GoroutinesCnt)
wg sync.WaitGroup
)
if !lenSet {
step = infiniteLenStep
}

defer close(resCh)
setObj := func(obj *T) {
select {
case <-done:
return
default:
close(done)
res <- obj
mx.Lock()
if !resSet {
resSet = true
resCh <- obj
}
mx.Unlock()
}

go func() {
// i >= 0 is for an int owerflow case
for i := 0; i >= 0 && (!lenSet || i < limit); i += step {
wg.Add(1)
<-tickets

go func(lf, rg int) {
defer func() {
wg.Done()
tickets <- struct{}{}
wg.Done()
}()

// accounting int owerflow case with max(rg, 0)
// int owerflow case
rg = max(rg, 0)
if lenSet {
rg = min(rg, limit)
}

var avgFnTime time.Duration
var avgUpdResSetTime time.Duration
resSetUpdCnt := int64(0)
beforeLastResSetUpd := 0

getResSet := func() bool {
start := time.Now()
mx.Lock()
rs := resSet
mx.Unlock()
avgUpdResSetTime = time.Duration(
(int64(time.Since(start)) + int64(avgUpdResSetTime)*(resSetUpdCnt)) / (resSetUpdCnt + 1),
)
resSetUpdCnt++
beforeLastResSetUpd = 0
return rs
}
rs := getResSet()
cnt := 0
for j := lf; j < rg; j++ {
select {
case <-done:
return
default:
beforeLastResSetUpd++
if j != lf &&
avgFnTime != 0 &&
int64(beforeLastResSetUpd) > (mutexUpdateCoef*int64(avgUpdResSetTime)/int64(avgFnTime)) {
rs = getResSet()
cnt++
}
if !rs {
start := time.Now()
obj, skipped := p.Fn(j)
if !skipped {
setObj(obj)
return
}
avgFnTime = time.Duration(
(int64(time.Since(start)) + int64(avgFnTime)*int64(j-lf)) / int64(j-lf+1),
)
}
}
}(i, i+step)
Expand All @@ -83,8 +114,9 @@ func (p Pipe[T]) Any() *T {
go func() {
wg.Wait()
setObj(nil)
defer close(tickets)
}()
}()

return <-res
return <-resCh
}
69 changes: 52 additions & 17 deletions internal/internalpipe/any_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/koss-null/funcfrog/internal/primitive/pointer"
)

var (
Expand All @@ -28,48 +26,60 @@ func TestAny(t *testing.T) {
t.Parallel()

t.Run("Single thread no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], a100k[i] <= 90_000.0
return a100k[i], a100k[i] > 90_000.0
})
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, *s)
require.Greater(t, *s, 90_000.0)
})

t.Run("Seven thread no limit", func(t *testing.T) {
t.Run("Single thread limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
}
return a100k[i], a100k[i] <= 90_000.0
}).Parallel(7)
return a100k[i], a100k[i] > 90_000.0
}).Gen(len(a100k))
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, *s)
require.Greater(t, *s, 90_000.0)
})

t.Run("Single thread limit", func(t *testing.T) {
t.Run("Seven thread no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], a100k[i] <= 90_000.0
}).Gen(len(a100k))
if i >= len(a100k) {
return 0., false
}
return a100k[i], true
}).
Filter(func(x *float64) bool { return *x > 90_000. }).
Parallel(7)
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, pointer.From(s))
require.Greater(t, *s, 90_000.0)
})

t.Run("Seven thread limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
}
return a100k[i], a100k[i] <= 90_000.0
return a100k[i], a100k[i] > 90_000.0
}).Gen(len(a100k)).Parallel(7)
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, pointer.From(s))
require.Greater(t, *s, 90_000.0)
})

t.Run("Single thread NF limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], false
}).Gen(len(a100k))
Expand All @@ -78,6 +88,8 @@ func TestAny(t *testing.T) {
})

t.Run("Seven thread NF limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -89,6 +101,8 @@ func TestAny(t *testing.T) {
})

t.Run("Single thread bounded limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], false
}).Gen(len(a100k))
Expand All @@ -97,6 +111,8 @@ func TestAny(t *testing.T) {
})

t.Run("Seven thread bounded limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -109,6 +125,8 @@ func TestAny(t *testing.T) {
})

t.Run("Single thread bounded no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -121,6 +139,8 @@ func TestAny(t *testing.T) {
})

t.Run("Seven thread bounded no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -131,4 +151,19 @@ func TestAny(t *testing.T) {
require.NotNil(t, s)
require.Equal(t, 90_001., *s)
})

t.Run("Ten thread bounded no limit filter", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
}
return a100k[i], a100k[i] > 90_000.0 && a100k[i] < 190_000.0
}).Filter(func(x *float64) bool { return int(*x)%2 == 0 }).Parallel(10)
s := p.Any()
require.NotNil(t, s)
require.Greater(t, *s, 90_000.)
require.Less(t, *s, 190_001.)
})
}
18 changes: 9 additions & 9 deletions internal/internalpipe/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
j := 0
for i := 0; i < 100_000; i += 2 {
Expand All @@ -41,7 +41,7 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
j := 0
for i := 0; i < 100_000; i += 2 {
Expand All @@ -50,7 +50,7 @@ func Test_Filter(t *testing.T) {
}
})
t.Run("single thread even numbers empty res filter", func(t *testing.T) {
pts := pointer.To(7)
pts := pointer.Ref(7)
p := Pipe[int]{
Fn: func(i int) (*int, bool) {
return pts, false
Expand All @@ -61,12 +61,12 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
require.Equal(t, 0, len(res))
})
t.Run("seven thread even numbers empty res filter", func(t *testing.T) {
pts := pointer.To(7)
pts := pointer.Ref(7)
p := Pipe[int]{
Fn: func(i int) (*int, bool) {
return pts, false
Expand All @@ -77,12 +77,12 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
require.Equal(t, 0, len(res))
})
t.Run("seven thread even numbers empty res double filter", func(t *testing.T) {
pts := pointer.To(7)
pts := pointer.Ref(7)
p := Pipe[int]{
Fn: func(i int) (*int, bool) {
return pts, false
Expand All @@ -93,9 +93,9 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
require.Equal(t, 0, len(res))
})
Expand Down
Loading

0 comments on commit 61f1527

Please sign in to comment.