From dc6f7e09f653a9d322600bb7f490ca1f6ea68e1c Mon Sep 17 00:00:00 2001 From: rokostik Date: Mon, 15 Jan 2024 17:09:49 +0100 Subject: [PATCH] concurrency improvements --- evaldo/builtins_goroutines.go | 178 +++++++++++++++++- .../{ => goroutines}/goroutines_A.rye | 0 .../{ => goroutines}/goroutines_B.rye | 2 +- .../goroutines/goroutines_channels.rye | 26 +++ .../goroutines/goroutines_select.rye | 22 +++ .../goroutines/goroutines_select_default.rye | 18 ++ .../goroutines/goroutines_waitgroup.rye | 17 ++ loader/loader.go | 13 -- util/util.go | 8 +- 9 files changed, 267 insertions(+), 17 deletions(-) rename examples/examples-wip/{ => goroutines}/goroutines_A.rye (100%) rename examples/examples-wip/{ => goroutines}/goroutines_B.rye (79%) create mode 100644 examples/examples-wip/goroutines/goroutines_channels.rye create mode 100644 examples/examples-wip/goroutines/goroutines_select.rye create mode 100644 examples/examples-wip/goroutines/goroutines_select_default.rye create mode 100644 examples/examples-wip/goroutines/goroutines_waitgroup.rye diff --git a/evaldo/builtins_goroutines.go b/evaldo/builtins_goroutines.go index 5900fd1d..674498bb 100755 --- a/evaldo/builtins_goroutines.go +++ b/evaldo/builtins_goroutines.go @@ -7,6 +7,8 @@ package evaldo import ( "fmt" + "reflect" + "sync" "time" "github.com/refaktor/rye/env" @@ -126,8 +128,12 @@ var Builtins_goroutines = map[string]*env.Builtin{ Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { switch chn := arg0.(type) { case env.Native: - msg := <-chn.Value.(chan *env.Object) - return *msg + msg, ok := <-chn.Value.(chan *env.Object) + if ok { + return *msg + } else { + return *env.NewError("channel closed") + } default: ps.FailureFlag = true return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-channel//read") @@ -148,4 +154,172 @@ var Builtins_goroutines = map[string]*env.Builtin{ } }, }, + + "Rye-channel//close": { + Argsn: 1, + Doc: "TODODOC.", + Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { + switch chn := arg0.(type) { + case env.Native: + close(chn.Value.(chan *env.Object)) + return arg0 + default: + ps.FailureFlag = true + return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-channel//close") + } + }, + }, + + "new-waitgroup": { + Argsn: 0, + Doc: "TODODOC.", + Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { + var wg sync.WaitGroup + return *env.NewNative(ps.Idx, &wg, "Rye-waitgroup") + }, + }, + + "Rye-waitgroup//add": { + Argsn: 2, + Doc: "TODODOC.", + Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { + switch wg := arg0.(type) { + case env.Native: + switch count := arg1.(type) { + case env.Integer: + wg.Value.(*sync.WaitGroup).Add(int(count.Value)) + return arg0 + default: + ps.FailureFlag = true + return MakeArgError(ps, 2, []env.Type{env.IntegerType}, "Rye-waitgroup//add") + } + default: + ps.FailureFlag = true + return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-waitgroup//add") + } + }, + }, + + "Rye-waitgroup//done": { + Argsn: 1, + Doc: "TODODOC.", + Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { + switch wg := arg0.(type) { + case env.Native: + wg.Value.(*sync.WaitGroup).Done() + return arg0 + default: + ps.FailureFlag = true + return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-waitgroup//done") + } + }, + }, + + "Rye-waitgroup//wait": { + Argsn: 1, + Doc: "TODODOC.", + Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { + switch wg := arg0.(type) { + case env.Native: + wg.Value.(*sync.WaitGroup).Wait() + return arg0 + default: + ps.FailureFlag = true + return MakeArgError(ps, 1, []env.Type{env.NativeType}, "Rye-waitgroup//wait") + } + }, + }, + + "select": { + Argsn: 1, + Doc: "TODODOC.", + Fn: func(ps *env.ProgramState, arg0 env.Object, arg1 env.Object, arg2 env.Object, arg3 env.Object, arg4 env.Object) env.Object { + switch block := arg0.(type) { + case env.Block: + ser := ps.Ser + ps.Ser = block.Series + + var hasDeafult bool + var cases []reflect.SelectCase + var funcs []env.Function + for ps.Ser.Pos() < ps.Ser.Len() { + EvalExpression2(ps, false) + defaultFn, ok := ps.Res.(env.Function) + // handle default case + if ok { + if hasDeafult { + ps.FailureFlag = true + return MakeBuiltinError(ps, "select can only have one default case", "select") + } + if defaultFn.Argsn != 0 { + ps.FailureFlag = true + return MakeBuiltinError(ps, "function with 0 args required", "select") + } + defaultCase := make(chan struct{}) + close(defaultCase) // close it immediately so it's always ready to receive + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(defaultCase), + }) + funcs = append(funcs, defaultFn) + hasDeafult = true + continue + } + // handle regular channel case + native, ok := ps.Res.(env.Native) + if !ok { + ps.FailureFlag = true + return MakeBuiltinError(ps, "first argument of a case must be a channel", "select") + } + ch, ok := native.Value.(chan *env.Object) + if !ok { + ps.FailureFlag = true + return MakeBuiltinError(ps, "first argument of a case must be a channel", "select") + } + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}) + + EvalExpression2(ps, false) + fn, ok := ps.Res.(env.Function) + if !ok { + ps.FailureFlag = true + return MakeBuiltinError(ps, "second argument of a case must be a function", "select") + } + if fn.Argsn > 1 { + ps.FailureFlag = true + return MakeBuiltinError(ps, "function with 0 or 1 arg required", "select") + } + funcs = append(funcs, fn) + } + ps.Ser = ser + + chosen, value, recvOK := reflect.Select(cases) + fn := funcs[chosen] + + psTemp := env.ProgramState{} + err := copier.Copy(&psTemp, &ps) + if err != nil { + ps.FailureFlag = true + return MakeBuiltinError(ps, fmt.Sprintf("failed to copy ps: %s", err), "select") + } + var arg env.Object = nil + if recvOK { + val, ok := value.Interface().(*env.Object) + if !ok { + ps.FailureFlag = true + return MakeBuiltinError(ps, "value from channel is not an object", "select") + } + arg = *val + } + if fn.Argsn == 0 { + arg = nil + } + CallFunction(fn, &psTemp, arg, false, nil) + + default: + ps.FailureFlag = true + return MakeArgError(ps, 1, []env.Type{env.BlockType}, "select") + } + return arg0 + }, + }, } diff --git a/examples/examples-wip/goroutines_A.rye b/examples/examples-wip/goroutines/goroutines_A.rye similarity index 100% rename from examples/examples-wip/goroutines_A.rye rename to examples/examples-wip/goroutines/goroutines_A.rye diff --git a/examples/examples-wip/goroutines_B.rye b/examples/examples-wip/goroutines/goroutines_B.rye similarity index 79% rename from examples/examples-wip/goroutines_B.rye rename to examples/examples-wip/goroutines/goroutines_B.rye index dba91dad..a0a3430a 100644 --- a/examples/examples-wip/goroutines_B.rye +++ b/examples/examples-wip/goroutines/goroutines_B.rye @@ -1,6 +1,6 @@ downloader: fn1 { - |pass { .get .length? .prn , prn "is length of" } + |pass { .get .length? .prn , prn " is length of " } |print } diff --git a/examples/examples-wip/goroutines/goroutines_channels.rye b/examples/examples-wip/goroutines/goroutines_channels.rye new file mode 100644 index 00000000..26e15982 --- /dev/null +++ b/examples/examples-wip/goroutines/goroutines_channels.rye @@ -0,0 +1,26 @@ +jobs: new-channel 5 +done: new-channel 0 + +go fn { } { + print "waiting to process jobs" + forever { + j: read jobs |fix { + print "recieved all jobs" + send done 1 + return + } + + print\ssv vals { "received job" j } + sleep 1000 + } +} + +loop 3 { :i + send jobs i + print\ssv vals { "sent job" i } +} +close jobs + +print "waiting for jobs to finish" +read done +print "finished" diff --git a/examples/examples-wip/goroutines/goroutines_select.rye b/examples/examples-wip/goroutines/goroutines_select.rye new file mode 100644 index 00000000..77dc9ca5 --- /dev/null +++ b/examples/examples-wip/goroutines/goroutines_select.rye @@ -0,0 +1,22 @@ +c1: new-channel 0 +c2: new-channel 0 + +go fn { } { + sleep 1000 + send c1 "one" +} +go fn { } { + sleep 2000 + send c2 "two" +} + +loop 2 { + select { + c1 fn { msg } { + print\ssv vals { "received from c1:" msg } + } + c2 fn { msg } { + print\ssv vals { "received from c2:" msg } + } + } +} diff --git a/examples/examples-wip/goroutines/goroutines_select_default.rye b/examples/examples-wip/goroutines/goroutines_select_default.rye new file mode 100644 index 00000000..4b61e7c6 --- /dev/null +++ b/examples/examples-wip/goroutines/goroutines_select_default.rye @@ -0,0 +1,18 @@ +c: new-channel 0 + +go fn { } { + sleep 1000 + send c "hello" +} + +loop 5 { + select { + c fn { msg } { + print msg + } + fn { } { + print "default" + sleep 500 + } + } +} diff --git a/examples/examples-wip/goroutines/goroutines_waitgroup.rye b/examples/examples-wip/goroutines/goroutines_waitgroup.rye new file mode 100644 index 00000000..55101228 --- /dev/null +++ b/examples/examples-wip/goroutines/goroutines_waitgroup.rye @@ -0,0 +1,17 @@ +wg: new-waitgroup + +work: fn { id } { + print\ssv vals { "worker" id "starting" } + sleep 1000 + print\ssv vals { "worker" id "done" } + wg .done +} + +loop 5 { :i + wg .add 1 + go-with i ?work +} + +print "waiting for workers to finish" +wg .wait +print "finished" diff --git a/loader/loader.go b/loader/loader.go index 4242717e..f7afa4d1 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -83,10 +83,8 @@ func LoadString(input string, sig bool) (env.Object, *env.Idxs) { input = removeBangLine(input) inp1 := strings.TrimSpace(input) - var noBraces bool if strings.Index("{", inp1) != 0 { input = "{ " + input + " }" - noBraces = true } parser := newParser() @@ -95,17 +93,6 @@ func LoadString(input string, sig bool) (env.Object, *env.Idxs) { if err != nil { fmt.Print("\x1b[35;3m") errStr := err.Error() - if noBraces { - // hacky way to remove the first and last curly braces and - // fix the error position without changing the parser library - errStr = strings.Replace(errStr, "{", "", 1) - if i := strings.LastIndex(errStr, "}"); i >= 0 { - errStr = errStr[:i] + errStr[i+1:] - } - if i := strings.LastIndex(errStr, "-"); i >= 0 { - errStr = errStr[:i] + errStr[i+1:] - } - } fmt.Print(errStr) fmt.Println("\x1b[0m") diff --git a/util/util.go b/util/util.go index abde6af7..b8dd0b9f 100644 --- a/util/util.go +++ b/util/util.go @@ -140,7 +140,13 @@ func FormatSsv(val env.Object, e env.Idxs) string { if i > 0 { r.WriteString(" ") } - r.WriteString(b.Series.Get(i).Probe(e)) + o := b.Series.Get(i) + switch ob := o.(type) { + case env.String: + r.WriteString(ob.Value) + default: + r.WriteString(ob.Probe(e)) + } } } }