Skip to content

Commit

Permalink
add prt and yetty copying into internalpipe struct
Browse files Browse the repository at this point in the history
  • Loading branch information
Dima Kossovich committed Oct 9, 2023
1 parent 8eba1ad commit 73083cd
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 11 deletions.
55 changes: 47 additions & 8 deletions internal/internalpipe/constructor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package internalpipe

import "golang.org/x/exp/constraints"
import (
"unsafe"

"golang.org/x/exp/constraints"
)

const (
defaultParallelWrks = 1
Expand All @@ -10,7 +14,7 @@ const (
func Slice[T any](dt []T) Pipe[T] {
dtCp := make([]T, len(dt))
copy(dtCp, dt)
return Pipe[T]{
p := Pipe[T]{
Fn: func(i int) (*T, bool) {
if i >= len(dtCp) {
return nil, true
Expand All @@ -20,28 +24,43 @@ func Slice[T any](dt []T) Pipe[T] {
Len: len(dtCp),
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
return p
}

func Func[T any](fn func(i int) (T, bool)) Pipe[T] {
return Pipe[T]{
p := Pipe[T]{
Fn: func(i int) (*T, bool) {
obj, exist := fn(i)
return &obj, !exist
},
Len: notSet,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
return p
}

func FuncP[T any](fn func(i int) (*T, bool)) Pipe[T] {
return Pipe[T]{
p := Pipe[T]{
Fn: fn,
Len: notSet,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
return p
}

func Cycle[T any](a []T) Pipe[T] {
Expand All @@ -58,44 +77,64 @@ func Cycle[T any](a []T) Pipe[T] {

func Range[T constraints.Integer | constraints.Float](start, finish, step T) Pipe[T] {
if start >= finish {
return Pipe[T]{
p := Pipe[T]{
Fn: func(int) (*T, bool) {
return nil, true
},
Len: 0,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}
p.prevP = uintptr(unsafe.Pointer(&p))
return p
}
return Pipe[T]{

p := Pipe[T]{
Fn: func(i int) (*T, bool) {
val := start + T(i)*step
return &val, val >= finish
},
Len: max(int((finish-start)/step), 1),
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}
p.prevP = uintptr(unsafe.Pointer(&p))
return p
}

func Repeat[T any](x T, n int) Pipe[T] {
if n <= 0 {
return Pipe[T]{
p := Pipe[T]{
Fn: func(int) (*T, bool) {
return nil, true
},
Len: 0,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
return p
}
return Pipe[T]{

p := Pipe[T]{
Fn: func(i int) (*T, bool) {
cp := x
return &cp, i >= n
},
Len: n,
ValLim: notSet,
GoroutinesCnt: defaultParallelWrks,

y: NewYeti(),
}

p.prevP = uintptr(unsafe.Pointer(&p))
return p
}
5 changes: 5 additions & 0 deletions internal/internalpipe/erase.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package internalpipe

import "unsafe"

func (p Pipe[T]) Erase() Pipe[any] {
return Pipe[any]{
Fn: func(i int) (*any, bool) {
Expand All @@ -12,5 +14,8 @@ func (p Pipe[T]) Erase() Pipe[any] {
Len: p.Len,
ValLim: p.ValLim,
GoroutinesCnt: p.GoroutinesCnt,

prevP: uintptr(unsafe.Pointer(&p)),
y: p.y,
}
}
5 changes: 5 additions & 0 deletions internal/internalpipe/filter.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package internalpipe

import "unsafe"

// Filter leaves only items with true predicate fn.
func (p Pipe[T]) Filter(fn func(*T) bool) Pipe[T] {
return Pipe[T]{
Expand All @@ -15,5 +17,8 @@ func (p Pipe[T]) Filter(fn func(*T) bool) Pipe[T] {
Len: p.Len,
ValLim: p.ValLim,
GoroutinesCnt: p.GoroutinesCnt,

prevP: uintptr(unsafe.Pointer(&p)),
y: p.y,
}
}
5 changes: 5 additions & 0 deletions internal/internalpipe/map.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package internalpipe

import "unsafe"

// Map applies given function to each element of the underlying slice
// returns the slice where each element is n[i] = f(p[i]).
func (p Pipe[T]) Map(fn func(T) T) Pipe[T] {
Expand All @@ -14,5 +16,8 @@ func (p Pipe[T]) Map(fn func(T) T) Pipe[T] {
Len: p.Len,
ValLim: p.ValLim,
GoroutinesCnt: p.GoroutinesCnt,

prevP: uintptr(unsafe.Pointer(&p)),
y: p.y,
}
}
7 changes: 4 additions & 3 deletions internal/internalpipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ type Pipe[T any] struct {
Len int
ValLim int
GoroutinesCnt int
y yeti

prevP uintptr //*Pipe[T]
y yeti
}

// Parallel set n - the amount of goroutines to run on.
Expand Down Expand Up @@ -62,8 +64,7 @@ func (p Pipe[T]) Count() int {

// Sang ads error handler to a current Pipe step.
func (p Pipe[T]) Snag(h ErrHandler) Pipe[T] {
// FIXME: this pointer should be taken from p as the pointer to the previous Pipe step
p.y.SnagPipe(unsafe.Pointer(&p), h)
p.y.SnagPipe(unsafe.Pointer(p.prevP), h)
return p
}

Expand Down
4 changes: 4 additions & 0 deletions internal/internalpipe/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internalpipe

import (
"sync"
"unsafe"

"github.com/koss-null/funcfrog/internal/algo/parallel/qsort"
)
Expand All @@ -28,5 +29,8 @@ func (p Pipe[T]) Sort(less func(*T, *T) bool) Pipe[T] {
Len: p.Len,
ValLim: p.ValLim,
GoroutinesCnt: p.GoroutinesCnt,

prevP: uintptr(unsafe.Pointer(&p)),
y: p.y,
}
}
14 changes: 14 additions & 0 deletions internal/internalpipe/yeet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ type Yeti struct {
HMx *sync.Mutex
}

func NewYeti() *Yeti {
return &Yeti{
Errs: make([]error, 0),
Handlers: make([]ErrHandler, 0),
Pipe2Hdlrs: make(map[unsafe.Pointer][]ErrHandler),
EMx: &sync.Mutex{},
HMx: &sync.Mutex{},
}
}

func (y *Yeti) Yeet(err error) {
y.EMx.Lock()
y.Errs = append(y.Errs, err)
Expand All @@ -36,6 +46,10 @@ func (y *Yeti) SnagPipe(p unsafe.Pointer, h ErrHandler) {
y.HMx.Unlock()
}

func (y *Yeti) Handle(p unsafe.Pointer) {
// TODO: impl
}

type yeti interface {
Yeet(err error)
SnagPipe(p unsafe.Pointer, h ErrHandler)
Expand Down

0 comments on commit 73083cd

Please sign in to comment.