From 8c35212b4ffc751e6fc8f37ce08049e8db117ddd Mon Sep 17 00:00:00 2001 From: danli001 <86958145+danli001@users.noreply.github.com> Date: Tue, 11 Jul 2023 10:48:22 +0800 Subject: [PATCH] enable Register() to support dynamically registering flow --- runtime/flow_runtime.go | 167 ++++++++++++++++++++++++++++------------ v1/goflow.go | 155 +++++++++++++++++++------------------ 2 files changed, 199 insertions(+), 123 deletions(-) diff --git a/runtime/flow_runtime.go b/runtime/flow_runtime.go index d4055af..82989a8 100644 --- a/runtime/flow_runtime.go +++ b/runtime/flow_runtime.go @@ -40,6 +40,7 @@ type FlowRuntime struct { EnableMonitoring bool RetryQueueCount int DebugEnabled bool + workerMode bool eventHandler sdk.EventHandler @@ -101,6 +102,11 @@ func (fRuntime *FlowRuntime) Init() error { } } + fRuntime.rmqConnection, err = OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + if err != nil { + return fmt.Errorf("failed to initiate rmq connection, error %v", err) + } + if fRuntime.Logger == nil { fRuntime.Logger = &log2.StdErrLogger{} } @@ -133,18 +139,24 @@ func (fRuntime *FlowRuntime) CreateExecutor(req *runtime.Request) (executor.Exec return ex, err } -// AppendFlows appends flows to the runtime -// If the queue worker not started or the flow is already registered, it returns an error -func (fRuntime *FlowRuntime) AppendFlows(flows map[string]FlowDefinitionHandler) error { - +// Register flows to the runtime +// If the flow is already registered, it returns an error +func (fRuntime *FlowRuntime) Register(flows map[string]FlowDefinitionHandler) error { if reflect.ValueOf(fRuntime.rmqConnection).IsNil() { - return fmt.Errorf("unable to append flows, queue worker not started") + return fmt.Errorf("unable to register flows, rmq connection not initialized") + } + + if len(flows) == 0 { + return nil } + var flowNames []string for flowName := range flows { if _, ok := fRuntime.Flows[flowName]; ok { return fmt.Errorf("flow %s already registered", flowName) } + + flowNames = append(flowNames, flowName) } // register flows to runtime @@ -152,9 +164,54 @@ func (fRuntime *FlowRuntime) AppendFlows(flows map[string]FlowDefinitionHandler) fRuntime.Flows[flowName] = flowHandler } - err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, flows) + // initialize task queues when in worker mode + if fRuntime.workerMode { + err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, flows) + if err != nil { + return fmt.Errorf(fmt.Sprintf("failed to initialize task queues for flows %v, error %v", flowNames, err)) + } + } + + fRuntime.Logger.Log(fmt.Sprintf("[goflow] queue workers for flows %v started successfully", flowNames)) + + return nil +} + +// EnterWorkerMode put the runtime into worker mode +func (fRuntime *FlowRuntime) EnterWorkerMode() error { + if reflect.ValueOf(fRuntime.rmqConnection).IsNil() { + return fmt.Errorf("unable to enter worker mode, rmq connection not initialized") + } + + if fRuntime.workerMode { + // already in worker mode + return nil + } + fRuntime.workerMode = true + + err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, fRuntime.Flows) + if err != nil { + return fmt.Errorf("failed to enter worker mode, error: " + err.Error()) + } + + return nil +} + +// ExitWorkerMode take the runtime out of worker mode +func (fRuntime *FlowRuntime) ExitWorkerMode() error { + if reflect.ValueOf(fRuntime.rmqConnection).IsNil() { + return nil + } + + if !fRuntime.workerMode { + // already not in worker mode + return nil + } + fRuntime.workerMode = false + + err := fRuntime.cleanTaskQueues() if err != nil { - return fmt.Errorf("failed to initialize task queues, error %v", err) + return fmt.Errorf("failed to exit worker mode, error: " + err.Error()) } return nil @@ -289,60 +346,55 @@ func (fRuntime *FlowRuntime) StopServer() error { return nil } -// StartQueueWorker starts listening for request in queue -func (fRuntime *FlowRuntime) StartQueueWorker(errorChan chan error) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) - if err != nil { - return fmt.Errorf("failed to initiate connection, error %v", err) - } - fRuntime.rmqConnection = connection - - err = fRuntime.initializeTaskQueues(&connection, fRuntime.Flows) - if err != nil { - return fmt.Errorf("failed to initiate task queues, error %v", err) - } - - fRuntime.Logger.Log("[goflow] queue worker started successfully") - - err = <-errorChan - <-fRuntime.rmqConnection.StopAllConsuming() - return err -} - // StartRuntime starts the runtime func (fRuntime *FlowRuntime) StartRuntime() error { worker := &Worker{ ID: getNewId(), - Flows: make([]string, 0, len(fRuntime.Flows)), Concurrency: fRuntime.Concurrency, } - // Get the flow details for each flow - flowDetails := make(map[string]string) - for flowID, defHandler := range fRuntime.Flows { - worker.Flows = append(worker.Flows, flowID) - dag, err := getFlowDefinition(defHandler) + + registerDetails := func() error { + // Get the flow details for each flow + flowDetails := make(map[string]string) + for flowID, defHandler := range fRuntime.Flows { + worker.Flows = append(worker.Flows, flowID) + dag, err := getFlowDefinition(defHandler) + if err != nil { + return fmt.Errorf("failed to start runtime, dag export failed, error %v", err) + } + flowDetails[flowID] = dag + } + + if fRuntime.workerMode { + err := fRuntime.saveWorkerDetails(worker) + if err != nil { + return fmt.Errorf("failed to register worker details, %v", err) + } + } else { + err := fRuntime.deleteWorkerDetails(worker) + if err != nil { + return fmt.Errorf("failed to deregister worker details, %v", err) + } + } + + err := fRuntime.saveFlowDetails(flowDetails) if err != nil { - return fmt.Errorf("failed to start runtime, dag export failed, error %v", err) + return fmt.Errorf("failed to register flow details, %v", err) } - flowDetails[flowID] = dag - } - err := fRuntime.saveWorkerDetails(worker) - if err != nil { - return fmt.Errorf("failed to register worker details, %v", err) + + return nil } - err = fRuntime.saveFlowDetails(flowDetails) + + err := registerDetails() if err != nil { - return fmt.Errorf("failed to register worker details, %v", err) + log.Printf("failed to register details, %v", err) + return err } + err = gocron.Every(GoFlowRegisterInterval).Second().Do(func() { - var err error - err = fRuntime.saveWorkerDetails(worker) + err := registerDetails() if err != nil { - log.Printf("failed to register worker details, %v", err) - } - err = fRuntime.saveFlowDetails(flowDetails) - if err != nil { - log.Printf("failed to register worker details, %v", err) + log.Printf("failed to register details, %v", err) } }) if err != nil { @@ -563,6 +615,18 @@ func (fRuntime *FlowRuntime) initializeTaskQueues(connection *rmq.Connection, fl return nil } +func (fRuntime *FlowRuntime) cleanTaskQueues() error { + + if !reflect.ValueOf(fRuntime.rmqConnection).IsNil() { + endChan := fRuntime.rmqConnection.StopAllConsuming() + <-endChan + } + + fRuntime.taskQueues = map[string]rmq.Queue{} + + return nil +} + func (fRuntime *FlowRuntime) internalRequestQueueId(flowName string) string { return fmt.Sprintf("%s:%s", InternalRequestQueueInitial, flowName) } @@ -579,6 +643,13 @@ func (fRuntime *FlowRuntime) saveWorkerDetails(worker *Worker) error { return nil } +func (fRuntime *FlowRuntime) deleteWorkerDetails(worker *Worker) error { + rdb := fRuntime.rdb + key := fmt.Sprintf("%s:%s", WorkerKeyInitial, worker.ID) + rdb.Del(key) + return nil +} + func (fRuntime *FlowRuntime) saveFlowDetails(flows map[string]string) error { rdb := fRuntime.rdb for flowId, definition := range flows { diff --git a/v1/goflow.go b/v1/goflow.go index 1e1bdae..8031fe2 100644 --- a/v1/goflow.go +++ b/v1/goflow.go @@ -178,81 +178,55 @@ func (fs *FlowService) Register(flowName string, handler runtime.FlowDefinitionH fs.Flows[flowName] = handler - return nil -} - -// AppendFlows registers flows dynamically after the service has started -func (fs *FlowService) AppendFlows(flows map[string]runtime.FlowDefinitionHandler) error { - if fs.runtime == nil { - return fmt.Errorf("flow service has not been started yet") + errorChan := make(chan error) + if err := fs.initRuntime(errorChan); err != nil { + return err } + go func() { + err := <-errorChan + close(errorChan) + fs.Logger.Log("runtime has stopped, error: " + err.Error()) + }() - err := fs.runtime.AppendFlows(flows) + err := fs.runtime.Register(map[string]runtime.FlowDefinitionHandler{flowName: handler}) if err != nil { - return fmt.Errorf("failed to append flows: %s", err) + return err } return nil } func (fs *FlowService) Start() error { - if len(fs.Flows) == 0 { - return fmt.Errorf("must register atleast one flow") - } fs.ConfigureDefault() - fs.runtime = &runtime.FlowRuntime{ - Flows: fs.Flows, - OpenTracingUrl: fs.OpenTraceUrl, - RedisURL: fs.RedisURL, - RedisPassword: fs.RedisPassword, - DataStore: fs.DataStore, - Logger: fs.Logger, - ServerPort: fs.Port, - ReadTimeout: fs.RequestReadTimeout, - WriteTimeout: fs.RequestWriteTimeout, - Concurrency: fs.WorkerConcurrency, - RequestAuthSharedSecret: fs.RequestAuthSharedSecret, - RequestAuthEnabled: fs.RequestAuthEnabled, - EnableMonitoring: fs.EnableMonitoring, - RetryQueueCount: fs.RetryCount, - DebugEnabled: fs.DebugEnabled, - } + errorChan := make(chan error) defer close(errorChan) - if err := fs.initRuntime(); err != nil { + + if err := fs.initRuntime(errorChan); err != nil { return err } - go fs.runtimeWorker(errorChan) - go fs.queueWorker(errorChan) + if err := fs.setWorkerMode(true); err != nil { + return err + } + go fs.server(errorChan) err := <-errorChan - return err + return fmt.Errorf("server has stopped, error: %v", err) } func (fs *FlowService) StartServer() error { fs.ConfigureDefault() - fs.runtime = &runtime.FlowRuntime{ - Flows: fs.Flows, - OpenTracingUrl: fs.OpenTraceUrl, - RedisURL: fs.RedisURL, - RedisPassword: fs.RedisPassword, - DataStore: fs.DataStore, - Logger: fs.Logger, - ServerPort: fs.Port, - ReadTimeout: fs.RequestReadTimeout, - WriteTimeout: fs.RequestWriteTimeout, - RequestAuthSharedSecret: fs.RequestAuthSharedSecret, - RequestAuthEnabled: fs.RequestAuthEnabled, - EnableMonitoring: fs.EnableMonitoring, - RetryQueueCount: fs.RetryCount, - DebugEnabled: fs.DebugEnabled, - } + errorChan := make(chan error) defer close(errorChan) - if err := fs.initRuntime(); err != nil { + if err := fs.initRuntime(errorChan); err != nil { return err } - go fs.runtimeWorker(errorChan) + + if err := fs.setWorkerMode(false); err != nil { + return err + } + go fs.server(errorChan) err := <-errorChan return fmt.Errorf("server has stopped, error: %v", err) @@ -260,27 +234,18 @@ func (fs *FlowService) StartServer() error { func (fs *FlowService) StartWorker() error { fs.ConfigureDefault() - fs.runtime = &runtime.FlowRuntime{ - Flows: fs.Flows, - OpenTracingUrl: fs.OpenTraceUrl, - RedisURL: fs.RedisURL, - RedisPassword: fs.RedisPassword, - DataStore: fs.DataStore, - Logger: fs.Logger, - Concurrency: fs.WorkerConcurrency, - RequestAuthSharedSecret: fs.RequestAuthSharedSecret, - RequestAuthEnabled: fs.RequestAuthEnabled, - EnableMonitoring: fs.EnableMonitoring, - RetryQueueCount: fs.RetryCount, - DebugEnabled: fs.DebugEnabled, - } + errorChan := make(chan error) defer close(errorChan) - if err := fs.initRuntime(); err != nil { + + if err := fs.initRuntime(errorChan); err != nil { + return err + } + if err := fs.setWorkerMode(true); err != nil { return err } + go fs.runtimeWorker(errorChan) - go fs.queueWorker(errorChan) err := <-errorChan return fmt.Errorf("worker has stopped, error: %v", err) } @@ -306,11 +271,56 @@ func (fs *FlowService) ConfigureDefault() { } } -func (fs *FlowService) initRuntime() error { - err := fs.runtime.Init() - if err != nil { +func (fs *FlowService) initRuntime(errorChan chan error) error { + + // runtime has already been initialized + if fs.runtime != nil { + return nil + } + + fs.runtime = &runtime.FlowRuntime{ + Flows: map[string]runtime.FlowDefinitionHandler{}, + OpenTracingUrl: fs.OpenTraceUrl, + RedisURL: fs.RedisURL, + RedisPassword: fs.RedisPassword, + DataStore: fs.DataStore, + Logger: fs.Logger, + ServerPort: fs.Port, + ReadTimeout: fs.RequestReadTimeout, + WriteTimeout: fs.RequestWriteTimeout, + Concurrency: fs.WorkerConcurrency, + RequestAuthSharedSecret: fs.RequestAuthSharedSecret, + RequestAuthEnabled: fs.RequestAuthEnabled, + EnableMonitoring: fs.EnableMonitoring, + RetryQueueCount: fs.RetryCount, + DebugEnabled: fs.DebugEnabled, + } + + if err := fs.runtime.Init(); err != nil { return err } + go fs.runtimeWorker(errorChan) + + return nil +} + +func (fs *FlowService) setWorkerMode(workerMode bool) error { + if fs.runtime == nil { + return fmt.Errorf("runtime is not initialized") + } + + if workerMode { + err := fs.runtime.EnterWorkerMode() + if err != nil { + return err + } + } else { + err := fs.runtime.ExitWorkerMode() + if err != nil { + return err + } + } + return nil } @@ -319,11 +329,6 @@ func (fs *FlowService) runtimeWorker(errorChan chan error) { errorChan <- fmt.Errorf("runtime has stopped, error: %v", err) } -func (fs *FlowService) queueWorker(errorChan chan error) { - err := fs.runtime.StartQueueWorker(errorChan) - errorChan <- fmt.Errorf("worker has stopped, error: %v", err) -} - func (fs *FlowService) server(errorChan chan error) { err := fs.runtime.StartServer() errorChan <- fmt.Errorf("server has stopped, error: %v", err)