diff --git a/internal/internalpipe/mapfilter.go b/internal/internalpipe/mapfilter.go index 29b5499..9908cde 100644 --- a/internal/internalpipe/mapfilter.go +++ b/internal/internalpipe/mapfilter.go @@ -3,12 +3,12 @@ package internalpipe // MapFilter applies given function to each element of the underlying slice, // if the second returning value of fn is false, the element is skipped (may be useful for error handling). // returns the slice where each element is n[i] = f(p[i]) if it is not skipped. -func (p Pipe[T]) MapFilter(fn func(*T) (*T, bool)) Pipe[T] { +func (p Pipe[T]) MapFilter(fn func(T) (T, bool)) Pipe[T] { return Pipe[T]{ Fn: func(i int) (*T, bool) { if obj, skipped := p.Fn(i); !skipped { - res, take := fn(obj) - return res, !take + res, take := fn(*obj) + return &res, !take } return nil, true }, diff --git a/internal/internalpipe/mapfilterer_test.go b/internal/internalpipe/mapfilterer_test.go new file mode 100644 index 0000000..6d7b968 --- /dev/null +++ b/internal/internalpipe/mapfilterer_test.go @@ -0,0 +1,109 @@ +package internalpipe + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_MapFilter(t *testing.T) { + t.Parallel() + + exp := make([]int, 0, 100_000) + for i := 0; i < 100_000; i++ { + if i%2 == 0 { + exp = append(exp, i+1) + } + } + + t.Run("single thread lim set", func(t *testing.T) { + p := Pipe[int]{ + Fn: func(i int) (*int, bool) { + return &i, false + }, + Len: 100_000, + ValLim: -1, + GoroutinesCnt: 1, + } + res := p.MapFilter(func(x int) (int, bool) { return x + 1, x%2 == 0 }). + Do() + + require.Equal(t, len(exp), len(res)) + for i, r := range res { + require.Equal(t, exp[i], r) + } + }) + + t.Run("seven thread lim set", func(t *testing.T) { + p := Pipe[int]{ + Fn: func(i int) (*int, bool) { + return &i, false + }, + Len: 100_000, + ValLim: -1, + GoroutinesCnt: 7, + } + + res := p.MapFilter(func(x int) (int, bool) { return x + 1, x%2 == 0 }). + Parallel(7).Do() + + for i, r := range res { + require.Equal(t, exp[i], r) + } + }) + + t.Run("single thread ValLim set", func(t *testing.T) { + p := Pipe[int]{ + Fn: func(i int) (*int, bool) { + return &i, false + }, + Len: -1, + ValLim: len(exp), + GoroutinesCnt: 1, + } + res := p.MapFilter(func(x int) (int, bool) { return x + 1, x%2 == 0 }). + Do() + + require.Equal(t, len(exp), len(res)) + for i, r := range res { + require.Equal(t, exp[i], r) + } + }) + + t.Run("seven thread ValLim set", func(t *testing.T) { + p := Pipe[int]{ + Fn: func(i int) (*int, bool) { + return &i, false + }, + Len: -1, + ValLim: len(exp), + GoroutinesCnt: 7, + } + res := p.MapFilter(func(x int) (int, bool) { return x + 1, x%2 == 0 }). + Do() + + require.Equal(t, len(exp), len(res)) + for i, r := range res { + require.Equal(t, exp[i], r) + } + }) + + t.Run("seven thread ValLim multiple calls", func(t *testing.T) { + p := Pipe[int]{ + Fn: func(i int) (*int, bool) { + return &i, false + }, + Len: -1, + ValLim: len(exp), + GoroutinesCnt: 7, + } + res := p.MapFilter(func(x int) (int, bool) { return x + 1, x%2 == 0 }). + MapFilter(func(x int) (int, bool) { return x, true }). + Do() + + require.Equal(t, len(exp), len(res)) + for i, r := range res { + require.Equal(t, exp[i], r) + } + }) +} diff --git a/pkg/pipe/interface.go b/pkg/pipe/interface.go index e010dc5..133c9a9 100644 --- a/pkg/pipe/interface.go +++ b/pkg/pipe/interface.go @@ -48,7 +48,7 @@ type paralleller[T, PiperT any] interface { type mapper[T, PiperT any] interface { Map(func(T) T) PiperT - MapFilter(func(*T) (*T, bool)) PiperT + MapFilter(func(T) (T, bool)) PiperT } type filterer[T, PiperT any] interface { diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go index f681c78..cc3a38c 100644 --- a/pkg/pipe/pipe.go +++ b/pkg/pipe/pipe.go @@ -24,7 +24,7 @@ func (p *Pipe[T]) Filter(fn Predicate[T]) Piper[T] { // MapFilter applies given function to each element of the underlying slice, // if the second returning value of fn is false, the element is skipped (may be useful for error handling). // returns the slice where each element is n[i] = f(p[i]) if it is not skipped. -func (p *Pipe[T]) MapFilter(fn func(*T) (*T, bool)) Piper[T] { +func (p *Pipe[T]) MapFilter(fn func(T) (T, bool)) Piper[T] { return &Pipe[T]{p.Pipe.MapFilter(fn)} } diff --git a/pkg/pipe/pipenl.go b/pkg/pipe/pipenl.go index f633093..f8e0be6 100644 --- a/pkg/pipe/pipenl.go +++ b/pkg/pipe/pipenl.go @@ -24,7 +24,7 @@ func (p *PipeNL[T]) Filter(fn Predicate[T]) PiperNoLen[T] { // MapFilter applies given function to each element of the underlying slice, // if the second returning value of fn is false, the element is skipped (may be useful for error handling). // returns the slice where each element is n[i] = f(p[i]) if it is not skipped. -func (p *PipeNL[T]) MapFilter(fn func(*T) (*T, bool)) PiperNoLen[T] { +func (p *PipeNL[T]) MapFilter(fn func(T) (T, bool)) PiperNoLen[T] { return &PipeNL[T]{p.Pipe.MapFilter(fn)} }