Skip to content

Commit

Permalink
improved Any performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Dima Koss committed Jan 27, 2024
1 parent aebe9fd commit b81fbce
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 55 deletions.
25 changes: 10 additions & 15 deletions internal/internalpipe/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ import "sync"

const hugeLenStep = 1 << 15

func anySingleThread[T any](lenSet bool, limit int, fn GeneratorFn[T]) *T {
var (
obj *T
skipped bool
)
check := func(i int) bool { return i < limit }
if !lenSet {
check = func(i int) bool { return i > -1 && i < limit }
}
for i := 0; check(i); i++ {
func anySingleThread[T any](limit int, fn GeneratorFn[T]) *T {
var obj *T
var skipped bool

for i := 0; i > -1 && (i < limit); i++ {
if obj, skipped = fn(i); !skipped {
return obj
}
Expand All @@ -24,11 +19,11 @@ func anySingleThread[T any](lenSet bool, limit int, fn GeneratorFn[T]) *T {
// Any returns a pointer to a random element in the pipe or nil if none left.
func (p Pipe[T]) Any() *T {
limit := p.limit()
lenSet := p.lenSet()
if p.GoroutinesCnt == 1 {
return anySingleThread(lenSet, limit, p.Fn)
return anySingleThread(limit, p.Fn)
}

lenSet := p.lenSet()
step := hugeLenStep
if lenSet {
step = max(divUp(limit, p.GoroutinesCnt), 1)
Expand Down Expand Up @@ -63,16 +58,16 @@ func (p Pipe[T]) Any() *T {
tickets <- struct{}{}
wg.Done()
}()

// int owerflow case
rg = max(rg, 0)
if lenSet {
rg = min(rg, limit)
}

for j := lf; j < rg; j++ {
mx.Lock()
// mx.Lock()
rs := resSet
mx.Unlock()
// mx.Unlock()
if !rs {
obj, skipped := p.Fn(j)
if !skipped {
Expand Down
89 changes: 49 additions & 40 deletions perf/perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,8 @@ func BenchmarkMap(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {

// Create a Pipe from the input slice
pipe := pipe.Slice(input)

// Apply the map operation to the Pipe
result := pipe.Map(fib).Do()

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -58,14 +52,8 @@ func BenchmarkMapParallel(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {

// Create a Pipe from the input slice
pipe := pipe.Slice(input).Parallel(uint16(runtime.NumCPU()))

// Apply the map operation to the Pipe
result := pipe.Map(fib).Do()

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -79,13 +67,10 @@ func BenchmarkMapFor(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {

result := make([]int, 0, len(input))
for i := range input {
result = append(result, fib(input[i]))
}

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -99,13 +84,8 @@ func BenchmarkFilter(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {
// Create a Pipe from the input slice
pipe := pipe.Slice(input)

// Apply the filter operation to the Pipe
result := pipe.Filter(filterFunc).Do()

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -119,13 +99,8 @@ func BenchmarkFilterParallel(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {
// Create a Pipe from the input slice
pipe := pipe.Slice(input).Parallel(uint16(runtime.NumCPU()))

// Apply the filter operation to the Pipe
result := pipe.Filter(filterFunc).Do()

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -139,16 +114,12 @@ func BenchmarkFilterFor(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {

// Apply the filter operation to the Pipe
result := make([]int, 0)
for i := range input {
if filterFunc(&input[i]) {
result = append(result, input[i])
}
}

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -163,11 +134,7 @@ func BenchmarkReduce(b *testing.B) {

for j := 0; j < b.N; j++ {
pipe := pipe.Slice(input)

// Apply the reduce operation to the Pipe
result := pipe.Reduce(reduceFunc)

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -181,13 +148,8 @@ func BenchmarkSumParallel(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {
// Create a Pipe from the input slice
pipe := pipe.Slice(input).Parallel(uint16(runtime.NumCPU()))

// Apply the reduce operation to the Pipe
result := pipe.Sum(reduceFunc)

// Perform any necessary assertions on the result
_ = result
}
}
Expand All @@ -202,13 +164,60 @@ func BenchmarkReduceFor(b *testing.B) {
b.StartTimer()

for j := 0; j < b.N; j++ {
// Apply the reduce operation to the Pipe
result := 0
for i := range input {
result = reduceFunc(&result, &input[i])
}
_ = result
}
}

func BenchmarkAny(b *testing.B) {
b.StopTimer()
input := make([]int, 1_000_000)
for i := 0; i < len(input); i++ {
input[i] = i
}
b.StartTimer()

for j := 0; j < b.N; j++ {
pipe := pipe.Slice(input).Filter(func(x *int) bool { return *x > 5_000_00 })
result := pipe.Any()
_ = result
}
}

func BenchmarkAnyParallel(b *testing.B) {
b.StopTimer()
input := make([]int, 1_000_000)
for i := 0; i < len(input); i++ {
input[i] = i
}
b.StartTimer()

// Perform any necessary assertions on the result
for j := 0; j < b.N; j++ {
pipe := pipe.Slice(input).
Parallel(uint16(runtime.NumCPU())).
Filter(func(x *int) bool { return *x > 5_000_00 })
result := pipe.Any()
_ = result
}
}

func BenchmarkAnyFor(b *testing.B) {
b.StopTimer()
input := make([]int, 1_000_000)
for i := 0; i < len(input); i++ {
input[i] = i
}
b.StartTimer()

for j := 0; j < b.N; j++ {
for i := 0; i < len(input); i++ {
if input[i] > 5_000_00 {
_ = input[i]
break
}
}
}
}

0 comments on commit b81fbce

Please sign in to comment.