Skip to content

Commit

Permalink
Fix issue that causes worker to not receive partial request
Browse files Browse the repository at this point in the history
Signed-off-by: s8sg <swarvanusg@gmail.com>
  • Loading branch information
s8sg committed Jul 25, 2020
1 parent 6edf939 commit 7b05e96
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 33 deletions.
16 changes: 12 additions & 4 deletions goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (fs *FlowService) Start(flowName string, handler runtime.FlowDefinitionHand
RedisURL: fs.RedisURL,
DataStore: fs.DataStore,
Logger: fs.Logger,
ServerPort: fs.Port,
ReadTimeout: fs.RequestReadTimeout,
WriteTimeout: fs.RequestWriteTimeout,
Concurrency: fs.WorkerConcurrency,
}
errorChan := make(chan error)
defer close(errorChan)
Expand All @@ -54,8 +58,11 @@ func (fs *FlowService) StartServer(flowName string, handler runtime.FlowDefiniti
RedisURL: fs.RedisURL,
DataStore: fs.DataStore,
Logger: fs.Logger,
ServerPort: fs.Port,
ReadTimeout: fs.RequestReadTimeout,
WriteTimeout: fs.RequestWriteTimeout,
}
err := fs.runtime.StartServer(fs.Port, fs.RequestReadTimeout, fs.RequestWriteTimeout)
err := fs.runtime.StartServer()
return fmt.Errorf("server has stopped, error: %v", err)
}

Expand All @@ -71,8 +78,9 @@ func (fs *FlowService) StartWorker(flowName string, handler runtime.FlowDefiniti
RedisURL: fs.RedisURL,
DataStore: fs.DataStore,
Logger: fs.Logger,
Concurrency: fs.WorkerConcurrency,
}
err := fs.runtime.StartQueueWorker("redis://"+fs.RedisURL+"/", fs.WorkerConcurrency)
err := fs.runtime.StartQueueWorker()
return fmt.Errorf("worker has stopped, error: %v", err)
}

Expand All @@ -98,11 +106,11 @@ func (fs *FlowService) ConfigureDefault() {
}

func (fs *FlowService) queueWorker(errorChan chan error) {
err := fs.runtime.StartQueueWorker("redis://"+fs.RedisURL+"/", fs.WorkerConcurrency)
err := fs.runtime.StartQueueWorker()
errorChan <- fmt.Errorf("worker has stopped, error: %v", err)
}

func (fs *FlowService) server(errorChan chan error) {
err := fs.runtime.StartServer(fs.Port, fs.RequestReadTimeout, fs.RequestWriteTimeout)
err := fs.runtime.StartServer()
errorChan <- fmt.Errorf("server has stopped, error: %v", err)
}
122 changes: 94 additions & 28 deletions runtime/flow_runtime.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package runtime

