From 9602990b85a68ba9dfcb9d702072fc9475f3ff9d Mon Sep 17 00:00:00 2001 From: richbl Date: Mon, 23 Dec 2024 20:24:49 -0800 Subject: [PATCH] refactor(APP): added new service_runner in main package to better manage signals and contexts Signed-off-by: richbl --- cmd/main.go | 310 +++++++++--------------------- cmd/service_runner.go | 168 ++++++++++++++++ internal/ble/sensor_controller.go | 10 +- 3 files changed, 269 insertions(+), 219 deletions(-) create mode 100644 cmd/service_runner.go diff --git a/cmd/main.go b/cmd/main.go index 6562efe..b934d51 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,9 +6,7 @@ import ( "log" "os" "os/exec" - "os/signal" - "sync" - "syscall" + "time" ble "github.com/richbl/go-ble-sync-cycle/internal/ble" config "github.com/richbl/go-ble-sync-cycle/internal/configuration" @@ -19,160 +17,96 @@ import ( "tinygo.org/x/bluetooth" ) -// appControllers holds the main application controllers +// Application constants +const ( + appPrefix = "----- -----" + appName = "BLE Sync Cycle" + appVersion = "0.6.2" + shutdownTimeout = 30 * time.Second +) + +// appControllers holds the application component controllers for managing speed, video playback, +// and BLE communication type appControllers struct { speedController *speed.SpeedController videoPlayer *video.PlaybackController bleController *ble.BLEController } -// shutdownHandler encapsulates shutdown coordination -type shutdownHandler struct { - done chan struct{} - componentsDown chan struct{} - cleanupOnce sync.Once - wg *sync.WaitGroup - rootCancel context.CancelFunc - resetTerminal func() -} - -// componentErr holds the error type and component type used for logging -type componentErr struct { - componentType logger.ComponentType - err error -} - -const ( - appPrefix = "----- -----" - appName = "BLE Sync Cycle" - appVersion = "0.6.2" -) - func main() { - // Hello computer! log.Println(appPrefix, "Starting", appName, appVersion) - // Load configuration from TOML cfg := loadConfig("config.toml") - // Initialize logger package with display level from TOML configuration - logger.Initialize(cfg.App.LogLevel) + // Initialize the shutdown manager and exit handler + sm := NewShutdownManager(shutdownTimeout) + exitHandler := NewExitHandler(sm) - // Initialize shutdown services: signal handling, context cancellation and terminal config - var wg sync.WaitGroup - rootCtx, rootCancel := context.WithCancel(context.Background()) - sh := &shutdownHandler{ - done: make(chan struct{}), - componentsDown: make(chan struct{}), - wg: &wg, - rootCancel: rootCancel, - resetTerminal: configTerminal(), - } - defer rootCancel() + // Add configureTerminal cleanup function to reset terminal settings on exit + sm.AddCleanupFn(configureTerminal()) + sm.Start() - // Set up shutdown handlers - setupSignalHandling(sh) - logger.SetExitHandler(sh.cleanup) + // Initialize the logger with the configured log level and exit handler + logger.Initialize(cfg.App.LogLevel) + logger.SetExitHandler(func() { + sm.initiateShutdown() + exitHandler.HandleExit() + }) - // Create component controllers + // Initialize the application controllers controllers, componentType, err := setupAppControllers(*cfg) if err != nil { logger.Fatal(componentType, "failed to create controllers: "+err.Error()) - <-sh.done - waveGoodbye() - } - - // Start components - if componentType, err := startAppControllers(rootCtx, controllers, sh.wg); err != nil { - logger.Fatal(componentType, err.Error()) - <-sh.done - waveGoodbye() + return } - <-sh.done -} - -// loadConfig loads the TOML configuration file -func loadConfig(file string) *config.Config { - cfg, err := config.LoadFile(file) + // Scan for the BLE characteristic and handle context cancellation + bleChar, err := scanForBLECharacteristic(sm.Context(), controllers) if err != nil { - log.Println(logger.Red + "[FTL]" + logger.Reset + " [APP] failed to load TOML configuration: " + err.Error()) - waveGoodbye() - } - - return cfg -} -// cleanup handles graceful shutdown of all components -func (sh *shutdownHandler) cleanup() { + if err != context.Canceled { + logger.Fatal(logger.BLE, "failed to scan for BLE characteristic: "+err.Error()) + return + } - sh.cleanupOnce.Do(func() { - // Signal components to shut down and wait for them to finish - sh.rootCancel() - sh.wg.Wait() - close(sh.componentsDown) + exitHandler.HandleExit() + return + } - // Perform final cleanup - sh.resetTerminal() - close(sh.done) - waveGoodbye() + // Create and run the BLE service runner + bleRunner := NewServiceRunner(sm, "BLE") + bleRunner.Run(func(ctx context.Context) error { + return controllers.bleController.GetBLEUpdates(ctx, controllers.speedController, bleChar) }) -} - -// waveGoodbye outputs a goodbye message and exits the application -func waveGoodbye() { - log.Println(appPrefix, appName, appVersion, "shutdown complete. Goodbye!") - os.Exit(0) -} - -// setupSignalHandling configures OS signal handling -func setupSignalHandling(sh *shutdownHandler) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - - go func() { - <-sigChan - sh.cleanup() - }() - -} - -// configTerminal handles terminal char echo to prevent display of break (^C) character -func configTerminal() func() { - // Disable control character echo using stty - rawMode := exec.Command("stty", "-echo") - rawMode.Stdin = os.Stdin - - if err := rawMode.Run(); err != nil { - logger.Fatal(logger.APP, "failed to configure terminal: "+err.Error()) - waveGoodbye() - } - - // Return cleanup function - return func() { - cooked := exec.Command("stty", "echo") - cooked.Stdin = os.Stdin + // Create and run the video service runner + videoRunner := NewServiceRunner(sm, "Video") + videoRunner.Run(func(ctx context.Context) error { + return controllers.videoPlayer.Start(ctx, controllers.speedController) + }) - if err := cooked.Run(); err != nil { - logger.Fatal(logger.APP, "failed to restore terminal: "+err.Error()) - waveGoodbye() + // Wait for services to complete and check for errors + for _, runner := range []*ServiceRunner{bleRunner, videoRunner} { + if err := runner.Error(); err != nil { + logger.Fatal(logger.APP, "service error: "+err.Error()) + return } - } + // Wait for final shutdown sequences to complete and wave goodbye! + sm.Wait() + waveGoodbye() } -// setupAppControllers creates and initializes the application controllers +// setupAppControllers creates and initializes all application controllers func setupAppControllers(cfg config.Config) (appControllers, logger.ComponentType, error) { - // Create speed and video controllers + speedController := speed.NewSpeedController(cfg.Speed.SmoothingWindow) videoPlayer, err := video.NewPlaybackController(cfg.Video, cfg.Speed) if err != nil { return appControllers{}, logger.VIDEO, errors.New("failed to create video player: " + err.Error()) } - // Create BLE controller bleController, err := ble.NewBLEController(cfg.BLE, cfg.Speed) if err != nil { return appControllers{}, logger.BLE, errors.New("failed to create BLE controller: " + err.Error()) @@ -185,121 +119,63 @@ func setupAppControllers(cfg config.Config) (appControllers, logger.ComponentTyp }, logger.APP, nil } -// startAppControllers is responsible for starting and managing the component controllers -func startAppControllers(ctx context.Context, controllers appControllers, wg *sync.WaitGroup) (logger.ComponentType, error) { - // Create shutdown signal context. - ctxWithCancel, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) - defer stop() - - // Scan to find the speed characteristic - bleSpeedCharacter, err := runBLEScan(ctxWithCancel, controllers) - if err != nil { - - // Check if the context was cancelled (user pressed Ctrl+C) - if errors.Is(err, context.Canceled) { - return logger.APP, nil - } - - return logger.BLE, errors.New("BLE peripheral scan failed: " + err.Error()) - } - - errChan := make(chan componentErr, 2) // Buffered channel for component errors - wg.Add(2) - - // Start BLE monitoring and video playback - startBLEMonitoring(ctxWithCancel, controllers, wg, bleSpeedCharacter, errChan) - startVideoPlaying(ctxWithCancel, controllers, wg, errChan) - - // Wait for component results or cancellation - for i := 0; i < 2; i++ { - - select { - case compErr := <-errChan: +// scanForBLECharacteristic handles the initial BLE device discovery and characteristic scanning +// using a context for cancellation and returns the discovered characteristic or an error +func scanForBLECharacteristic(ctx context.Context, controllers appControllers) (*bluetooth.DeviceCharacteristic, error) { - if compErr.err != nil { - return compErr.componentType, compErr.err - } - - // Context cancelled, no error - case <-ctxWithCancel.Done(): - return logger.APP, nil - } - - } - - return logger.APP, nil -} - -// runBLEScan scans for the BLE speed characteristic. -func runBLEScan(ctx context.Context, controllers appControllers) (*bluetooth.DeviceCharacteristic, error) { - results := make(chan *bluetooth.DeviceCharacteristic, 1) - errChan := make(chan error, 1) + // Create a channel to receive the result of the BLE characteristic scan + resultsChan := make(chan struct { + char *bluetooth.DeviceCharacteristic + err error + }, 1) go func() { - characteristic, err := controllers.bleController.GetBLECharacteristic(ctx, controllers.speedController) - if err != nil { - errChan <- err - return - } - - results <- characteristic + defer close(resultsChan) + char, err := controllers.bleController.GetBLECharacteristic(ctx, controllers.speedController) + resultsChan <- struct { + char *bluetooth.DeviceCharacteristic + err error + }{char, err} }() select { case <-ctx.Done(): - logger.Info(logger.BLE, "user-generated interrupt, stopping BLE characteristic scan...") + logger.Info(logger.BLE, "user-generated interrupt, stopping BLE discovery...") return nil, ctx.Err() - case err := <-errChan: - return nil, err - case characteristic := <-results: - return characteristic, nil + case result := <-resultsChan: + return result.char, result.err } } -// startBLEMonitoring starts the BLE monitoring goroutine -func startBLEMonitoring(ctx context.Context, controllers appControllers, wg *sync.WaitGroup, bleSpeedCharacter *bluetooth.DeviceCharacteristic, errChan chan<- componentErr) { - go func() { - defer wg.Done() - - if err := monitorBLESpeed(ctx, controllers, bleSpeedCharacter); err != nil { - - // Only send error if context was not cancelled - if !errors.Is(err, context.Canceled) { - errChan <- componentErr{componentType: logger.BLE, err: err} - } +// loadConfig loads and validates the TOML configuration file +func loadConfig(file string) *config.Config { - return - } + cfg, err := config.LoadFile(file) + if err != nil { + log.Println(logger.Red + "[FTL]" + logger.Reset + " [APP] failed to load TOML configuration: " + err.Error()) + waveGoodbye() + } - errChan <- componentErr{componentType: logger.BLE, err: nil} - }() + return cfg } -// startVideoPlaying starts the video playing goroutine. -func startVideoPlaying(ctx context.Context, controllers appControllers, wg *sync.WaitGroup, errChan chan<- componentErr) { - go func() { - defer wg.Done() +// configureTerminal handles terminal character echo settings, returning a cleanup function +// to restore original terminal settings +func configureTerminal() func() { - if err := playVideo(ctx, controllers); err != nil { - - // Only send error if context was not cancelled - if !errors.Is(err, context.Canceled) { - errChan <- componentErr{componentType: logger.VIDEO, err: err} - } - - return - } - - errChan <- componentErr{componentType: logger.VIDEO, err: nil} - }() -} + rawMode := exec.Command("stty", "-echo") + rawMode.Stdin = os.Stdin + _ = rawMode.Run() -// monitorBLESpeed monitors the BLE speed characteristic -func monitorBLESpeed(ctx context.Context, controllers appControllers, bleSpeedCharacter *bluetooth.DeviceCharacteristic) error { - return controllers.bleController.GetBLEUpdates(ctx, controllers.speedController, bleSpeedCharacter) + return func() { + cooked := exec.Command("stty", "echo") + cooked.Stdin = os.Stdin + _ = cooked.Run() + } } -// playVideo starts the video player. -func playVideo(ctx context.Context, controllers appControllers) error { - return controllers.videoPlayer.Start(ctx, controllers.speedController) +// waveGoodbye outputs a goodbye message and exits the program +func waveGoodbye() { + log.Println(appPrefix, appName, appVersion, "shutdown complete. Goodbye!") + os.Exit(0) } diff --git a/cmd/service_runner.go b/cmd/service_runner.go new file mode 100644 index 0000000..4147a1e --- /dev/null +++ b/cmd/service_runner.go @@ -0,0 +1,168 @@ +package main + +import ( + "context" + "os" + "os/signal" + "sync" + "syscall" + "time" + + logger "github.com/richbl/go-ble-sync-cycle/internal/logging" +) + +// ServiceRunner manages individual service goroutines and their lifecycle +type ServiceRunner struct { + sm *ShutdownManager + serviceName string + errChan chan error +} + +// ShutdownManager handles graceful shutdown of application components and coordinates cleanup +// operations with context cancellations and timeout management +type ShutdownManager struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + cleanupFuncs []func() + timeout time.Duration + terminated chan struct{} + cleanupOnce sync.Once +} + +// ExitHandler coordinates the final application shutdown sequence +type ExitHandler struct { + sm *ShutdownManager +} + +// NewShutdownManager creates a new ShutdownManager with the specified timeout duration +func NewShutdownManager(timeout time.Duration) *ShutdownManager { + + ctx, cancel := context.WithCancel(context.Background()) + + return &ShutdownManager{ + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + terminated: make(chan struct{}), + timeout: timeout, + } +} + +// Context returns the ShutdownManager's context for cancellation propagation +func (sm *ShutdownManager) Context() context.Context { + return sm.ctx +} + +// Wait blocks until the shutdown sequence is complete +func (sm *ShutdownManager) Wait() { + <-sm.terminated +} + +// WaitGroup returns the ShutdownManager's WaitGroup for goroutine synchronization +func (sm *ShutdownManager) WaitGroup() *sync.WaitGroup { + return &sm.wg +} + +// AddCleanupFn adds a cleanup function to be executed during shutdown +// Note that cleanup functions are executed in reverse order of registration +func (sm *ShutdownManager) AddCleanupFn(fn func()) { + sm.cleanupFuncs = append(sm.cleanupFuncs, fn) +} + +// Start begins listening for shutdown signals (SIGINT, SIGTERM) +// When a signal is received, it initiates the shutdown sequence +func (sm *ShutdownManager) Start() { + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigChan + sm.initiateShutdown() + }() +} + +// initiateShutdown coordinates the shutdown sequence, including timeout management and cleanup +// function execution, and ensures the shutdown sequence runs only once +func (sm *ShutdownManager) initiateShutdown() { + + sm.cleanupOnce.Do(func() { + sm.cancel() + timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), sm.timeout) + defer timeoutCancel() + done := make(chan struct{}) + + go func() { + sm.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-timeoutCtx.Done(): + logger.Warn(logger.APP, "shutdown timed out, some goroutines may not have cleaned up properly") + } + + // Execute cleanup functions (reverse order) + for i := len(sm.cleanupFuncs) - 1; i >= 0; i-- { + sm.cleanupFuncs[i]() + } + + close(sm.terminated) + }) +} + +// NewServiceRunner creates a new ServiceRunner with the specified name and ShutdownManager +func NewServiceRunner(sm *ShutdownManager, name string) *ServiceRunner { + + return &ServiceRunner{ + sm: sm, + serviceName: name, + errChan: make(chan error, 1), + } +} + +// Run executes the provided function in a goroutine and manages its lifecycle, automatically +// handling cleanup and error propagation +func (sr *ServiceRunner) Run(fn func(context.Context) error) { + + sr.sm.wg.Add(1) + + go func() { + defer sr.sm.wg.Done() + + if err := fn(sr.sm.ctx); err != nil && err != context.Canceled { + sr.errChan <- err + sr.sm.cancel() + } + + close(sr.errChan) + }() +} + +// Error returns any error that occurred during service execution +func (sr *ServiceRunner) Error() error { + + select { + case err := <-sr.errChan: + return err + default: + return nil + } +} + +// NewExitHandler creates a new ExitHandler with the specified shutdown manager +func NewExitHandler(sm *ShutdownManager) *ExitHandler { + return &ExitHandler{sm: sm} +} + +// HandleExit coordinates the final shutdown sequence and exits the application +func (h *ExitHandler) HandleExit() { + + if h.sm != nil { + h.sm.Wait() + } + + waveGoodbye() +} diff --git a/internal/ble/sensor_controller.go b/internal/ble/sensor_controller.go index fb5b8b0..09edc30 100644 --- a/internal/ble/sensor_controller.go +++ b/internal/ble/sensor_controller.go @@ -44,6 +44,7 @@ var ( // NewBLEController creates a new BLE central controller for accessing a BLE peripheral func NewBLEController(bleConfig config.BLEConfig, speedConfig config.SpeedConfig) (*BLEController, error) { + // Enable BLE adapter bleAdapter := bluetooth.DefaultAdapter @@ -62,6 +63,7 @@ func NewBLEController(bleConfig config.BLEConfig, speedConfig config.SpeedConfig // ScanForBLEPeripheral scans for a BLE peripheral with the specified UUID func (m *BLEController) ScanForBLEPeripheral(ctx context.Context) (bluetooth.ScanResult, error) { + // Create context with timeout scanCtx, cancel := context.WithTimeout(ctx, time.Duration(m.bleConfig.ScanTimeoutSecs)*time.Second) defer cancel() @@ -86,7 +88,6 @@ func (m *BLEController) ScanForBLEPeripheral(ctx context.Context) (bluetooth.Sca case err := <-errChan: return bluetooth.ScanResult{}, err case <-scanCtx.Done(): - logger.Info(logger.BLE, "user-generated interrupt, stopping BLE component scan...") if err := m.bleAdapter.StopScan(); err != nil { logger.Error(logger.BLE, "failed to stop scan: "+err.Error()) @@ -99,6 +100,7 @@ func (m *BLEController) ScanForBLEPeripheral(ctx context.Context) (bluetooth.Sca // startScanning starts the BLE scan and sends the result to the found channel when the target device is discovered func (m *BLEController) startScanning(found chan<- bluetooth.ScanResult) error { + // Start BLE scan err := m.bleAdapter.Scan(func(adapter *bluetooth.Adapter, result bluetooth.ScanResult) { @@ -125,6 +127,7 @@ func (m *BLEController) startScanning(found chan<- bluetooth.ScanResult) error { // GetBLECharacteristic scans for the BLE peripheral and returns CSC services/characteristics func (m *BLEController) GetBLECharacteristic(ctx context.Context, speedController *speed.SpeedController) (*bluetooth.DeviceCharacteristic, error) { + // Scan for BLE peripheral result, err := m.ScanForBLEPeripheral(ctx) if err != nil { @@ -135,7 +138,6 @@ func (m *BLEController) GetBLECharacteristic(ctx context.Context, speedControlle // Connect to BLE peripheral device var device bluetooth.Device - if device, err = m.bleAdapter.Connect(result.Address, bluetooth.ConnectionParams{}); err != nil { return nil, err } @@ -165,6 +167,7 @@ func (m *BLEController) GetBLECharacteristic(ctx context.Context, speedControlle // GetBLEUpdates enables BLE peripheral monitoring to report real-time sensor data func (m *BLEController) GetBLEUpdates(ctx context.Context, speedController *speed.SpeedController, char *bluetooth.DeviceCharacteristic) error { + logger.Debug(logger.BLE, "starting real-time monitoring of BLE sensor notifications...") errChan := make(chan error, 1) @@ -197,6 +200,7 @@ func (m *BLEController) GetBLEUpdates(ctx context.Context, speedController *spee // ProcessBLESpeed processes the raw speed data from the BLE peripheral func (m *BLEController) ProcessBLESpeed(data []byte) float64 { + // Parse speed data newSpeedData, err := m.parseSpeedData(data) if err != nil { @@ -213,6 +217,7 @@ func (m *BLEController) ProcessBLESpeed(data []byte) float64 { // calculateSpeed calculates the current speed based on the sensor data func (m *BLEController) calculateSpeed(sm SpeedMeasurement) float64 { + // First time through the loop set the last wheel revs and time if lastWheelTime == 0 { lastWheelRevs = sm.wheelRevs @@ -247,6 +252,7 @@ func (m *BLEController) calculateSpeed(sm SpeedMeasurement) float64 { // parseSpeedData parses the raw speed data from the BLE peripheral func (m *BLEController) parseSpeedData(data []byte) (SpeedMeasurement, error) { + // Check for data if len(data) < 1 { return SpeedMeasurement{}, errors.New("empty data")