Skip to content

Commit

Permalink
Merge pull request #101 from rokostik/concurrency-improvements
Browse files Browse the repository at this point in the history
Improve concurrent functionality
  • Loading branch information
refaktor authored Jan 15, 2024
2 parents 2314a4f + dc6f7e0 commit 16b8329
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 17 deletions.
178 changes: 176 additions & 2 deletions evaldo/builtins_goroutines.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package evaldo

import (
"fmt"
"reflect"
"sync"
"time"

"github.com/refaktor/rye/env"
Expand Down Expand Up @@ -126,8 +128,12 @@ var Builtins_goroutines = map[string]*env.Builtin{
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
switch chn := arg0.(type) {
case env.Native:
msg := <-chn.Value.(chan *env.Object)
return *msg
msg, ok := <-chn.Value.(chan *env.Object)
if ok {
return *msg
} else {
return *env.NewError("channel closed")
}
default:
ps.FailureFlag = true
return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-channel//read")
Expand All @@ -148,4 +154,172 @@ var Builtins_goroutines = map[string]*env.Builtin{
}
},
},

"Rye-channel//close": {
Argsn: 1,
Doc: "TODODOC.",
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
switch chn := arg0.(type) {
case env.Native:
close(chn.Value.(chan *env.Object))
return arg0
default:
ps.FailureFlag = true
return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-channel//close")
}
},
},

"new-waitgroup": {
Argsn: 0,
Doc: "TODODOC.",
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
var wg sync.WaitGroup
return *env.NewNative(ps.Idx, &wg, "Rye-waitgroup")
},
},

"Rye-waitgroup//add": {
Argsn: 2,
Doc: "TODODOC.",
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
switch wg := arg0.(type) {
case env.Native:
switch count := arg1.(type) {
case env.Integer:
wg.Value.(*sync.WaitGroup).Add(int(count.Value))
return arg0
default:
ps.FailureFlag = true
return MakeArgError(ps, 2, []env.Type{env.IntegerType}, "Rye-waitgroup//add")
}
default:
ps.FailureFlag = true
return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-waitgroup//add")
}
},
},

"Rye-waitgroup//done": {
Argsn: 1,
Doc: "TODODOC.",
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
switch wg := arg0.(type) {
case env.Native:
wg.Value.(*sync.WaitGroup).Done()
return arg0
default:
ps.FailureFlag = true
return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-waitgroup//done")
}
},
},

"Rye-waitgroup//wait": {
Argsn: 1,
Doc: "TODODOC.",
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
switch wg := arg0.(type) {
case env.Native:
wg.Value.(*sync.WaitGroup).Wait()
return arg0
default:
ps.FailureFlag = true
return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-waitgroup//wait")
}
},
},

"select": {
Argsn: 1,
Doc: "TODODOC.",
Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object {
switch block := arg0.(type) {
case env.Block:
ser := ps.Ser
ps.Ser = block.Series

var hasDeafult bool
var cases []reflect.SelectCase
var funcs []env.Function
for ps.Ser.Pos() < ps.Ser.Len() {
EvalExpression2(ps, false)
defaultFn, ok := ps.Res.(env.Function)
// handle default case
if ok {
if hasDeafult {
ps.FailureFlag = true
return MakeBuiltinError(ps, "select can only have one default case", "select")
}
if defaultFn.Argsn != 0 {
ps.FailureFlag = true
return MakeBuiltinError(ps, "function with 0 args required", "select")
}
defaultCase := make(chan struct{})
close(defaultCase) // close it immediately so it's always ready to receive
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(defaultCase),
})
funcs = append(funcs, defaultFn)
hasDeafult = true
continue
}
// handle regular channel case
native, ok := ps.Res.(env.Native)
if !ok {
ps.FailureFlag = true
return MakeBuiltinError(ps, "first argument of a case must be a channel", "select")
}
ch, ok := native.Value.(chan *env.Object)
if !ok {
ps.FailureFlag = true
return MakeBuiltinError(ps, "first argument of a case must be a channel", "select")
}
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})

EvalExpression2(ps, false)
fn, ok := ps.Res.(env.Function)
if !ok {
ps.FailureFlag = true
return MakeBuiltinError(ps, "second argument of a case must be a function", "select")
}
if fn.Argsn > 1 {
ps.FailureFlag = true
return MakeBuiltinError(ps, "function with 0 or 1 arg required", "select")
}
funcs = append(funcs, fn)
}
ps.Ser = ser

