From 7b05e96dfb4b4af35ec9575e0f99a54d3363f9f2 Mon Sep 17 00:00:00 2001 From: s8sg Date: Sat, 25 Jul 2020 13:31:19 +0800 Subject: [PATCH] Fix issue that causes worker to not receive partial request Signed-off-by: s8sg --- goflow.go | 16 +++- runtime/flow_runtime.go | 122 +++++++++++++++++++------ runtime/new_request_handler_wrapper.go | 2 +- 3 files changed, 107 insertions(+), 33 deletions(-) diff --git a/goflow.go b/goflow.go index e8e0e80..e0dbc02 100644 --- a/goflow.go +++ b/goflow.go @@ -33,6 +33,10 @@ func (fs *FlowService) Start(flowName string, handler runtime.FlowDefinitionHand RedisURL: fs.RedisURL, DataStore: fs.DataStore, Logger: fs.Logger, + ServerPort: fs.Port, + ReadTimeout: fs.RequestReadTimeout, + WriteTimeout: fs.RequestWriteTimeout, + Concurrency: fs.WorkerConcurrency, } errorChan := make(chan error) defer close(errorChan) @@ -54,8 +58,11 @@ func (fs *FlowService) StartServer(flowName string, handler runtime.FlowDefiniti RedisURL: fs.RedisURL, DataStore: fs.DataStore, Logger: fs.Logger, + ServerPort: fs.Port, + ReadTimeout: fs.RequestReadTimeout, + WriteTimeout: fs.RequestWriteTimeout, } - err := fs.runtime.StartServer(fs.Port, fs.RequestReadTimeout, fs.RequestWriteTimeout) + err := fs.runtime.StartServer() return fmt.Errorf("server has stopped, error: %v", err) } @@ -71,8 +78,9 @@ func (fs *FlowService) StartWorker(flowName string, handler runtime.FlowDefiniti RedisURL: fs.RedisURL, DataStore: fs.DataStore, Logger: fs.Logger, + Concurrency: fs.WorkerConcurrency, } - err := fs.runtime.StartQueueWorker("redis://"+fs.RedisURL+"/", fs.WorkerConcurrency) + err := fs.runtime.StartQueueWorker() return fmt.Errorf("worker has stopped, error: %v", err) } @@ -98,11 +106,11 @@ func (fs *FlowService) ConfigureDefault() { } func (fs *FlowService) queueWorker(errorChan chan error) { - err := fs.runtime.StartQueueWorker("redis://"+fs.RedisURL+"/", fs.WorkerConcurrency) + err := fs.runtime.StartQueueWorker() errorChan <- fmt.Errorf("worker has stopped, error: %v", err) } func (fs *FlowService) server(errorChan chan error) { - err := fs.runtime.StartServer(fs.Port, fs.RequestReadTimeout, fs.RequestWriteTimeout) + err := fs.runtime.StartServer() errorChan <- fmt.Errorf("server has stopped, error: %v", err) } diff --git a/runtime/flow_runtime.go b/runtime/flow_runtime.go index 25af21a..a5635ea 100644 --- a/runtime/flow_runtime.go +++ b/runtime/flow_runtime.go @@ -1,7 +1,6 @@ package runtime import ( - "encoding/json" "fmt" "github.com/benmanns/goworker" "github.com/faasflow/goflow/eventhandler" @@ -22,8 +21,13 @@ type FlowRuntime struct { stateStore sdk.StateStore DataStore sdk.DataStore Logger sdk.Logger - eventHandler sdk.EventHandler - settings goworker.WorkerSettings + Concurrency int + ServerPort int + ReadTimeout time.Duration + WriteTimeout time.Duration + + eventHandler sdk.EventHandler + settings goworker.WorkerSettings } const ( @@ -70,7 +74,7 @@ func (fRuntime *FlowRuntime) CreateExecutor(req *runtime.Request) (executor.Exec } // StartServer starts listening for new request -func (fRuntime *FlowRuntime) StartServer(port int, readTimeout time.Duration, writeTimeout time.Duration) error { +func (fRuntime *FlowRuntime) StartServer() error { err := fRuntime.Init() if err != nil { @@ -78,43 +82,57 @@ func (fRuntime *FlowRuntime) StartServer(port int, readTimeout time.Duration, wr } s := &http.Server{ - Addr: fmt.Sprintf(":%d", port), - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, + Addr: fmt.Sprintf(":%d", fRuntime.ServerPort), + ReadTimeout: fRuntime.ReadTimeout, + WriteTimeout: fRuntime.WriteTimeout, Handler: router(fRuntime), MaxHeaderBytes: 1 << 20, // Max header of 1MB } + fRuntime.settings = goworker.WorkerSettings{ + URI: "redis://" + fRuntime.RedisURL + "/", + Connections: 100, + Queues: []string{fRuntime.queueId()}, + UseNumber: true, + ExitOnComplete: false, + Concurrency: fRuntime.Concurrency, + Namespace: "resque:", + Interval: 1.0, + } + goworker.SetSettings(fRuntime.settings) + return s.ListenAndServe() } // StartQueueWorker starts listening for request in queue -func (fRuntime *FlowRuntime) StartQueueWorker(redisUri string, concurrency int) error { +func (fRuntime *FlowRuntime) StartQueueWorker() error { + err := fRuntime.Init() + if err != nil { + return err + } + fRuntime.settings = goworker.WorkerSettings{ - URI: redisUri, + URI: "redis://" + fRuntime.RedisURL + "/", Connections: 100, Queues: []string{fRuntime.queueId()}, UseNumber: true, ExitOnComplete: false, - Concurrency: concurrency, + Concurrency: fRuntime.Concurrency, Namespace: "resque:", Interval: 1.0, } goworker.SetSettings(fRuntime.settings) goworker.Register("QueueWorker", fRuntime.queueReceiver) + return goworker.Work() } func (fRuntime *FlowRuntime) EnqueueRequest(pr *runtime.Request) error { - encodedRequest, error := json.Marshal(pr) - if error != nil { - return fmt.Errorf("failed to marshal request while enqueing, error %v", error) - } return goworker.Enqueue(&goworker.Job{ Queue: fRuntime.queueId(), Payload: goworker.Payload{ Class: "QueueWorker", - Args: []interface{}{string(encodedRequest)}, + Args: []interface{}{pr.FlowName, pr.RequestID, string(pr.Body), pr.Header, pr.RawQuery, pr.Query}, }, }) } @@ -122,25 +140,73 @@ func (fRuntime *FlowRuntime) EnqueueRequest(pr *runtime.Request) error { func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) error { fRuntime.Logger.Log(fmt.Sprintf("Request received by worker at queue %v", queue)) if queue != fRuntime.queueId() { + fRuntime.Logger.Log(fmt.Sprintf("Request queue mismatch %s/%s", queue, fRuntime.queueId())) return nil } - reqData, ok := args[0].(string) - if !ok { - fRuntime.Logger.Log(fmt.Sprintf("failed to load argument %v", reqData)) - return fmt.Errorf("failed to load argument %v", args[0]) + request := &runtime.Request{} + + if args[0] != nil { + flowName, ok := args[0].(string) + if !ok { + fRuntime.Logger.Log(fmt.Sprintf("failed to load flowname from arguments %v", args[0])) + return fmt.Errorf("failed to load flowname from arguments %v", args[0]) + } + request.FlowName = flowName } - request := &runtime.Request{} - err := json.Unmarshal([]byte(reqData), request) - if err != nil { - fRuntime.Logger.Log(fmt.Sprintf("failed to unmarshal request, error %v", err)) - return fmt.Errorf("failed to unmarshal request, error %v", err) + if args[1] != nil { + requestId, ok := args[1].(string) + if !ok { + fRuntime.Logger.Log(fmt.Sprintf("failed to load requestId from arguments %v", args[1])) + return fmt.Errorf("failed to load requestId from arguments %v", args[1]) + } + request.RequestID = requestId + } + + if args[2] != nil { + body, ok := args[2].(string) + if !ok { + fRuntime.Logger.Log(fmt.Sprintf("failed to load body from arguments %v", args[2])) + return fmt.Errorf("failed to load body from arguments %v", args[2]) + } + request.Body = []byte(body) + } + + if args[3] != nil { + header, ok := args[3].(map[string][]string) + if !ok { + fRuntime.Logger.Log(fmt.Sprintf("failed to load header from arguments %v", args[3])) + return fmt.Errorf("failed to load header from arguments %v", args[3]) + } + request.Header = header + } else { + request.Header = make(map[string][]string) + } + + if args[4] != nil { + rawQuery, ok := args[4].(string) + if !ok { + fRuntime.Logger.Log(fmt.Sprintf("failed to load raw-query from arguments %v", args[4])) + return fmt.Errorf("failed to load raw-query from arguments %v", args[4]) + } + request.RawQuery = rawQuery + } + + if args[5] != nil { + query, ok := args[5].(map[string][]string) + if !ok { + fRuntime.Logger.Log(fmt.Sprintf("failed to load query from arguments %v", args[5])) + return fmt.Errorf("failed to load query from arguments %v", args[5]) + } + request.Query = query + } else { + request.Query = make(map[string][]string) } executor, err := fRuntime.CreateExecutor(request) if err != nil { - fRuntime.Logger.Log(fmt.Sprintf("[Request `%s`] failed to execute request, error: %v", request.RequestID, err)) + fRuntime.Logger.Log(fmt.Sprintf("[Request `%s`] failed to execute request, error: %v", request.RequestID, err)) return fmt.Errorf("failed to execute request " + request.RequestID + ", error: " + err.Error()) } response := &runtime.Response{} @@ -149,7 +215,7 @@ func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) er err = handler.PartialExecuteFlowHandler(response, request, executor) if err != nil { - fRuntime.Logger.Log(fmt.Sprint("request failed to be processed. error: " + err.Error())) + fRuntime.Logger.Log(fmt.Sprintf("[Request `%s`] failed to be processed. error: %v", request.RequestID, err.Error())) return fmt.Errorf("request failed to be processed. error: " + err.Error()) } @@ -157,5 +223,5 @@ func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) er } func (fRuntime *FlowRuntime) queueId() string { - return fmt.Sprint("%s_%s", PartialRequestQueue, fRuntime.FlowName) -} \ No newline at end of file + return fmt.Sprintf("%s_%s", PartialRequestQueue, fRuntime.FlowName) +} diff --git a/runtime/new_request_handler_wrapper.go b/runtime/new_request_handler_wrapper.go index 0464271..e01dce3 100644 --- a/runtime/new_request_handler_wrapper.go +++ b/runtime/new_request_handler_wrapper.go @@ -70,4 +70,4 @@ func getFlowName(runtime runtimepkg.Runtime) string { return "" } return fr.FlowName -} \ No newline at end of file +}