From 27c043dde16cbbbf80b98348996d8ac4b33e8c47 Mon Sep 17 00:00:00 2001 From: richbl Date: Tue, 24 Dec 2024 19:28:54 -0800 Subject: [PATCH 1/2] refactor(APP): :recycle: Simplifying logic in the video and BLE components Signed-off-by: richbl --- internal/ble/sensor_controller.go | 29 +-- internal/video-player/playback_controller.go | 226 ++++++++---------- .../video-player/playback_controller_test.go | 3 +- 3 files changed, 112 insertions(+), 146 deletions(-) diff --git a/internal/ble/sensor_controller.go b/internal/ble/sensor_controller.go index 18cc731..c335628 100644 --- a/internal/ble/sensor_controller.go +++ b/internal/ble/sensor_controller.go @@ -1,4 +1,3 @@ -// Package ble provides Bluetooth Low Energy (BLE) functionality for cycling speed sensors package ble import ( @@ -24,12 +23,6 @@ const ( mphConversion = 2.23694 // Conversion factor for miles per hour ) -// Package-level variables for tracking speed measurements -var ( - lastWheelRevs uint32 - lastWheelTime uint16 -) - // SpeedMeasurement represents the wheel revolution and time data from a BLE sensor type SpeedMeasurement struct { wheelRevs uint32 @@ -38,9 +31,11 @@ type SpeedMeasurement struct { // BLEController manages BLE communication with cycling speed sensors type BLEController struct { - bleConfig config.BLEConfig - speedConfig config.SpeedConfig - bleAdapter bluetooth.Adapter + bleConfig config.BLEConfig + speedConfig config.SpeedConfig + bleAdapter bluetooth.Adapter + lastWheelRevs uint32 + lastWheelTime uint16 } // NewBLEController creates a new BLE central controller for accessing a BLE peripheral @@ -206,26 +201,26 @@ func (m *BLEController) startScanning(found chan<- bluetooth.ScanResult) error { func (m *BLEController) calculateSpeed(sm SpeedMeasurement) float64 { // Initialize last wheel data if not set - if lastWheelTime == 0 { - lastWheelRevs = sm.wheelRevs - lastWheelTime = sm.wheelTime + if m.lastWheelTime == 0 { + m.lastWheelRevs = sm.wheelRevs + m.lastWheelTime = sm.wheelTime return 0.0 } - timeDiff := sm.wheelTime - lastWheelTime + timeDiff := sm.wheelTime - m.lastWheelTime if timeDiff == 0 { return 0.0 } - revDiff := int32(sm.wheelRevs - lastWheelRevs) + revDiff := int32(sm.wheelRevs - m.lastWheelRevs) speedConversion := kphConversion if m.speedConfig.SpeedUnits == config.SpeedUnitsMPH { speedConversion = mphConversion } speed := float64(revDiff) * float64(m.speedConfig.WheelCircumferenceMM) * speedConversion / float64(timeDiff) - lastWheelRevs = sm.wheelRevs - lastWheelTime = sm.wheelTime + m.lastWheelRevs = sm.wheelRevs + m.lastWheelTime = sm.wheelTime return speed } diff --git a/internal/video-player/playback_controller.go b/internal/video-player/playback_controller.go index 470c08f..13a0618 100644 --- a/internal/video-player/playback_controller.go +++ b/internal/video-player/playback_controller.go @@ -1,4 +1,3 @@ -// Package video provides video playback control functionality synchronized with speed measurements package video import ( @@ -17,7 +16,7 @@ import ( speed "github.com/richbl/go-ble-sync-cycle/internal/speed" ) -// Common errors for playback control +// Common errors var ( ErrOSDUpdate = errors.New("failed to update OSD") ErrPlaybackSpeed = errors.New("failed to set playback speed") @@ -32,6 +31,12 @@ type PlaybackController struct { player *mpv.Mpv } +// speedState maintains the current state of playback speed +type speedState struct { + current float64 + last float64 +} + // NewPlaybackController creates a new video player with the given config func NewPlaybackController(videoConfig config.VideoConfig, speedConfig config.SpeedConfig) (*PlaybackController, error) { @@ -47,44 +52,69 @@ func NewPlaybackController(videoConfig config.VideoConfig, speedConfig config.Sp }, nil } -// Start configures and starts the MPV media player, then manages the playback loop and -// synchronizes video speed with the provided speed controller +// Start configures and starts the MPV media player func (p *PlaybackController) Start(ctx context.Context, speedController *speed.SpeedController) error { logger.Info(logger.VIDEO, "starting MPV video player...") defer p.player.TerminateDestroy() + if err := p.setup(); err != nil { + return fmt.Errorf("failed to setup player: %w", err) + } + + return p.run(ctx, speedController) +} + +// setup handles initial player configuration and video loading +func (p *PlaybackController) setup() error { + if err := p.configureMPVPlayer(); err != nil { return fmt.Errorf("failed to configure MPV player: %w", err) } logger.Debug(logger.VIDEO, "loading video file:", p.config.FilePath) - if err := p.loadMPVVideo(); err != nil { + if err := p.player.Command([]string{"loadfile", p.config.FilePath}); err != nil { return fmt.Errorf("failed to load video file: %w", err) } - return p.runPlaybackLoop(ctx, speedController) + return nil +} + +// configureMPVPlayer sets up the player window based on configuration +func (p *PlaybackController) configureMPVPlayer() error { + + if err := p.player.SetOptionString("keep-open", "yes"); err != nil { + return err + } + + if p.config.WindowScaleFactor == 1.0 { + logger.Debug(logger.VIDEO, "maximizing video window") + return p.player.SetOptionString("window-maximized", "yes") + } + + scalePercent := strconv.Itoa(int(p.config.WindowScaleFactor * 100)) + return p.player.SetOptionString("autofit", scalePercent+"%") } -// runPlaybackLoop runs the main playback loop, updating the video playback speed -func (p *PlaybackController) runPlaybackLoop(ctx context.Context, speedController *speed.SpeedController) error { +// run handles the main playback loop +func (p *PlaybackController) run(ctx context.Context, speedController *speed.SpeedController) error { + // Set an interval to check for updates ticker := time.NewTicker(time.Millisecond * time.Duration(p.config.UpdateIntervalSec*1000)) defer ticker.Stop() - var lastSpeed float64 + state := &speedState{} logger.Info(logger.VIDEO, "MPV video playback started") - logger.Debug(logger.VIDEO, "entering MPV playback loop...") for { select { case <-ctx.Done(): - fmt.Print("\r") // Clear the ^C character + fmt.Print("\r") logger.Info(logger.VIDEO, "user-generated interrupt, stopping MPV video player...") return nil case <-ticker.C: - if err := p.handlePlaybackTick(speedController, &lastSpeed); err != nil { + if err := p.tick(speedController, state); err != nil { if errors.Is(err, ErrVideoComplete) { return err @@ -92,174 +122,114 @@ func (p *PlaybackController) runPlaybackLoop(ctx context.Context, speedControlle logger.Warn(logger.VIDEO, "playback error:", err.Error()) } + } } } -// handlePlaybackTick updates the video playback speed based on the speed controller -func (p *PlaybackController) handlePlaybackTick(speedController *speed.SpeedController, lastSpeed *float64) error { +// tick handles a single update cycle +func (p *PlaybackController) tick(speedController *speed.SpeedController, state *speedState) error { - // Check for end of file - reachedEOF, err := p.player.GetProperty("eof-reached", mpv.FormatFlag) - if err == nil && reachedEOF.(bool) { + // First, check if playback is complete + if complete, err := p.isPlaybackComplete(); err != nil || complete { return ErrVideoComplete } - if err := p.updatePlaybackSpeed(speedController, lastSpeed); err != nil { - if !strings.Contains(err.Error(), "end of file") { - return fmt.Errorf("error updating playback speed: %w", err) - } - } + // Next, update the speed + state.current = speedController.GetSmoothedSpeed() + p.logDebugInfo(speedController, state) - return nil -} - -// configureMPVPlayer configures the MPV media player -func (p *PlaybackController) configureMPVPlayer() error { - - if err := p.player.SetOptionString("keep-open", "yes"); err != nil { - return err + if state.current == 0 { + return p.handleZeroSpeed() } - // Set video window size - if p.config.WindowScaleFactor == 1.0 { - logger.Debug(logger.VIDEO, "maximizing video window") - return p.player.SetOptionString("window-maximized", "yes") + if p.shouldUpdateSpeed(state) { + return p.updateSpeed(state) } - logger.Debug(logger.VIDEO, "scaling video window") - scalePercent := strconv.Itoa(int(p.config.WindowScaleFactor * 100)) - - return p.player.SetOptionString("autofit", scalePercent+"%") -} - -// loadMPVVideo loads the video file -func (p *PlaybackController) loadMPVVideo() error { - return p.player.Command([]string{"loadfile", p.config.FilePath}) -} - -// updatePlaybackSpeed updates the video playback speed -func (p *PlaybackController) updatePlaybackSpeed(speedController *speed.SpeedController, lastSpeed *float64) error { - - currentSpeed := speedController.GetSmoothedSpeed() - p.logSpeedInfo(speedController, currentSpeed) - - return p.checkSpeedState(currentSpeed, lastSpeed) -} - -// logSpeedInfo logs speed information -func (p *PlaybackController) logSpeedInfo(sc *speed.SpeedController, currentSpeed float64) { - - logger.Debug(logger.VIDEO, "sensor speed buffer: ["+strings.Join(sc.GetSpeedBuffer(), " ")+"]") - logger.Debug(logger.VIDEO, logger.Magenta+"smoothed sensor speed:", - strconv.FormatFloat(currentSpeed, 'f', 2, 64), p.speedConfig.SpeedUnits) + return nil } -// checkSpeedState checks the current speed and updates the playback speed -func (p *PlaybackController) checkSpeedState(currentSpeed float64, lastSpeed *float64) error { - - // If no speed detected, pause playback - if currentSpeed == 0 { - return p.pausePlayback() - } +// isPlaybackComplete checks if the video has finished playing +func (p *PlaybackController) isPlaybackComplete() (bool, error) { - // If the delta between the current speed and the last speed is greater than the threshold, - deltaSpeed := math.Abs(currentSpeed - *lastSpeed) - p.logSpeedDebugInfo(*lastSpeed, deltaSpeed) - - if deltaSpeed > p.speedConfig.SpeedThreshold { - return p.adjustPlayback(currentSpeed, lastSpeed) + reachedEOF, err := p.player.GetProperty("eof-reached", mpv.FormatFlag) + if err != nil { + return false, nil } - return nil + return reachedEOF.(bool), nil } -// logSpeedDebugInfo logs debug information -func (p *PlaybackController) logSpeedDebugInfo(lastSpeed, deltaSpeed float64) { - - logger.Debug(logger.VIDEO, logger.Magenta+"last playback speed:", - strconv.FormatFloat(lastSpeed, 'f', 2, 64), p.speedConfig.SpeedUnits) - logger.Debug(logger.VIDEO, logger.Magenta+"sensor speed delta:", - strconv.FormatFloat(deltaSpeed, 'f', 2, 64), p.speedConfig.SpeedUnits) - logger.Debug(logger.VIDEO, logger.Magenta+"playback speed update threshold:", - strconv.FormatFloat(p.speedConfig.SpeedThreshold, 'f', 2, 64), p.speedConfig.SpeedUnits) +// shouldUpdateSpeed determines if the playback speed needs updating +func (p *PlaybackController) shouldUpdateSpeed(state *speedState) bool { + return math.Abs(state.current-state.last) > p.speedConfig.SpeedThreshold } -// pausePlayback pauses the video playback -func (p *PlaybackController) pausePlayback() error { +// handleZeroSpeed handles the case when no speed is detected +func (p *PlaybackController) handleZeroSpeed() error { - logger.Debug(logger.VIDEO, "no speed detected, so pausing video") + logger.Debug(logger.VIDEO, "no speed detected, pausing video") - if err := p.updateMPVDisplay(0.0, 0.0); err != nil { - return wrapError(ErrOSDUpdate, err) + if err := p.updateDisplay(0.0, 0.0); err != nil { + return fmt.Errorf("%w: %v", ErrOSDUpdate, err) } - return p.setMPVPauseState(true) + return p.player.SetProperty("pause", mpv.FormatFlag, true) } -// adjustPlayback adjusts the video playback speed -func (p *PlaybackController) adjustPlayback(currentSpeed float64, lastSpeed *float64) error { +// updateSpeed adjusts the playback speed based on current speed +func (p *PlaybackController) updateSpeed(state *speedState) error { + + playbackSpeed := (state.current * p.config.SpeedMultiplier) / 10.0 - playbackSpeed := (currentSpeed * p.config.SpeedMultiplier) / 10.0 logger.Debug(logger.VIDEO, logger.Cyan+"updating video playback speed to", strconv.FormatFloat(playbackSpeed, 'f', 2, 64)) - if err := p.updateMPVPlaybackSpeed(playbackSpeed); err != nil { - return wrapError(ErrPlaybackSpeed, err) + if err := p.player.SetProperty("speed", mpv.FormatDouble, playbackSpeed); err != nil { + return fmt.Errorf("%w: %v", ErrPlaybackSpeed, err) } - *lastSpeed = currentSpeed - - if err := p.updateMPVDisplay(currentSpeed, playbackSpeed); err != nil { - return wrapError(ErrOSDUpdate, err) + if err := p.updateDisplay(state.current, playbackSpeed); err != nil { + return fmt.Errorf("%w: %v", ErrOSDUpdate, err) } - return p.setMPVPauseState(false) + state.last = state.current + return p.player.SetProperty("pause", mpv.FormatFlag, false) } -// updateMPVDisplay updates the MPV OSD -func (p *PlaybackController) updateMPVDisplay(cycleSpeed, playbackSpeed float64) error { +// updateDisplay updates the on-screen display +func (p *PlaybackController) updateDisplay(cycleSpeed, playbackSpeed float64) error { if !p.config.OnScreenDisplay.ShowOSD { return nil } - osdText := p.buildOSDText(cycleSpeed, playbackSpeed) - - return p.player.SetOptionString("osd-msg1", osdText) -} - -// buildOSDText builds the MPV OSD text -func (p *PlaybackController) buildOSDText(cycleSpeed, playbackSpeed float64) string { - - // If no speed detected, show "Paused" if cycleSpeed == 0 { - return " Paused" + return p.player.SetOptionString("osd-msg1", " Paused") } - var osdText string + var osdText strings.Builder if p.config.OnScreenDisplay.DisplayCycleSpeed { - osdText += fmt.Sprintf(" Cycle Speed: %.2f %s\n", cycleSpeed, p.speedConfig.SpeedUnits) + fmt.Fprintf(&osdText, " Cycle Speed: %.2f %s\n", cycleSpeed, p.speedConfig.SpeedUnits) } if p.config.OnScreenDisplay.DisplayPlaybackSpeed { - osdText += fmt.Sprintf(" Playback Speed: %.2fx\n", playbackSpeed) + fmt.Fprintf(&osdText, " Playback Speed: %.2fx\n", playbackSpeed) } - return osdText + return p.player.SetOptionString("osd-msg1", osdText.String()) } -// updateMPVPlaybackSpeed updates the video playback speed -func (p *PlaybackController) updateMPVPlaybackSpeed(playbackSpeed float64) error { - return p.player.SetProperty("speed", mpv.FormatDouble, playbackSpeed) -} - -// setMPVPauseState sets the MPV pause state -func (p *PlaybackController) setMPVPauseState(pause bool) error { - return p.player.SetProperty("pause", mpv.FormatFlag, pause) -} - -// wrapError wraps an error with a specific error type for more context -func wrapError(baseErr error, contextErr error) error { - return fmt.Errorf("%w: %v", baseErr, contextErr) +// logDebugInfo logs debug information about current speeds +func (p *PlaybackController) logDebugInfo(speedController *speed.SpeedController, state *speedState) { + logger.Debug(logger.VIDEO, "sensor speed buffer: ["+strings.Join(speedController.GetSpeedBuffer(), " ")+"]") + logger.Debug(logger.VIDEO, logger.Magenta+"smoothed sensor speed:", + strconv.FormatFloat(state.current, 'f', 2, 64), p.speedConfig.SpeedUnits) + logger.Debug(logger.VIDEO, logger.Magenta+"last playback speed:", + strconv.FormatFloat(state.last, 'f', 2, 64), p.speedConfig.SpeedUnits) + logger.Debug(logger.VIDEO, logger.Magenta+"sensor speed delta:", + strconv.FormatFloat(math.Abs(state.current-state.last), 'f', 2, 64), p.speedConfig.SpeedUnits) + logger.Debug(logger.VIDEO, logger.Magenta+"playback speed update threshold:", + strconv.FormatFloat(p.speedConfig.SpeedThreshold, 'f', 2, 64), p.speedConfig.SpeedUnits) } diff --git a/internal/video-player/playback_controller_test.go b/internal/video-player/playback_controller_test.go index 384c2c3..376a1f5 100644 --- a/internal/video-player/playback_controller_test.go +++ b/internal/video-player/playback_controller_test.go @@ -81,7 +81,8 @@ func TestPlaybackFlow(t *testing.T) { // Test video loading t.Run("load video", func(t *testing.T) { - err := controller.loadMPVVideo() + + err := controller.player.Command([]string{"loadfile", controller.config.FilePath}) assert.NoError(t, err, "should load video") }) From bbd73040e6f552e8f4dea174a364f2b24f324bb1 Mon Sep 17 00:00:00 2001 From: richbl Date: Thu, 26 Dec 2024 21:16:38 -0800 Subject: [PATCH 2/2] refactor(APP): Controllers refactor to improve maintainability Signed-off-by: richbl --- cmd/main.go | 145 +++++++++++++++-------- cmd/service_runner.go | 129 ++------------------ cmd/shutdown_manager.go | 156 +++++++++++++++++++++++++ internal/ble/sensor_controller.go | 126 +++++++++++--------- internal/ble/sensor_controller_test.go | 2 +- 5 files changed, 332 insertions(+), 226 deletions(-) create mode 100644 cmd/shutdown_manager.go diff --git a/cmd/main.go b/cmd/main.go index 2b870c6..ed3725c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,8 +13,6 @@ import ( logger "github.com/richbl/go-ble-sync-cycle/internal/logging" speed "github.com/richbl/go-ble-sync-cycle/internal/speed" video "github.com/richbl/go-ble-sync-cycle/internal/video-player" - - "tinygo.org/x/bluetooth" ) // Application constants @@ -38,44 +36,46 @@ func main() { // Hello world! log.Println(appPrefix, "Starting", appName, appVersion) + // Load configuration cfg := loadConfig("config.toml") - // Initialize the shutdown manager and exit handler - sm := NewShutdownManager(shutdownTimeout) - exitHandler := NewExitHandler(sm) - sm.Start() + // Initialize utility services + sm, exitHandler := initializeUtilityServices(cfg) - // Initialize the logger with the configured log level and exit handler - logger.Initialize(cfg.App.LogLevel) - logger.SetExitHandler(func() { - sm.initiateShutdown() - exitHandler.HandleExit() - }) + // Initialize application controllers + controllers := initializeControllers(cfg, exitHandler) - // Initialize the application controllers - controllers, componentType, err := setupAppControllers(*cfg) - if err != nil { - logger.Fatal(componentType, "failed to create controllers:", err.Error()) - return - } + // Scan for BLE device + bleDeviceDiscovery(sm.shutdownCtx.ctx, controllers, exitHandler) - // Scan for the BLE characteristic and handle context cancellation - bleChar, err := scanForBLECharacteristic(sm.Context(), controllers) - if err != nil { + // Start and monitor services for BLE and video components + monitorServiceRunners(startServiceRunners(sm, controllers)) - if err != context.Canceled { - logger.Fatal(logger.BLE, "failed to scan for BLE characteristic:", err.Error()) + // Wait for final shutdown sequences to complete and wave goodbye! + sm.Wait() + waveGoodbye() +} + +// monitorServiceRunners monitors the services and logs any errors encountered +func monitorServiceRunners(runners []*ServiceRunner) { + + for _, runner := range runners { + + if err := runner.Error(); err != nil { + logger.Fatal(logger.APP, "service error:", err.Error()) return } - exitHandler.HandleExit() - return } +} + +// startServiceRunners starts the BLE and video service runners and returns a slice of service runners +func startServiceRunners(sm *ShutdownManager, controllers appControllers) []*ServiceRunner { // Create and run the BLE service runner bleRunner := NewServiceRunner(sm, "BLE") bleRunner.Run(func(ctx context.Context) error { - return controllers.bleController.GetBLEUpdates(ctx, controllers.speedController, bleChar) + return controllers.bleController.GetBLEUpdates(ctx, controllers.speedController) }) // Create and run the video service runner @@ -84,17 +84,57 @@ func main() { return controllers.videoPlayer.Start(ctx, controllers.speedController) }) - // Wait for services to complete and check for errors - for _, runner := range []*ServiceRunner{bleRunner, videoRunner} { - if err := runner.Error(); err != nil { - logger.Fatal(logger.APP, "service error:", err.Error()) + return []*ServiceRunner{bleRunner, videoRunner} +} + +// bleDeviceDiscovery scans for the BLE device and CSC speed characteristic +func bleDeviceDiscovery(ctx context.Context, controllers appControllers, exitHandler *ExitHandler) { + + err := scanForBLECharacteristic(ctx, controllers) + if err != nil { + + if err != context.Canceled { + logger.Fatal(logger.BLE, "failed to scan for BLE characteristic:", err.Error()) return } + + exitHandler.HandleExit() } +} - // Wait for final shutdown sequences to complete and wave goodbye! - sm.Wait() - waveGoodbye() +// initializeUtilityServices initializes the core components of the application, including the shutdown manager, +// exit handler, and logger +func initializeUtilityServices(cfg *config.Config) (*ShutdownManager, *ExitHandler) { + + // Initialize the shutdown manager and exit handler + sm := NewShutdownManager(shutdownTimeout) + exitHandler := NewExitHandler(sm) + sm.Start() + + // Initialize the logger + logger.Initialize(cfg.App.LogLevel) + + // Set the exit handler for the shutdown manager + logger.SetExitHandler(func() { + sm.initiateShutdown() + exitHandler.HandleExit() + }) + + return sm, exitHandler +} + +// initializeControllers initializes the application controllers, including the speed controller, +// video player, and BLE controller. It returns the initialized controllers +func initializeControllers(cfg *config.Config, exitHandler *ExitHandler) appControllers { + + controllers, componentType, err := setupAppControllers(*cfg) + + if err != nil { + logger.Fatal(componentType, "failed to create controllers:", err.Error()) + exitHandler.HandleExit() + } + + return controllers } // setupAppControllers creates and initializes all application controllers @@ -119,31 +159,38 @@ func setupAppControllers(cfg config.Config) (appControllers, logger.ComponentTyp } // scanForBLECharacteristic handles the initial BLE device discovery and characteristic scanning -// using a context for cancellation and returns the discovered characteristic or an error -func scanForBLECharacteristic(ctx context.Context, controllers appControllers) (*bluetooth.DeviceCharacteristic, error) { +func scanForBLECharacteristic(ctx context.Context, controllers appControllers) error { - // Create a channel to receive the result of the BLE characteristic scan - resultsChan := make(chan struct { - char *bluetooth.DeviceCharacteristic - err error - }, 1) + // Create a channel to receive errors from the scan goroutine + errChan := make(chan error, 1) + // BLE peripheral scan and connect go func() { - defer close(resultsChan) - char, err := controllers.bleController.GetBLECharacteristic(ctx, controllers.speedController) - resultsChan <- struct { - char *bluetooth.DeviceCharacteristic - err error - }{char, err} + defer close(errChan) + scanResult, err := controllers.bleController.ScanForBLEPeripheral(ctx) + if err != nil { + errChan <- err + return + } + + connectResult, err := controllers.bleController.ConnectToBLEPeripheral(scanResult) + if err != nil { + errChan <- err + return + } + + // Get the BLE characteristic from the connected device + err = controllers.bleController.GetBLECharacteristic(connectResult) + errChan <- err }() select { case <-ctx.Done(): fmt.Print("\r") // Clear the ^C character from the terminal line logger.Info(logger.BLE, "user-generated interrupt, stopping BLE discovery...") - return nil, ctx.Err() - case result := <-resultsChan: - return result.char, result.err + return ctx.Err() + case result := <-errChan: + return result } } diff --git a/cmd/service_runner.go b/cmd/service_runner.go index 4147a1e..a961b37 100644 --- a/cmd/service_runner.go +++ b/cmd/service_runner.go @@ -2,13 +2,6 @@ package main import ( "context" - "os" - "os/signal" - "sync" - "syscall" - "time" - - logger "github.com/richbl/go-ble-sync-cycle/internal/logging" ) // ServiceRunner manages individual service goroutines and their lifecycle @@ -18,101 +11,6 @@ type ServiceRunner struct { errChan chan error } -// ShutdownManager handles graceful shutdown of application components and coordinates cleanup -// operations with context cancellations and timeout management -type ShutdownManager struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - cleanupFuncs []func() - timeout time.Duration - terminated chan struct{} - cleanupOnce sync.Once -} - -// ExitHandler coordinates the final application shutdown sequence -type ExitHandler struct { - sm *ShutdownManager -} - -// NewShutdownManager creates a new ShutdownManager with the specified timeout duration -func NewShutdownManager(timeout time.Duration) *ShutdownManager { - - ctx, cancel := context.WithCancel(context.Background()) - - return &ShutdownManager{ - ctx: ctx, - cancel: cancel, - wg: sync.WaitGroup{}, - terminated: make(chan struct{}), - timeout: timeout, - } -} - -// Context returns the ShutdownManager's context for cancellation propagation -func (sm *ShutdownManager) Context() context.Context { - return sm.ctx -} - -// Wait blocks until the shutdown sequence is complete -func (sm *ShutdownManager) Wait() { - <-sm.terminated -} - -// WaitGroup returns the ShutdownManager's WaitGroup for goroutine synchronization -func (sm *ShutdownManager) WaitGroup() *sync.WaitGroup { - return &sm.wg -} - -// AddCleanupFn adds a cleanup function to be executed during shutdown -// Note that cleanup functions are executed in reverse order of registration -func (sm *ShutdownManager) AddCleanupFn(fn func()) { - sm.cleanupFuncs = append(sm.cleanupFuncs, fn) -} - -// Start begins listening for shutdown signals (SIGINT, SIGTERM) -// When a signal is received, it initiates the shutdown sequence -func (sm *ShutdownManager) Start() { - - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - - go func() { - <-sigChan - sm.initiateShutdown() - }() -} - -// initiateShutdown coordinates the shutdown sequence, including timeout management and cleanup -// function execution, and ensures the shutdown sequence runs only once -func (sm *ShutdownManager) initiateShutdown() { - - sm.cleanupOnce.Do(func() { - sm.cancel() - timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), sm.timeout) - defer timeoutCancel() - done := make(chan struct{}) - - go func() { - sm.wg.Wait() - close(done) - }() - - select { - case <-done: - case <-timeoutCtx.Done(): - logger.Warn(logger.APP, "shutdown timed out, some goroutines may not have cleaned up properly") - } - - // Execute cleanup functions (reverse order) - for i := len(sm.cleanupFuncs) - 1; i >= 0; i-- { - sm.cleanupFuncs[i]() - } - - close(sm.terminated) - }) -} - // NewServiceRunner creates a new ServiceRunner with the specified name and ShutdownManager func NewServiceRunner(sm *ShutdownManager, name string) *ServiceRunner { @@ -127,14 +25,19 @@ func NewServiceRunner(sm *ShutdownManager, name string) *ServiceRunner { // handling cleanup and error propagation func (sr *ServiceRunner) Run(fn func(context.Context) error) { - sr.sm.wg.Add(1) + sr.sm.WaitGroup().Add(1) go func() { - defer sr.sm.wg.Done() + defer sr.sm.WaitGroup().Done() - if err := fn(sr.sm.ctx); err != nil && err != context.Canceled { + if err := fn(sr.sm.shutdownCtx.ctx); err != nil && err != context.Canceled { sr.errChan <- err - sr.sm.cancel() + + // Initiate shutdown on error + if sm := sr.sm; sm != nil { + sm.shutdownCtx.cancel() + } + } close(sr.errChan) @@ -150,19 +53,5 @@ func (sr *ServiceRunner) Error() error { default: return nil } -} - -// NewExitHandler creates a new ExitHandler with the specified shutdown manager -func NewExitHandler(sm *ShutdownManager) *ExitHandler { - return &ExitHandler{sm: sm} -} - -// HandleExit coordinates the final shutdown sequence and exits the application -func (h *ExitHandler) HandleExit() { - - if h.sm != nil { - h.sm.Wait() - } - waveGoodbye() } diff --git a/cmd/shutdown_manager.go b/cmd/shutdown_manager.go new file mode 100644 index 0000000..d6bd57d --- /dev/null +++ b/cmd/shutdown_manager.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "os" + "os/signal" + "sync" + "syscall" + "time" + + logger "github.com/richbl/go-ble-sync-cycle/internal/logging" +) + +// ShutdownContext encapsulates the shutdown context and cancel function +type ShutdownContext struct { + ctx context.Context + cancel context.CancelFunc +} + +// CleanupManager manages cleanup functions +type CleanupManager struct { + funcs []func() +} + +// GoroutineManager manages goroutine synchronization and timeout handling +type GoroutineManager struct { + wg *sync.WaitGroup + timeout time.Duration +} + +// ShutdownManager manages the shutdown process +type ShutdownManager struct { + shutdownCtx ShutdownContext + routineMgr *GoroutineManager + cleanupMgr CleanupManager + terminated chan struct{} + cleanupOnce sync.Once +} + +// ExitHandler handles graceful shutdown on exit +type ExitHandler struct { + sm *ShutdownManager +} + +// NewShutdownManager creates a new ShutdownManager with the specified timeout +func NewShutdownManager(timeout time.Duration) *ShutdownManager { + ctx, cancel := context.WithCancel(context.Background()) + return &ShutdownManager{ + shutdownCtx: ShutdownContext{ctx: ctx, cancel: cancel}, + routineMgr: NewGoroutineManager(timeout), + cleanupMgr: CleanupManager{}, + terminated: make(chan struct{}), + } +} + +// NewGoroutineManager creates a new GoroutineManager with the specified timeout +func NewGoroutineManager(timeout time.Duration) *GoroutineManager { + return &GoroutineManager{ + wg: &sync.WaitGroup{}, + timeout: timeout, + } +} + +// NewExitHandler creates a new ExitHandler with the specified ShutdownManager +func NewExitHandler(sm *ShutdownManager) *ExitHandler { + return &ExitHandler{sm: sm} +} + +// Add adds a cleanup function to the CleanupManager +func (cm *CleanupManager) Add(fn func()) { + cm.funcs = append(cm.funcs, fn) +} + +// Execute executes the cleanup functions in reverse order +func (cm *CleanupManager) Execute() { + + for i := len(cm.funcs) - 1; i >= 0; i-- { + cm.funcs[i]() + } + +} + +// Wait blocks until the shutdown sequence is complete +func (sw *GoroutineManager) Wait() { + + timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), sw.timeout) + defer timeoutCancel() + done := make(chan struct{}) + + go func() { + sw.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-timeoutCtx.Done(): + logger.Warn(logger.APP, "shutdown timed out, some goroutines may not have cleaned up properly") + } +} + +// Wait blocks until the shutdown sequence is complete +func (sm *ShutdownManager) Wait() { + <-sm.terminated +} + +// WaitGroup returns the ShutdownManager's WaitGroup for goroutine synchronization +func (sm *ShutdownManager) WaitGroup() *sync.WaitGroup { + return sm.routineMgr.wg +} + +// AddCleanupFn adds a cleanup function to be executed during shutdown +// Note that cleanup functions are executed in reverse order of registration +// func (sm *ShutdownManager) AddCleanupFn(fn func()) { +// sm.cleanupMgr.Add(fn) +// } + +// Start monitors for shutdown signals (SIGINT, SIGTERM) and initiates the shutdown sequence +func (sm *ShutdownManager) Start() { + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigChan + + sm.cleanupOnce.Do(func() { + sm.shutdownCtx.cancel() + sm.routineMgr.Wait() + sm.cleanupMgr.Execute() + close(sm.terminated) + }) + + }() +} + +// initiateShutdown coordinates the shutdown sequence +func (sm *ShutdownManager) initiateShutdown() { + + sm.cleanupOnce.Do(func() { + sm.shutdownCtx.cancel() + sm.routineMgr.Wait() + sm.cleanupMgr.Execute() + close(sm.terminated) + }) +} + +// HandleExit waits for the shutdown to complete and then exits the application +func (h *ExitHandler) HandleExit() { + + if h.sm != nil { + h.sm.Wait() + } + + waveGoodbye() +} diff --git a/internal/ble/sensor_controller.go b/internal/ble/sensor_controller.go index c335628..3ccaf33 100644 --- a/internal/ble/sensor_controller.go +++ b/internal/ble/sensor_controller.go @@ -29,11 +29,17 @@ type SpeedMeasurement struct { wheelTime uint16 } -// BLEController manages BLE communication with cycling speed sensors +// BLEDetails holds BLE peripheral details +type BLEDetails struct { + bleConfig config.BLEConfig + bleAdapter bluetooth.Adapter + bleCharacteristic *bluetooth.DeviceCharacteristic +} + +// BLEController holds the BLE controller component and sensor data type BLEController struct { - bleConfig config.BLEConfig + bleDetails BLEDetails speedConfig config.SpeedConfig - bleAdapter bluetooth.Adapter lastWheelRevs uint32 lastWheelTime uint16 } @@ -50,36 +56,72 @@ func NewBLEController(bleConfig config.BLEConfig, speedConfig config.SpeedConfig logger.Info(logger.BLE, "created new BLE central controller") return &BLEController{ - bleConfig: bleConfig, + bleDetails: BLEDetails{ + bleConfig: bleConfig, + bleAdapter: *bleAdapter, + }, speedConfig: speedConfig, - bleAdapter: *bleAdapter, }, nil } -// GetBLECharacteristic scans for the BLE peripheral and returns CSC services/characteristics -func (m *BLEController) GetBLECharacteristic(ctx context.Context, speedController *speed.SpeedController) (*bluetooth.DeviceCharacteristic, error) { +// ScanForBLEPeripheral scans for a BLE peripheral with the specified UUID +func (m *BLEController) ScanForBLEPeripheral(ctx context.Context) (bluetooth.ScanResult, error) { - // Scan for BLE peripheral device - result, err := m.ScanForBLEPeripheral(ctx) - if err != nil { - return nil, err + // Create a context with a timeout + scanCtx, cancel := context.WithTimeout(ctx, time.Duration(m.bleDetails.bleConfig.ScanTimeoutSecs)*time.Second) + defer cancel() + + found := make(chan bluetooth.ScanResult, 1) + errChan := make(chan error, 1) + + go func() { + logger.Info(logger.BLE, "now scanning the ether for BLE peripheral UUID of", m.bleDetails.bleConfig.SensorUUID+"...") + if err := m.startScanning(found); err != nil { + errChan <- err + } + }() + + select { + case result := <-found: + logger.Debug(logger.BLE, "found BLE peripheral", result.Address.String()) + return result, nil + case err := <-errChan: + return bluetooth.ScanResult{}, err + case <-scanCtx.Done(): + if err := m.bleDetails.bleAdapter.StopScan(); err != nil { + logger.Error(logger.BLE, "failed to stop scan:", err.Error()) + } + + return bluetooth.ScanResult{}, errors.New("scanning time limit reached") } +} + +// ConnectToBLEPeripheral connects to a BLE peripheral +func (m *BLEController) ConnectToBLEPeripheral(device bluetooth.ScanResult) (bluetooth.Device, error) { - logger.Debug(logger.BLE, "connecting to BLE peripheral device", result.Address.String()) + logger.Debug(logger.BLE, "connecting to BLE peripheral device", device.Address.String()) - device, err := m.bleAdapter.Connect(result.Address, bluetooth.ConnectionParams{}) + // Connect to the BLE peripheral + connectedDevice, err := m.bleDetails.bleAdapter.Connect(device.Address, bluetooth.ConnectionParams{}) if err != nil { - return nil, err + return bluetooth.Device{}, err } logger.Info(logger.BLE, "BLE peripheral device connected") + return connectedDevice, nil +} + +// GetBLECharacteristic scans a connected BLE peripheral for CSC services/characteristics +// returning the speed characteristic +func (m *BLEController) GetBLECharacteristic(device bluetooth.Device) error { + logger.Debug(logger.BLE, "discovering CSC services", bluetooth.New16BitUUID(0x1816).String()) // Discover CSC services svc, err := device.DiscoverServices([]bluetooth.UUID{bluetooth.New16BitUUID(0x1816)}) if err != nil { logger.Error(logger.BLE, "CSC services discovery failed:", err.Error()) - return nil, err + return err } logger.Debug(logger.BLE, "found CSC service", svc[0].UUID().String()) @@ -89,21 +131,22 @@ func (m *BLEController) GetBLECharacteristic(ctx context.Context, speedControlle char, err := svc[0].DiscoverCharacteristics([]bluetooth.UUID{bluetooth.New16BitUUID(0x2A5B)}) if err != nil { logger.Warn(logger.BLE, "CSC characteristics discovery failed:", err.Error()) - return nil, err + return err } logger.Debug(logger.BLE, "found CSC characteristic", char[0].UUID().String()) - return &char[0], nil + m.bleDetails.bleCharacteristic = &char[0] + return nil } -// GetBLEUpdates enables real-time monitoring of BLE peripheral sensor data, handling -// notification setup/teardown, and updates the speed controller with new readings -func (m *BLEController) GetBLEUpdates(ctx context.Context, speedController *speed.SpeedController, char *bluetooth.DeviceCharacteristic) error { +// GetBLEUpdates enables real-time monitoring of BLE peripheral sensor data, handling notification +// setup/teardown, and updates the speed controller with new readings +func (m *BLEController) GetBLEUpdates(ctx context.Context, speedController *speed.SpeedController) error { logger.Debug(logger.BLE, "starting real-time monitoring of BLE sensor notifications...") errChan := make(chan error, 1) - if err := char.EnableNotifications(func(buf []byte) { + if err := m.bleDetails.bleCharacteristic.EnableNotifications(func(buf []byte) { speed := m.ProcessBLESpeed(buf) speedController.UpdateSpeed(speed) }); err != nil { @@ -112,7 +155,7 @@ func (m *BLEController) GetBLEUpdates(ctx context.Context, speedController *spee // Need to disable BLE notifications when done defer func() { - if err := char.EnableNotifications(nil); err != nil { + if err := m.bleDetails.bleCharacteristic.EnableNotifications(nil); err != nil { logger.Error(logger.BLE, "failed to disable notifications:", err.Error()) } }() @@ -127,37 +170,6 @@ func (m *BLEController) GetBLEUpdates(ctx context.Context, speedController *spee return <-errChan } -// ScanForBLEPeripheral scans for a BLE peripheral with the specified UUID -func (m *BLEController) ScanForBLEPeripheral(ctx context.Context) (bluetooth.ScanResult, error) { - - scanCtx, cancel := context.WithTimeout(ctx, time.Duration(m.bleConfig.ScanTimeoutSecs)*time.Second) - defer cancel() - - found := make(chan bluetooth.ScanResult, 1) - errChan := make(chan error, 1) - - go func() { - logger.Info(logger.BLE, "now scanning the ether for BLE peripheral UUID of", m.bleConfig.SensorUUID+"...") - if err := m.startScanning(found); err != nil { - errChan <- err - } - }() - - select { - case result := <-found: - logger.Debug(logger.BLE, "found BLE peripheral", result.Address.String()) - return result, nil - case err := <-errChan: - return bluetooth.ScanResult{}, err - case <-scanCtx.Done(): - if err := m.bleAdapter.StopScan(); err != nil { - logger.Error(logger.BLE, "failed to stop scan:", err.Error()) - } - - return bluetooth.ScanResult{}, errors.New("scanning time limit reached") - } -} - // ProcessBLESpeed processes raw speed data from the BLE peripheral and returns the calculated speed func (m *BLEController) ProcessBLESpeed(data []byte) float64 { @@ -176,11 +188,11 @@ func (m *BLEController) ProcessBLESpeed(data []byte) float64 { // startScanning starts the BLE scan and sends results to the found channel func (m *BLEController) startScanning(found chan<- bluetooth.ScanResult) error { - err := m.bleAdapter.Scan(func(adapter *bluetooth.Adapter, result bluetooth.ScanResult) { + err := m.bleDetails.bleAdapter.Scan(func(adapter *bluetooth.Adapter, result bluetooth.ScanResult) { - if result.Address.String() == m.bleConfig.SensorUUID { + if result.Address.String() == m.bleDetails.bleConfig.SensorUUID { - if err := m.bleAdapter.StopScan(); err != nil { + if err := m.bleDetails.bleAdapter.StopScan(); err != nil { logger.Error(logger.BLE, "failed to stop scan:", err.Error()) } @@ -207,11 +219,13 @@ func (m *BLEController) calculateSpeed(sm SpeedMeasurement) float64 { return 0.0 } + // Calculate time difference between current and last wheel data timeDiff := sm.wheelTime - m.lastWheelTime if timeDiff == 0 { return 0.0 } + // Calculate the rev difference between current and last wheel data revDiff := int32(sm.wheelRevs - m.lastWheelRevs) speedConversion := kphConversion if m.speedConfig.SpeedUnits == config.SpeedUnitsMPH { diff --git a/internal/ble/sensor_controller_test.go b/internal/ble/sensor_controller_test.go index 2de0b7e..6189104 100644 --- a/internal/ble/sensor_controller_test.go +++ b/internal/ble/sensor_controller_test.go @@ -215,7 +215,7 @@ func TestGetBLECharacteristicIntegration(t *testing.T) { ctx, _ := createTestContextWithTimeout(t) // Expect error since test UUID won't be found - _, err := controller.GetBLECharacteristic(ctx, nil) + _, err := controller.ScanForBLEPeripheral(ctx) assert.Error(t, err) }