chosen, value, recvOK := reflect.Select(cases)
fn := funcs[chosen]

psTemp := env.ProgramState{}
err := copier.Copy(&psTemp, &ps)
if err != nil {
ps.FailureFlag = true
return MakeBuiltinError(ps, fmt.Sprintf("failed to copy ps: %s", err), "select")
}
var arg env.Object = nil
if recvOK {
val, ok := value.Interface().(*env.Object)
if !ok {
ps.FailureFlag = true
return MakeBuiltinError(ps, "value from channel is not an object", "select")
}
arg = *val
}
if fn.Argsn == 0 {
arg = nil
}
CallFunction(fn, &psTemp, arg, false, nil)

default:
ps.FailureFlag = true
return MakeArgError(ps, 1, []env.Type{env.BlockType}, "select")
}
return arg0
},
},
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

downloader: fn1 {
|pass { .get .length? .prn , prn "is length of" }
|pass { .get .length? .prn , prn " is length of " }
|print
}

Expand Down
26 changes: 26 additions & 0 deletions examples/examples-wip/goroutines/goroutines_channels.rye
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
jobs: new-channel 5
done: new-channel 0

go fn { } {
print "waiting to process jobs"
forever {
j: read jobs |fix {
print "recieved all jobs"
send done 1
return
}

print\ssv vals { "received job" j }
sleep 1000
}
}

loop 3 { :i
send jobs i
print\ssv vals { "sent job" i }
}
close jobs

print "waiting for jobs to finish"
read done
print "finished"
22 changes: 22 additions & 0 deletions examples/examples-wip/goroutines/goroutines_select.rye
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
c1: new-channel 0
c2: new-channel 0

go fn { } {
sleep 1000
send c1 "one"
}
go fn { } {
sleep 2000
send c2 "two"
}

loop 2 {
select {
c1 fn { msg } {
print\ssv vals { "received from c1:" msg }
}
c2 fn { msg } {
print\ssv vals { "received from c2:" msg }
}
}
}
18 changes: 18 additions & 0 deletions examples/examples-wip/goroutines/goroutines_select_default.rye
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
c: new-channel 0

go fn { } {
sleep 1000
send c "hello"
}

loop 5 {
select {
c fn { msg } {
print msg
}
fn { } {
print "default"
sleep 500
}
}
}
17 changes: 17 additions & 0 deletions examples/examples-wip/goroutines/goroutines_waitgroup.rye
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
wg: new-waitgroup

work: fn { id } {
print\ssv vals { "worker" id "starting" }
sleep 1000
print\ssv vals { "worker" id "done" }
wg .done
}

loop 5 { :i
wg .add 1
go-with i ?work
}

print "waiting for workers to finish"
wg .wait
print "finished"
13 changes: 0 additions & 13 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ func LoadString(input string, sig bool) (env.Object, *env.Idxs) {
input = removeBangLine(input)

inp1 := strings.TrimSpace(input)
var noBraces bool
if strings.Index("{", inp1) != 0 {
input = "{ " + input + " }"
noBraces = true
}

parser := newParser()
Expand All @@ -95,17 +93,6 @@ func LoadString(input string, sig bool) (env.Object, *env.Idxs) {
if err != nil {
fmt.Print("\x1b[35;3m")
errStr := err.Error()
if noBraces {
// hacky way to remove the first and last curly braces and
// fix the error position without changing the parser library
errStr = strings.Replace(errStr, "{", "", 1)
if i := strings.LastIndex(errStr, "}"); i >= 0 {
errStr = errStr[:i] + errStr[i+1:]
}
if i := strings.LastIndex(errStr, "-"); i >= 0 {
errStr = errStr[:i] + errStr[i+1:]
}
}
fmt.Print(errStr)
fmt.Println("\x1b[0m")

Expand Down
8 changes: 7 additions & 1 deletion util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,13 @@ func FormatSsv(val env.Object, e env.Idxs) string {
if i > 0 {
r.WriteString(" ")
}
r.WriteString(b.Series.Get(i).Probe(e))
o := b.Series.Get(i)
switch ob := o.(type) {
case env.String:
r.WriteString(ob.Value)
default:
r.WriteString(ob.Probe(e))
}
}
}
}
Expand Down

0 comments on commit 16b8329

Please sign in to comment.