import (
"encoding/json"
"fmt"
"github.com/benmanns/goworker"
"github.com/faasflow/goflow/eventhandler"
Expand All @@ -22,8 +21,13 @@ type FlowRuntime struct {
stateStore sdk.StateStore
DataStore sdk.DataStore
Logger sdk.Logger
eventHandler sdk.EventHandler
settings goworker.WorkerSettings
Concurrency int
ServerPort int
ReadTimeout time.Duration
WriteTimeout time.Duration

eventHandler sdk.EventHandler
settings goworker.WorkerSettings
}

const (
Expand Down Expand Up @@ -70,77 +74,139 @@ func (fRuntime *FlowRuntime) CreateExecutor(req *runtime.Request) (executor.Exec
}

// StartServer starts listening for new request
func (fRuntime *FlowRuntime) StartServer(port int, readTimeout time.Duration, writeTimeout time.Duration) error {
func (fRuntime *FlowRuntime) StartServer() error {

err := fRuntime.Init()
if err != nil {
return err
}

s := &http.Server{
Addr: fmt.Sprintf(":%d", port),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Addr: fmt.Sprintf(":%d", fRuntime.ServerPort),
ReadTimeout: fRuntime.ReadTimeout,
WriteTimeout: fRuntime.WriteTimeout,
Handler: router(fRuntime),
MaxHeaderBytes: 1 << 20, // Max header of 1MB
}

fRuntime.settings = goworker.WorkerSettings{
URI: "redis://" + fRuntime.RedisURL + "/",
Connections: 100,
Queues: []string{fRuntime.queueId()},
UseNumber: true,
ExitOnComplete: false,
Concurrency: fRuntime.Concurrency,
Namespace: "resque:",
Interval: 1.0,
}
goworker.SetSettings(fRuntime.settings)

return s.ListenAndServe()
}

// StartQueueWorker starts listening for request in queue
func (fRuntime *FlowRuntime) StartQueueWorker(redisUri string, concurrency int) error {
func (fRuntime *FlowRuntime) StartQueueWorker() error {
err := fRuntime.Init()
if err != nil {
return err
}

fRuntime.settings = goworker.WorkerSettings{
URI: redisUri,
URI: "redis://" + fRuntime.RedisURL + "/",
Connections: 100,
Queues: []string{fRuntime.queueId()},
UseNumber: true,
ExitOnComplete: false,
Concurrency: concurrency,
Concurrency: fRuntime.Concurrency,
Namespace: "resque:",
Interval: 1.0,
}
goworker.SetSettings(fRuntime.settings)
goworker.Register("QueueWorker", fRuntime.queueReceiver)

return goworker.Work()
}

func (fRuntime *FlowRuntime) EnqueueRequest(pr *runtime.Request) error {
encodedRequest, error := json.Marshal(pr)
if error != nil {
return fmt.Errorf("failed to marshal request while enqueing, error %v", error)
}
return goworker.Enqueue(&goworker.Job{
Queue: fRuntime.queueId(),
Payload: goworker.Payload{
Class: "QueueWorker",
Args: []interface{}{string(encodedRequest)},
Args: []interface{}{pr.FlowName, pr.RequestID, string(pr.Body), pr.Header, pr.RawQuery, pr.Query},
},
})
}

func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) error {
fRuntime.Logger.Log(fmt.Sprintf("Request received by worker at queue %v", queue))
if queue != fRuntime.queueId() {
fRuntime.Logger.Log(fmt.Sprintf("Request queue mismatch %s/%s", queue, fRuntime.queueId()))
return nil
}

reqData, ok := args[0].(string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load argument %v", reqData))
return fmt.Errorf("failed to load argument %v", args[0])
request := &runtime.Request{}

if args[0] != nil {
flowName, ok := args[0].(string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load flowname from arguments %v", args[0]))
return fmt.Errorf("failed to load flowname from arguments %v", args[0])
}
request.FlowName = flowName
}

request := &runtime.Request{}
err := json.Unmarshal([]byte(reqData), request)
if err != nil {
fRuntime.Logger.Log(fmt.Sprintf("failed to unmarshal request, error %v", err))
return fmt.Errorf("failed to unmarshal request, error %v", err)
if args[1] != nil {
requestId, ok := args[1].(string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load requestId from arguments %v", args[1]))
return fmt.Errorf("failed to load requestId from arguments %v", args[1])
}
request.RequestID = requestId
}

if args[2] != nil {
body, ok := args[2].(string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load body from arguments %v", args[2]))
return fmt.Errorf("failed to load body from arguments %v", args[2])
}
request.Body = []byte(body)
}

if args[3] != nil {
header, ok := args[3].(map[string][]string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load header from arguments %v", args[3]))
return fmt.Errorf("failed to load header from arguments %v", args[3])
}
request.Header = header
} else {
request.Header = make(map[string][]string)
}

if args[4] != nil {
rawQuery, ok := args[4].(string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load raw-query from arguments %v", args[4]))
return fmt.Errorf("failed to load raw-query from arguments %v", args[4])
}
request.RawQuery = rawQuery
}

if args[5] != nil {
query, ok := args[5].(map[string][]string)
if !ok {
fRuntime.Logger.Log(fmt.Sprintf("failed to load query from arguments %v", args[5]))
return fmt.Errorf("failed to load query from arguments %v", args[5])
}
request.Query = query
} else {
request.Query = make(map[string][]string)
}

executor, err := fRuntime.CreateExecutor(request)
if err != nil {
fRuntime.Logger.Log(fmt.Sprintf("[Request `%s`] failed to execute request, error: %v", request.RequestID, err))
fRuntime.Logger.Log(fmt.Sprintf("[Request `%s`] failed to execute request, error: %v", request.RequestID, err))
return fmt.Errorf("failed to execute request " + request.RequestID + ", error: " + err.Error())
}
response := &runtime.Response{}
Expand All @@ -149,13 +215,13 @@ func (fRuntime *FlowRuntime) queueReceiver(queue string, args ...interface{}) er

err = handler.PartialExecuteFlowHandler(response, request, executor)
if err != nil {
fRuntime.Logger.Log(fmt.Sprint("request failed to be processed. error: " + err.Error()))
fRuntime.Logger.Log(fmt.Sprintf("[Request `%s`] failed to be processed. error: %v", request.RequestID, err.Error()))
return fmt.Errorf("request failed to be processed. error: " + err.Error())
}

return nil
}

func (fRuntime *FlowRuntime) queueId() string {
return fmt.Sprint("%s_%s", PartialRequestQueue, fRuntime.FlowName)
}
return fmt.Sprintf("%s_%s", PartialRequestQueue, fRuntime.FlowName)
}
2 changes: 1 addition & 1 deletion runtime/new_request_handler_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,4 @@ func getFlowName(runtime runtimepkg.Runtime) string {
return ""
}
return fr.FlowName
}
}

0 comments on commit 7b05e96

Please sign in to comment.