diff --git a/example/main.go b/example/main.go index d855ff6..c81e66d 100644 --- a/example/main.go +++ b/example/main.go @@ -8,15 +8,22 @@ import ( ) func main() { - p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}) - y := pipe.NewYeti() - res := p.Yeti(y).Map(func(i int) int { + p := pipe.Slice([]int{1, 2, 3, 4, -5, 6, 7, 8, 9}) + y1, y2 := pipe.NewYeti(), pipe.NewYeti() + res := p.Yeti(y1).Map(func(i int) int { if i < 0 { - y.Yeet(errors.New("omg the value is negative")) + y1.Yeet(errors.New("omg the value is NEGATIVE")) } - return 2 * i + return i - 6 }).Snag(func(err error) { fmt.Println("Snagging an error: " + err.Error()) - }).Do() + }).Yeti(y2).Map(func(i int) int { + if i > 0 { + y2.Yeet(errors.New("omg the value is POSITIVE")) + } + return 2 * i + }).Snag(func(err error) { + fmt.Println("another snag for the same error: " + err.Error()) + }).Filter(func(i *int) bool { return *i > 0 }).Do() fmt.Println(res) } diff --git a/internal/internalpipe/constructor.go b/internal/internalpipe/constructor.go index ae18a64..4b071da 100644 --- a/internal/internalpipe/constructor.go +++ b/internal/internalpipe/constructor.go @@ -24,11 +24,10 @@ func Slice[T any](dt []T) Pipe[T] { Len: len(dtCp), ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } @@ -41,11 +40,10 @@ func Func[T any](fn func(i int) (T, bool)) Pipe[T] { Len: notSet, ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } @@ -55,11 +53,10 @@ func FuncP[T any](fn func(i int) (*T, bool)) Pipe[T] { Len: notSet, ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } @@ -84,10 +81,10 @@ func Range[T constraints.Integer | constraints.Float](start, finish, step T) Pip Len: 0, ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } @@ -99,10 +96,10 @@ func Range[T constraints.Integer | constraints.Float](start, finish, step T) Pip Len: max(int((finish-start)/step), 1), ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } @@ -115,11 +112,10 @@ func Repeat[T any](x T, n int) Pipe[T] { Len: 0, ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } @@ -131,10 +127,9 @@ func Repeat[T any](x T, n int) Pipe[T] { Len: n, ValLim: notSet, GoroutinesCnt: defaultParallelWrks, - - y: NewYeti(), } - p.prevP = uintptr(unsafe.Pointer(&p)) + ptr := uintptr(unsafe.Pointer(&p)) + p.prevP = ptr return p } diff --git a/internal/internalpipe/do.go b/internal/internalpipe/do.go index 4c48202..eb3a81c 100644 --- a/internal/internalpipe/do.go +++ b/internal/internalpipe/do.go @@ -27,6 +27,8 @@ func (p *Pipe[T]) doToLimit() []T { return []T{} } + defer p.y.Handle() + res := make([]T, 0, p.ValLim) for i := 0; len(res) < p.ValLim; i++ { obj, skipped := p.Fn(i) @@ -43,6 +45,8 @@ func (p *Pipe[T]) doToLimit() []T { // do runs the result evaluation. func (p *Pipe[T]) do(needResult bool) ([]T, int) { + defer p.y.Handle() + var ( eval []ev[T] limit = p.limit() diff --git a/internal/internalpipe/map.go b/internal/internalpipe/map.go index 6fee1db..e3341d1 100644 --- a/internal/internalpipe/map.go +++ b/internal/internalpipe/map.go @@ -1,7 +1,5 @@ package internalpipe -import "unsafe" - // Map applies given function to each element of the underlying slice // returns the slice where each element is n[i] = f(p[i]). func (p Pipe[T]) Map(fn func(T) T) Pipe[T] { @@ -17,7 +15,6 @@ func (p Pipe[T]) Map(fn func(T) T) Pipe[T] { ValLim: p.ValLim, GoroutinesCnt: p.GoroutinesCnt, - prevP: uintptr(unsafe.Pointer(&p)), - y: p.y, + y: p.y, } } diff --git a/internal/internalpipe/snag.go b/internal/internalpipe/snag.go index 3fedf13..f2de786 100644 --- a/internal/internalpipe/snag.go +++ b/internal/internalpipe/snag.go @@ -2,8 +2,11 @@ package internalpipe // Sang ads error handler to a current Pipe step. func (p Pipe[T]) Snag(h ErrHandler) Pipe[T] { - // todo: think about NPE here - p.y.SnagPipe(p.prevP, h) + if p.y == nil { + return p + } + + p.y.Snag(h) return p } @@ -17,7 +20,10 @@ type YeetSnag interface { // Yeti adds Yeti error handler to the pipe. // If some other handlers were set before, they are handled by the Snag func (p Pipe[T]) Yeti(y YeetSnag) Pipe[T] { - // todo: save previous y - p.y = y.(yeti) + yet := y.(yeti) + if p.y != nil { + yet.AddYeti(p.y) + } + p.y = yet return p } diff --git a/internal/internalpipe/yeet.go b/internal/internalpipe/yeet.go index 5866851..e0c53e3 100644 --- a/internal/internalpipe/yeet.go +++ b/internal/internalpipe/yeet.go @@ -2,57 +2,76 @@ package internalpipe import ( "sync" - "unsafe" ) type ErrHandler func(error) type Yeti struct { - Errs []error - Handlers []ErrHandler - Pipe2Hdlrs map[uintptr][]ErrHandler - EMx *sync.Mutex - HMx *sync.Mutex + EMx *sync.Mutex + errs []error + HMx *sync.Mutex + handlers []ErrHandler + YMx *sync.Mutex + yetties []yeti } func NewYeti() *Yeti { return &Yeti{ - Errs: make([]error, 0), - Handlers: make([]ErrHandler, 0), - Pipe2Hdlrs: make(map[uintptr][]ErrHandler), - EMx: &sync.Mutex{}, - HMx: &sync.Mutex{}, + errs: make([]error, 0), + handlers: make([]ErrHandler, 0), + yetties: make([]yeti, 0), + EMx: &sync.Mutex{}, + HMx: &sync.Mutex{}, + YMx: &sync.Mutex{}, } } func (y *Yeti) Yeet(err error) { y.EMx.Lock() - y.Errs = append(y.Errs, err) + y.errs = append(y.errs, err) y.EMx.Unlock() } func (y *Yeti) Snag(handler ErrHandler) { - y.Handlers = append(y.Handlers, handler) + y.HMx.Lock() + y.handlers = append(y.handlers, handler) + y.HMx.Unlock() } -func (y *Yeti) SnagPipe(p uintptr, h ErrHandler) { +func (y *Yeti) Handle() { + if y == nil { + return + } + + y.YMx.Lock() + prevYs := y.yetties + y.YMx.Unlock() + for _, prevYetti := range prevYs { + prevYetti.Handle() + } + y.HMx.Lock() - if hdlrs, ok := y.Pipe2Hdlrs[p]; ok { - y.Pipe2Hdlrs[p] = append(hdlrs, h) - } else { - // FIXME: use constant, remove else - y.Pipe2Hdlrs[p] = append(make([]ErrHandler, 0, 10), h) + y.EMx.Lock() + defer y.HMx.Unlock() + defer y.EMx.Unlock() + + for _, err := range y.errs { + for _, handle := range y.handlers { + handle(err) + } } - y.HMx.Unlock() } -func (y *Yeti) Handle(p unsafe.Pointer) { - // TODO: impl +func (y *Yeti) AddYeti(yt yeti) { + y.YMx.Lock() + y.yetties = append(y.yetties, yt) + y.YMx.Unlock() } type yeti interface { Yeet(err error) - SnagPipe(p uintptr, h ErrHandler) + Snag(h ErrHandler) // TODO: Handle should be called after each Pipe function eval - Handle(p unsafe.Pointer) + Handle() + AddYeti(y yeti) }