Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.0.7 #15

Merged
merged 10 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ The following functions can be used to create a new `Pipe` (this is how I call t
- :frog: `Sort(less func(x, y *T) bool) Pipe`: sorts the elements of the `Pipe` using the provided `less` function as the comparison function.

#### Retrieve a single element or perform a boolean check
- :frog: `Any(fn func(x T) bool) bool`: returns `true` if any element of the `Pipe` satisfies the predicate `fn`, and `false` otherwise. *Available for unknown length.*
- :frog: `Any() T`: returns a random element existing in the pipe. *Available for unknown length.*
- :frog: `First() T`: returns the first element of the `Pipe`, or `nil` if the `Pipe` is empty. *Available for unknown length.*
- :frog: `Count() int`: returns the number of elements in the `Pipe`. It does not allocate memory for the elements, but instead simply returns the number of elements in the `Pipe`.

Expand Down
92 changes: 62 additions & 30 deletions internal/internalpipe/any.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package internalpipe

import "sync"
import (
"sync"
"time"
)

const infiniteLenStep = 1 << 15
const hugeLenStep = 1 << 15

func anySingleThread[T any](lenSet bool, limit int, fn GeneratorFn[T]) *T {
func anySingleThread[T any](limit int, fn GeneratorFn[T]) *T {
var obj *T
var skipped bool
for i := 0; (!lenSet && i >= 0) || (i < limit); i++ {

for i := 0; i < limit; i++ {
if obj, skipped = fn(i); !skipped {
return obj
}
Expand All @@ -17,64 +21,91 @@ func anySingleThread[T any](lenSet bool, limit int, fn GeneratorFn[T]) *T {

// Any returns a pointer to a random element in the pipe or nil if none left.
func (p Pipe[T]) Any() *T {
const mutexUpdateCoef = 18

limit := p.limit()
lenSet := p.lenSet()
if p.GoroutinesCnt == 1 {
return anySingleThread(lenSet, limit, p.Fn)
return anySingleThread(limit, p.Fn)
}

step := infiniteLenStep
lenSet := p.lenSet()
step := hugeLenStep
if lenSet {
step = max(divUp(limit, p.GoroutinesCnt), 1)
}

var (
res = make(chan *T)
// if p.len is not set, we need tickets to control the amount of goroutines
tickets = genTickets(p.GoroutinesCnt)
resSet bool
resCh = make(chan *T, 1)
mx sync.Mutex

done = make(chan struct{})
wg sync.WaitGroup
tickets = genTickets(p.GoroutinesCnt)
wg sync.WaitGroup
)
if !lenSet {
step = infiniteLenStep
}

defer close(resCh)
setObj := func(obj *T) {
select {
case <-done:
return
default:
close(done)
res <- obj
mx.Lock()
if !resSet {
resSet = true
resCh <- obj
}
mx.Unlock()
}

go func() {
// i >= 0 is for an int owerflow case
for i := 0; i >= 0 && (!lenSet || i < limit); i += step {
wg.Add(1)
<-tickets

go func(lf, rg int) {
defer func() {
wg.Done()
tickets <- struct{}{}
wg.Done()
}()

// accounting int owerflow case with max(rg, 0)
// int owerflow case
rg = max(rg, 0)
if lenSet {
rg = min(rg, limit)
}

var avgFnTime time.Duration
var avgUpdResSetTime time.Duration
resSetUpdCnt := int64(0)
beforeLastResSetUpd := 0

getResSet := func() bool {
start := time.Now()
mx.Lock()
rs := resSet
mx.Unlock()
avgUpdResSetTime = time.Duration(
(int64(time.Since(start)) + int64(avgUpdResSetTime)*(resSetUpdCnt)) / (resSetUpdCnt + 1),
)
resSetUpdCnt++
beforeLastResSetUpd = 0
return rs
}
rs := getResSet()
cnt := 0
for j := lf; j < rg; j++ {
select {
case <-done:
return
default:
beforeLastResSetUpd++
if j != lf &&
avgFnTime != 0 &&
int64(beforeLastResSetUpd) > (mutexUpdateCoef*int64(avgUpdResSetTime)/int64(avgFnTime)) {
rs = getResSet()
cnt++
}
if !rs {
start := time.Now()
obj, skipped := p.Fn(j)
if !skipped {
setObj(obj)
return
}
avgFnTime = time.Duration(
(int64(time.Since(start)) + int64(avgFnTime)*int64(j-lf)) / int64(j-lf+1),
)
}
}
}(i, i+step)
Expand All @@ -83,8 +114,9 @@ func (p Pipe[T]) Any() *T {
go func() {
wg.Wait()
setObj(nil)
defer close(tickets)
}()
}()

return <-res
return <-resCh
}
69 changes: 52 additions & 17 deletions internal/internalpipe/any_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/koss-null/funcfrog/internal/primitive/pointer"
)

var (
Expand All @@ -28,48 +26,60 @@ func TestAny(t *testing.T) {
t.Parallel()

t.Run("Single thread no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], a100k[i] <= 90_000.0
return a100k[i], a100k[i] > 90_000.0
})
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, *s)
require.Greater(t, *s, 90_000.0)
})

t.Run("Seven thread no limit", func(t *testing.T) {
t.Run("Single thread limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
}
return a100k[i], a100k[i] <= 90_000.0
}).Parallel(7)
return a100k[i], a100k[i] > 90_000.0
}).Gen(len(a100k))
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, *s)
require.Greater(t, *s, 90_000.0)
})

t.Run("Single thread limit", func(t *testing.T) {
t.Run("Seven thread no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], a100k[i] <= 90_000.0
}).Gen(len(a100k))
if i >= len(a100k) {
return 0., false
}
return a100k[i], true
}).
Filter(func(x *float64) bool { return *x > 90_000. }).
Parallel(7)
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, pointer.From(s))
require.Greater(t, *s, 90_000.0)
})

