Skip to content

Commit

Permalink
updated Yeet logic (now you need multiple yetties to handle different…
Browse files Browse the repository at this point in the history
… stage errros); Yeet and Snag are working for simple cases not; need REFACTOR
  • Loading branch information
Dima Koss committed Nov 6, 2023
1 parent 59e30e4 commit 690ddd5
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 59 deletions.
19 changes: 13 additions & 6 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@ import (
)

func main() {
p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})
y := pipe.NewYeti()
res := p.Yeti(y).Map(func(i int) int {
p := pipe.Slice([]int{1, 2, 3, 4, -5, 6, 7, 8, 9})
y1, y2 := pipe.NewYeti(), pipe.NewYeti()
res := p.Yeti(y1).Map(func(i int) int {
if i < 0 {
y.Yeet(errors.New("omg the value is negative"))
y1.Yeet(errors.New("omg the value is NEGATIVE"))
}
return 2 * i
return i - 6
}).Snag(func(err error) {
fmt.Println("Snagging an error: " + err.Error())
}).Do()
}).Yeti(y2).Map(func(i int) int {
if i > 0 {
y2.Yeet(errors.New("omg the value is POSITIVE"))
}
return 2 * i
}).Snag(func(err error) {
fmt.Println("another snag for the same error: " + err.Error())
}).Filter(func(i *int) bool { return *i > 0 }).Do()
fmt.Println(res)
}
37 changes: 16 additions & 21 deletions internal/internalpipe/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ func Slice[T any](dt []T) Pipe[T] {
Len: len(dtCp),
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}

Expand All @@ -41,11 +40,10 @@ func Func[T any](fn func(i int) (T, bool)) Pipe[T] {
Len: notSet,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}

Expand All @@ -55,11 +53,10 @@ func FuncP[T any](fn func(i int) (*T, bool)) Pipe[T] {
Len: notSet,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}

Expand All @@ -84,10 +81,10 @@ func Range[T constraints.Integer | constraints.Float](start, finish, step T) Pip
Len: 0,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}
p.prevP = uintptr(unsafe.Pointer(&p))

ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}

Expand All @@ -99,10 +96,10 @@ func Range[T constraints.Integer | constraints.Float](start, finish, step T) Pip
Len: max(int((finish-start)/step), 1),
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}
p.prevP = uintptr(unsafe.Pointer(&p))

ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}

Expand All @@ -115,11 +112,10 @@ func Repeat[T any](x T, n int) Pipe[T] {
Len: 0,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}

Expand All @@ -131,10 +127,9 @@ func Repeat[T any](x T, n int) Pipe[T] {
Len: n,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
ptr := uintptr(unsafe.Pointer(&p))
p.prevP = ptr
return p
}
4 changes: 4 additions & 0 deletions internal/internalpipe/do.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func (p *Pipe[T]) doToLimit() []T {
return []T{}
}

defer p.y.Handle()

res := make([]T, 0, p.ValLim)
for i := 0; len(res) < p.ValLim; i++ {
obj, skipped := p.Fn(i)
Expand All @@ -43,6 +45,8 @@ func (p *Pipe[T]) doToLimit() []T {

// do runs the result evaluation.
func (p *Pipe[T]) do(needResult bool) ([]T, int) {
defer p.y.Handle()

var (
eval []ev[T]
limit = p.limit()
Expand Down
5 changes: 1 addition & 4 deletions internal/internalpipe/map.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package internalpipe

import "unsafe"

// Map applies given function to each element of the underlying slice
// returns the slice where each element is n[i] = f(p[i]).
func (p Pipe[T]) Map(fn func(T) T) Pipe[T] {
Expand All @@ -17,7 +15,6 @@ func (p Pipe[T]) Map(fn func(T) T) Pipe[T] {
ValLim: p.ValLim,
GoroutinesCnt: p.GoroutinesCnt,

prevP: uintptr(unsafe.Pointer(&p)),
y: p.y,
y: p.y,
}
}
14 changes: 10 additions & 4 deletions internal/internalpipe/snag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package internalpipe

// Sang ads error handler to a current Pipe step.
func (p Pipe[T]) Snag(h ErrHandler) Pipe[T] {
// todo: think about NPE here
p.y.SnagPipe(p.prevP, h)
if p.y == nil {
return p
}

p.y.Snag(h)
return p
}

Expand All @@ -17,7 +20,10 @@ type YeetSnag interface {
// Yeti adds Yeti error handler to the pipe.
// If some other handlers were set before, they are handled by the Snag
func (p Pipe[T]) Yeti(y YeetSnag) Pipe[T] {
// todo: save previous y
p.y = y.(yeti)
yet := y.(yeti)
if p.y != nil {
yet.AddYeti(p.y)
}
p.y = yet
return p
}
67 changes: 43 additions & 24 deletions internal/internalpipe/yeet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,76 @@ package internalpipe

import (
"sync"
"unsafe"
)

type ErrHandler func(error)

type Yeti struct {
Errs []error
Handlers []ErrHandler
Pipe2Hdlrs map[uintptr][]ErrHandler
EMx *sync.Mutex
HMx *sync.Mutex
EMx *sync.Mutex
errs []error
HMx *sync.Mutex
handlers []ErrHandler
YMx *sync.Mutex
yetties []yeti
}

func NewYeti() *Yeti {
return &Yeti{
Errs: make([]error, 0),
Handlers: make([]ErrHandler, 0),
Pipe2Hdlrs: make(map[uintptr][]ErrHandler),
EMx: &sync.Mutex{},
HMx: &sync.Mutex{},
errs: make([]error, 0),
handlers: make([]ErrHandler, 0),
yetties: make([]yeti, 0),
EMx: &sync.Mutex{},
HMx: &sync.Mutex{},
YMx: &sync.Mutex{},
}
}

func (y *Yeti) Yeet(err error) {
y.EMx.Lock()
y.Errs = append(y.Errs, err)
y.errs = append(y.errs, err)
y.EMx.Unlock()
}

func (y *Yeti) Snag(handler ErrHandler) {
y.Handlers = append(y.Handlers, handler)
y.HMx.Lock()
y.handlers = append(y.handlers, handler)
y.HMx.Unlock()
}

func (y *Yeti) SnagPipe(p uintptr, h ErrHandler) {
func (y *Yeti) Handle() {
if y == nil {
return
}

y.YMx.Lock()
prevYs := y.yetties
y.YMx.Unlock()
for _, prevYetti := range prevYs {
prevYetti.Handle()
}

y.HMx.Lock()
if hdlrs, ok := y.Pipe2Hdlrs[p]; ok {
y.Pipe2Hdlrs[p] = append(hdlrs, h)
} else {
// FIXME: use constant, remove else
y.Pipe2Hdlrs[p] = append(make([]ErrHandler, 0, 10), h)
y.EMx.Lock()
defer y.HMx.Unlock()
defer y.EMx.Unlock()

for _, err := range y.errs {
for _, handle := range y.handlers {
handle(err)
}
}
y.HMx.Unlock()
}

func (y *Yeti) Handle(p unsafe.Pointer) {
// TODO: impl
func (y *Yeti) AddYeti(yt yeti) {
y.YMx.Lock()
y.yetties = append(y.yetties, yt)
y.YMx.Unlock()
}

type yeti interface {
Yeet(err error)
SnagPipe(p uintptr, h ErrHandler)
Snag(h ErrHandler)
// TODO: Handle should be called after each Pipe function eval
Handle(p unsafe.Pointer)
Handle()
AddYeti(y yeti)
}

0 comments on commit 690ddd5

Please sign in to comment.