t.Run("Seven thread limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
}
return a100k[i], a100k[i] <= 90_000.0
return a100k[i], a100k[i] > 90_000.0
}).Gen(len(a100k)).Parallel(7)
s := p.Any()
require.NotNil(t, s)
require.Greater(t, 90_000.0, pointer.From(s))
require.Greater(t, *s, 90_000.0)
})

t.Run("Single thread NF limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], false
}).Gen(len(a100k))
Expand All @@ -78,6 +88,8 @@ func TestAny(t *testing.T) {
})

t.Run("Seven thread NF limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -89,6 +101,8 @@ func TestAny(t *testing.T) {
})

t.Run("Single thread bounded limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
return a100k[i], false
}).Gen(len(a100k))
Expand All @@ -97,6 +111,8 @@ func TestAny(t *testing.T) {
})

t.Run("Seven thread bounded limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -109,6 +125,8 @@ func TestAny(t *testing.T) {
})

t.Run("Single thread bounded no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -121,6 +139,8 @@ func TestAny(t *testing.T) {
})

t.Run("Seven thread bounded no limit", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
Expand All @@ -131,4 +151,19 @@ func TestAny(t *testing.T) {
require.NotNil(t, s)
require.Equal(t, 90_001., *s)
})

t.Run("Ten thread bounded no limit filter", func(t *testing.T) {
t.Parallel()

p := Func(func(i int) (float64, bool) {
if i >= len(a100k) {
return 0., false
}
return a100k[i], a100k[i] > 90_000.0 && a100k[i] < 190_000.0
}).Filter(func(x *float64) bool { return int(*x)%2 == 0 }).Parallel(10)
s := p.Any()
require.NotNil(t, s)
require.Greater(t, *s, 90_000.)
require.Less(t, *s, 190_001.)
})
}
18 changes: 9 additions & 9 deletions internal/internalpipe/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
j := 0
for i := 0; i < 100_000; i += 2 {
Expand All @@ -41,7 +41,7 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
j := 0
for i := 0; i < 100_000; i += 2 {
Expand All @@ -50,7 +50,7 @@ func Test_Filter(t *testing.T) {
}
})
t.Run("single thread even numbers empty res filter", func(t *testing.T) {
pts := pointer.To(7)
pts := pointer.Ref(7)
p := Pipe[int]{
Fn: func(i int) (*int, bool) {
return pts, false
Expand All @@ -61,12 +61,12 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
require.Equal(t, 0, len(res))
})
t.Run("seven thread even numbers empty res filter", func(t *testing.T) {
pts := pointer.To(7)
pts := pointer.Ref(7)
p := Pipe[int]{
Fn: func(i int) (*int, bool) {
return pts, false
Expand All @@ -77,12 +77,12 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
require.Equal(t, 0, len(res))
})
t.Run("seven thread even numbers empty res double filter", func(t *testing.T) {
pts := pointer.To(7)
pts := pointer.Ref(7)
p := Pipe[int]{
Fn: func(i int) (*int, bool) {
return pts, false
Expand All @@ -93,9 +93,9 @@ func Test_Filter(t *testing.T) {
}

res := p.Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Filter(func(x *int) bool {
return pointer.From(x)%2 == 0
return pointer.Deref(x)%2 == 0
}).Do()
require.Equal(t, 0, len(res))
})
Expand Down
Loading
Loading