diff --git a/.artifact/tree.json b/.artifact/tree.json index 2daa1292..84d81c73 100644 --- a/.artifact/tree.json +++ b/.artifact/tree.json @@ -1,5 +1,83 @@ { "viam-cartographer": { + "mock_data": { + "imu": { + "data.txt": { + "hash": "2b41e7cb9d3f9db51a3fd94f23766e50", + "size": 4242 + } + }, + "lidar": { + "0.pcd": { + "hash": "32b95e765b10c1cfc6097eebc74bbe64", + "size": 9880 + }, + "1.pcd": { + "hash": "a038d9f7ad0356704d512402ead3665b", + "size": 9928 + }, + "10.pcd": { + "hash": "7ffd7681874dd36a8c435a4718e472bc", + "size": 10000 + }, + "11.pcd": { + "hash": "ef95af97b5ed0bc9587cb19dd487f03e", + "size": 9880 + }, + "12.pcd": { + "hash": "592da217520e0a6874ce8387451b59aa", + "size": 9868 + }, + "13.pcd": { + "hash": "1a696bfed9c0cc93d16f76f8c3e295f8", + "size": 10000 + }, + "14.pcd": { + "hash": "3049df7aa2bcde37d068979abc869a3a", + "size": 9952 + }, + "15.pcd": { + "hash": "f001078e01a4104fdfca8294eef99210", + "size": 9952 + }, + "16.pcd": { + "hash": "e1b7aa85857575d275a40b61ed896400", + "size": 9964 + }, + "2.pcd": { + "hash": "924d493bc3ba60a72cb3acad7205663e", + "size": 9952 + }, + "3.pcd": { + "hash": "be52a00e9edab3873e76bc38283e956d", + "size": 10036 + }, + "4.pcd": { + "hash": "d38b3e6275ada37dfd3a5b1ad8518b41", + "size": 10012 + }, + "5.pcd": { + "hash": "41ecd45e566babf33d0e46b57f7f5d91", + "size": 9964 + }, + "6.pcd": { + "hash": "5076d6681530bd995e9155f70dc5fd41", + "size": 9928 + }, + "7.pcd": { + "hash": "6738b43810d9bdb2f93c392d6a71b96b", + "size": 10000 + }, + "8.pcd": { + "hash": "fc8e932ef4ceac3abcb3dc85fc5f970e", + "size": 10060 + }, + "9.pcd": { + "hash": "10e2501cedc4900d542e81e6f4391189", + "size": 9916 + } + } + }, "mock_lidar": { "0.pcd": { "hash": "fc063341f9586fcddc69df23f2870de7", diff --git a/config/config.go b/config/config.go index 975c2ba6..d000dfe1 100644 --- a/config/config.go +++ b/config/config.go @@ -136,7 +136,7 @@ func GetOptionalParameters(config *Config, defaultLidarDataRateMsec, defaultIMUD "setting movement_sensor[data_frequency_hz] to 0") } else { optionalConfigParams.ImuDataRateMsec = defaultIMUDataRateMsec - logger.Warn("config did not provide movement_sensor[data_frequency_hz], "+ + logger.Warnf("config did not provide movement_sensor[data_frequency_hz], "+ "setting to default value of %d", 1000/defaultIMUDataRateMsec) } } else { diff --git a/integration_test.go b/integration_test.go index dd850007..d96d6c68 100644 --- a/integration_test.go +++ b/integration_test.go @@ -8,11 +8,13 @@ package viamcartographer_test import ( "bytes" "context" - "errors" "os" "path" "path/filepath" "reflect" + "runtime" + "strconv" + "sync" "testing" "time" @@ -30,6 +32,73 @@ import ( "github.com/viamrobotics/viam-cartographer/testhelper" ) +const ( + defaultLidarTimeInterval = 200 + defaultIMUTimeInterval = 50 + testTimeout = 20 * time.Second +) + +// Test final position and orientation are at approximately the expected values. +func testCartographerPosition(t *testing.T, svc slam.Service, useIMU bool, expectedComponentRef string) { + var expectedPos r3.Vector + var expectedOri *spatialmath.R4AA + tolerancePos := 0.001 + toleranceOri := 0.001 + + switch { + case runtime.GOOS == "darwin" && !useIMU: + expectedPos = r3.Vector{X: 3.651426324334424, Y: 1.422454179829863, Z: 0} + expectedOri = &spatialmath.R4AA{ + RX: 0, + RY: 0, + RZ: 1, + Theta: 0.0006629744894043836, + } + case runtime.GOOS == "linux" && !useIMU: + expectedPos = r3.Vector{X: 1.2714478890528866, Y: 3.1271067529150076, Z: 0} + expectedOri = &spatialmath.R4AA{ + RX: 0, + RY: 0, + RZ: -1, + Theta: 0.0010751949934010567, + } + + case runtime.GOOS == "darwin" && useIMU: + expectedPos = r3.Vector{X: 4.4700878707562035, Y: 3.1781587655776358, Z: 0} + expectedOri = &spatialmath.R4AA{ + RX: 0.9861776038047263, + RY: 0.1637212678758259, + RZ: 0.025477052402116784, + Theta: 0.02399255141454847, + } + case runtime.GOOS == "linux" && useIMU: + expectedPos = r3.Vector{X: 3.2250269853115867, Y: 5.104006882925285, Z: 0} + expectedOri = &spatialmath.R4AA{ + RX: 0.9864461301028694, + RY: 0.16360809262540335, + RZ: 0.012506975355798564, + Theta: 0.02398663944371901, + } + } + + position, componentRef, err := svc.Position(context.Background()) + test.That(t, err, test.ShouldBeNil) + test.That(t, componentRef, test.ShouldEqual, expectedComponentRef) + + pos := position.Point() + t.Logf("Position point: (%v, %v, %v)", pos.X, pos.Y, pos.Z) + test.That(t, pos.X, test.ShouldAlmostEqual, expectedPos.X, tolerancePos) + test.That(t, pos.Y, test.ShouldAlmostEqual, expectedPos.Y, tolerancePos) + test.That(t, pos.Z, test.ShouldAlmostEqual, expectedPos.Z, tolerancePos) + + ori := position.Orientation().AxisAngles() + t.Logf("Position orientation: RX: %v, RY: %v, RZ: %v, Theta: %v", ori.RX, ori.RY, ori.RZ, ori.Theta) + test.That(t, ori.RX, test.ShouldAlmostEqual, expectedOri.RX, toleranceOri) + test.That(t, ori.RY, test.ShouldAlmostEqual, expectedOri.RY, toleranceOri) + test.That(t, ori.RZ, test.ShouldAlmostEqual, expectedOri.RZ, toleranceOri) + test.That(t, ori.Theta, test.ShouldAlmostEqual, expectedOri.Theta, toleranceOri) +} + // Checks the cartographer map and confirms there at least 100 map points. func testCartographerMap(t *testing.T, svc slam.Service, localizationMode bool) { timestamp1, err := svc.LatestMapInfo(context.Background()) @@ -51,65 +120,9 @@ func testCartographerMap(t *testing.T, svc slam.Service, localizationMode bool) test.That(t, pointcloud.Size(), test.ShouldBeGreaterThanOrEqualTo, 100) } -func testCartographerPosition(t *testing.T, svc slam.Service, useIMU bool, expectedComponentRef string) { - var expectedPosOSX r3.Vector - var expectedPosLinux r3.Vector - var expectedOriOSX *spatialmath.R4AA - var expectedOriLinux *spatialmath.R4AA - tolerancePos := 0.001 - toleranceOri := 0.001 - - if useIMU { - expectedPosOSX = r3.Vector{X: 31.26644008021215, Y: -0.07725723487584407, Z: 0} - expectedPosLinux = r3.Vector{X: 33.36424739867359, Y: -15.892546207753742, Z: -1.7763568394002505e-15} - - expectedOriOSX = &spatialmath.R4AA{Theta: 1.6909088187060277, RX: 0.8939401250703025, RY: 0.11300993950972898, RZ: 0.43370474560615474} - expectedOriLinux = &spatialmath.R4AA{Theta: 1.6301758733667822, RX: 0.9252197096950275, RY: 0.04712768411234466, RZ: 0.3764936522466959} - } else { - expectedPosOSX = r3.Vector{X: 155.7488316264227, Y: -90.25868252233964, Z: 0} - expectedPosLinux = r3.Vector{X: 158.79903385710674, Y: -77.01514065531592, Z: 0} - - expectedOriOSX = &spatialmath.R4AA{Theta: 1.5465081272043815, RX: 0, RY: 0, RZ: 1} - expectedOriLinux = &spatialmath.R4AA{Theta: 0.3331667853231311, RX: 0, RY: 0, RZ: 1} - } - - position, componentRef, err := svc.Position(context.Background()) - test.That(t, err, test.ShouldBeNil) - test.That(t, componentRef, test.ShouldEqual, expectedComponentRef) - - actualPos := position.Point() - t.Logf("Position point: (%v, %v, %v)", actualPos.X, actualPos.Y, actualPos.Z) - // https://viam.atlassian.net/browse/RSDK-3866 - // mac - if actualPos.X > expectedPosOSX.X-tolerancePos && actualPos.X < expectedPosOSX.X+tolerancePos { - test.That(t, actualPos.Y, test.ShouldBeBetween, expectedPosOSX.Y-tolerancePos, expectedPosOSX.Y+tolerancePos) - test.That(t, actualPos.Z, test.ShouldBeBetween, expectedPosOSX.Z-tolerancePos, expectedPosOSX.Z+tolerancePos) - // linux - } else if actualPos.X > expectedPosLinux.X-tolerancePos && actualPos.X < expectedPosLinux.X+tolerancePos { - test.That(t, actualPos.Y, test.ShouldBeBetween, expectedPosLinux.Y-tolerancePos, expectedPosLinux.Y+tolerancePos) - test.That(t, actualPos.Z, test.ShouldBeBetween, expectedPosLinux.Z-tolerancePos, expectedPosLinux.Z+tolerancePos) - } else { - t.Error("TEST FAILED Position is outside of expected platform range") - } - - actualOri := position.Orientation().AxisAngles() - t.Logf("Position orientation: RX: %v, RY: %v, RZ: %v, Theta: %v", actualOri.RX, actualOri.RY, actualOri.RZ, actualOri.Theta) - - if actualOri.Theta > expectedOriOSX.Theta-toleranceOri && actualOri.Theta < expectedOriOSX.Theta+toleranceOri { - test.That(t, actualOri.RX, test.ShouldBeBetween, expectedOriOSX.RX-toleranceOri, expectedOriOSX.RX+toleranceOri) - test.That(t, actualOri.RY, test.ShouldBeBetween, expectedOriOSX.RY-toleranceOri, expectedOriOSX.RY+toleranceOri) - test.That(t, actualOri.Theta, test.ShouldBeBetween, expectedOriOSX.Theta-toleranceOri, expectedOriOSX.Theta+toleranceOri) - } else if actualOri.Theta > expectedOriLinux.Theta-toleranceOri && actualOri.Theta < expectedOriLinux.Theta+toleranceOri { - test.That(t, actualOri.RX, test.ShouldBeBetween, expectedOriLinux.RX-toleranceOri, expectedOriLinux.RX+toleranceOri) - test.That(t, actualOri.RY, test.ShouldBeBetween, expectedOriLinux.RY-toleranceOri, expectedOriLinux.RY+toleranceOri) - test.That(t, actualOri.RZ, test.ShouldBeBetween, expectedOriLinux.RZ-toleranceOri, expectedOriLinux.RZ+toleranceOri) - } else { - t.Error("TEST FAILED Orientation is outside of expected platform range") - } -} - +// Saves cartographer's internal state in the data directory. func saveInternalState(t *testing.T, internalState []byte, dataDir string) { - timeStamp := time.Now() + timeStamp := time.Now().UTC() internalStateDir := filepath.Join(dataDir, "internal_state") if err := os.Mkdir(internalStateDir, 0o755); err != nil { t.Error("TEST FAILED failed to create test internal state directory") @@ -120,6 +133,9 @@ func saveInternalState(t *testing.T, internalState []byte, dataDir string) { } } +// testHelperCartographer is responsible for running a viam-cartographer process using the desired mock sensors. Once started it will +// wait for all data to be processed by monitor sensor channels. After data has been fully processed, the endpoints Position, +// PointCloudMap, InternalState are evaluated and the process is closed out. The final internal state of cartographer is then returned. func testHelperCartographer( t *testing.T, dataDirectory string, @@ -133,8 +149,12 @@ func testHelperCartographer( termFunc := testhelper.InitTestCL(t, logger) defer termFunc() + // Create config + timeTracker := testhelper.TimeTracker{ + Mu: &sync.Mutex{}, + } + attrCfg := &vcConfig.Config{ - Camera: map[string]string{"name": "stub_lidar"}, ConfigParams: map[string]string{ "mode": reflect.ValueOf(subAlgo).String(), }, @@ -142,44 +162,71 @@ func testHelperCartographer( DataDirectory: dataDirectory, IMUIntegrationEnabled: true, } - if useIMU { - attrCfg.MovementSensor = map[string]string{"name": "stub_imu"} - } + // Add lidar component to config (required) lidarDone := make(chan struct{}) + lidarReadingInterval := time.Millisecond * defaultLidarTimeInterval + timeTracker.LidarTime = time.Date(2021, 8, 15, 14, 30, 45, 1, time.UTC) + if replaySensor { + attrCfg.Camera = map[string]string{"name": "stub_lidar", "data_frequency_hz": "0"} + } else { + attrCfg.Camera = map[string]string{"name": "stub_lidar", "data_frequency_hz": strconv.Itoa(defaultLidarTimeInterval)} + } + + // Add imu component to config (optional) imuDone := make(chan struct{}) - lidarReadingInterval := time.Millisecond * 200 - imuReadingInterval := time.Millisecond * 200 - timedLidar, err := testhelper.IntegrationTimedLidarSensor(t, attrCfg.Camera["name"], replaySensor, lidarReadingInterval, lidarDone) + imuReadingInterval := time.Millisecond * defaultIMUTimeInterval + if useIMU { + if replaySensor { + attrCfg.MovementSensor = map[string]string{"name": "stub_imu", "data_frequency_hz": "0"} + } else { + attrCfg.MovementSensor = map[string]string{"name": "stub_imu", "data_frequency_hz": strconv.Itoa(defaultIMUTimeInterval)} + } + timeTracker.ImuTime = time.Date(2021, 8, 15, 14, 30, 45, 1, time.UTC) + } + // Start Sensors + timedLidar, err := testhelper.IntegrationTimedLidarSensor(t, attrCfg.Camera["name"], + replaySensor, lidarReadingInterval, lidarDone, &timeTracker) test.That(t, err, test.ShouldBeNil) - timedIMU, err := testhelper.IntegrationTimedIMUSensor(t, attrCfg.MovementSensor["name"], replaySensor, imuReadingInterval, imuDone) + timedIMU, err := testhelper.IntegrationTimedIMUSensor(t, attrCfg.MovementSensor["name"], + replaySensor, imuReadingInterval, imuDone, &timeTracker) test.That(t, err, test.ShouldBeNil) + if !useIMU { test.That(t, timedIMU, test.ShouldBeNil) } + + // Start SLAM Service svc, err := testhelper.CreateIntegrationSLAMService(t, attrCfg, timedLidar, timedIMU, logger) test.That(t, err, test.ShouldBeNil) - start := time.Now() cSvc, ok := svc.(*viamcartographer.CartographerService) test.That(t, ok, test.ShouldBeTrue) test.That(t, cSvc.SlamMode, test.ShouldEqual, expectedMode) - ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*5) + // Wait for sensor processes to finish sending data and for context to be canceled + start := time.Now().UTC() + ctx, cancelFunc := context.WithTimeout(context.Background(), testTimeout) defer cancelFunc() - // wait till all sensor readings have been read - if !utils.SelectContextOrWaitChan(ctx, lidarDone) { - t.Logf("test duration %dms", time.Since(start).Milliseconds()) - test.That(t, errors.New("test timeout"), test.ShouldBeNil) + + finishedProcessingLidarData := utils.SelectContextOrWaitChan(ctx, lidarDone) + t.Logf("lidar sensor process duration %dms (timeout = %dms)", time.Since(start).Milliseconds(), testTimeout.Milliseconds()) + test.That(t, finishedProcessingLidarData, test.ShouldBeTrue) + + if useIMU { + finishedProcessingIMUData := utils.SelectContextOrWaitChan(ctx, imuDone) + t.Logf("imu sensor process duration %dms (timeout = %dms)", time.Since(start).Milliseconds(), testTimeout.Milliseconds()) + test.That(t, finishedProcessingIMUData, test.ShouldBeTrue) } - // We will check both channels once an accurate mock dataset has been gathered, - // see https://viam.atlassian.net/browse/RSDK-4495 + t.Logf("sensor processes have completed, all data has been ingested") + // Test end points and retrieve internal state testCartographerPosition(t, svc, useIMU, attrCfg.Camera["name"]) testCartographerMap(t, svc, cSvc.SlamMode == cartofacade.LocalizingMode) internalState, err := slam.InternalStateFull(context.Background(), svc) test.That(t, err, test.ShouldBeNil) + logger.Debug("closing out service") // Close out slam service test.That(t, svc.Close(context.Background()), test.ShouldBeNil) @@ -196,194 +243,124 @@ func testHelperCartographer( return internalState } -func integrationTestHelperCartographer(t *testing.T, subAlgo viamcartographer.SubAlgo) { +// TestIntegrationCartographer provides end-to-end testing of viam-cartographer using a combination of live vs. replay cameras +// and imu enabled mode. +func TestIntegrationCartographer(t *testing.T) { logger := golog.NewTestLogger(t) - t.Run("live sensor mapping mode without IMU", func(t *testing.T) { - dataDirectory, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectory) - test.That(t, err, test.ShouldBeNil) - }() - - testHelperCartographer(t, dataDirectory, subAlgo, logger, false, false, 1, cartofacade.MappingMode) - }) - - t.Run("replay sensor mapping mode", func(t *testing.T) { - dataDirectory, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectory) - test.That(t, err, test.ShouldBeNil) - }() - - testHelperCartographer(t, dataDirectory, subAlgo, logger, true, false, 1, cartofacade.MappingMode) - }) - - t.Run("live sensor localizing mode without IMU", func(t *testing.T) { - dataDirectoryMapping, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryMapping) - test.That(t, err, test.ShouldBeNil) - }() - - // do a mapping run with replay sensor - internalState := testHelperCartographer(t, dataDirectoryMapping, subAlgo, logger, true, false, 1, cartofacade.MappingMode) - - dataDirectoryLocalizing, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryLocalizing) - test.That(t, err, test.ShouldBeNil) - }() - - // save the internal state of the mapping run to a new datadir - saveInternalState(t, internalState, dataDirectoryLocalizing) - // localize on that internal state - testHelperCartographer(t, dataDirectoryLocalizing, subAlgo, logger, false, false, 0, cartofacade.LocalizingMode) - }) - - t.Run("replay sensor localizing mode", func(t *testing.T) { - dataDirectoryMapping, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryMapping) - test.That(t, err, test.ShouldBeNil) - }() - // do a mapping run with replay sensor - internalState := testHelperCartographer(t, dataDirectoryMapping, subAlgo, logger, true, false, 1, cartofacade.MappingMode) - - dataDirectoryLocalizing, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryLocalizing) - test.That(t, err, test.ShouldBeNil) - }() - - // save the internal state of the mapping run to a new datadir - saveInternalState(t, internalState, dataDirectoryLocalizing) - // localize on that internal state - testHelperCartographer(t, dataDirectoryLocalizing, subAlgo, logger, true, false, 0, cartofacade.LocalizingMode) - }) - - t.Run("live sensor updating mode without IMU", func(t *testing.T) { - dataDirectoryMapping, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryMapping) - test.That(t, err, test.ShouldBeNil) - }() - - // do a mapping run - internalState := testHelperCartographer(t, dataDirectoryMapping, subAlgo, logger, true, false, 1, cartofacade.MappingMode) - - dataDirectoryUpdating, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryUpdating) - test.That(t, err, test.ShouldBeNil) - }() - - // save the internal state of the mapping run to a new datadir - saveInternalState(t, internalState, dataDirectoryUpdating) - // update that internal state - testHelperCartographer(t, dataDirectoryUpdating, subAlgo, logger, false, false, 1, cartofacade.UpdatingMode) - }) - - t.Run("replay sensor updating mode", func(t *testing.T) { - dataDirectoryMapping, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryMapping) - test.That(t, err, test.ShouldBeNil) - }() - - // do a mapping run - internalState := testHelperCartographer(t, dataDirectoryMapping, subAlgo, logger, true, false, 1, cartofacade.MappingMode) - - dataDirectoryUpdating, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryUpdating) - test.That(t, err, test.ShouldBeNil) - }() - - // save the internal state of the mapping run to a new dat adir - saveInternalState(t, internalState, dataDirectoryUpdating) - // update fromthat internal state - testHelperCartographer(t, dataDirectoryUpdating, subAlgo, logger, true, false, 1, cartofacade.UpdatingMode) - }) -} - -func integrationTestHelperCartographerWithIMU(t *testing.T, subAlgo viamcartographer.SubAlgo) { - logger := golog.NewTestLogger(t) - - t.Run("live sensor mapping mode with IMU", func(t *testing.T) { - dataDirectory, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectory) - test.That(t, err, test.ShouldBeNil) - }() - - testHelperCartographer(t, dataDirectory, subAlgo, logger, false, true, 1, cartofacade.MappingMode) - }) + cases := []struct { + description string + replay bool + imuEnabled bool + mode cartofacade.SlamMode + subAlgo viamcartographer.SubAlgo + }{ + // Live sensor + { + description: "live sensor mapping mode 2D", + replay: false, + imuEnabled: false, + mode: cartofacade.MappingMode, + subAlgo: viamcartographer.Dim2d, + }, + { + description: "live sensor localizing mode 2D", + replay: false, + imuEnabled: false, + mode: cartofacade.LocalizingMode, + subAlgo: viamcartographer.Dim2d, + }, + { + description: "live sensor updating mode 2D", + replay: false, + imuEnabled: false, + mode: cartofacade.UpdatingMode, + subAlgo: viamcartographer.Dim2d, + }, + // Replay sensor + { + description: "replay sensor mapping mode 2D", + replay: true, + imuEnabled: false, + mode: cartofacade.MappingMode, + subAlgo: viamcartographer.Dim2d, + }, + { + description: "replay sensor localizing mode 2D", + replay: true, + imuEnabled: false, + mode: cartofacade.LocalizingMode, + subAlgo: viamcartographer.Dim2d, + }, + { + description: "replay sensor updating mode 2D", + replay: true, + imuEnabled: false, + mode: cartofacade.UpdatingMode, + subAlgo: viamcartographer.Dim2d, + }, + // Live + imu sensor + { + description: "replay with imu sensor mapping mode 2D", + replay: true, + imuEnabled: true, + mode: cartofacade.MappingMode, + subAlgo: viamcartographer.Dim2d, + }, + { + description: "replay with imu sensor localizing mode 2D", + replay: true, + imuEnabled: true, + mode: cartofacade.LocalizingMode, + subAlgo: viamcartographer.Dim2d, + }, + { + description: "replay with imu sensor updating mode 2D", + replay: true, + imuEnabled: true, + mode: cartofacade.UpdatingMode, + subAlgo: viamcartographer.Dim2d, + }, + } - t.Run("live sensor localizing mode with IMU", func(t *testing.T) { - dataDirectoryMapping, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryMapping) + // Loop over defined test cases, resetting the directories between slam sessions + for _, tt := range cases { + t.Run(tt.description, func(t *testing.T) { + // Prep first run directory + dataDirectory1, err := os.MkdirTemp("", "*") test.That(t, err, test.ShouldBeNil) - }() + defer func() { + err := os.RemoveAll(dataDirectory1) + test.That(t, err, test.ShouldBeNil) + }() - // do a mapping run with replay sensor - internalState := testHelperCartographer(t, dataDirectoryMapping, subAlgo, logger, true, true, 1, cartofacade.MappingMode) + // Set mapRateSec for mapping mode + mapRateSec := 1 - dataDirectoryLocalizing, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryLocalizing) - test.That(t, err, test.ShouldBeNil) - }() - - // save the internal state of the mapping run to a new datadir - saveInternalState(t, internalState, dataDirectoryLocalizing) - // localize on that internal state - testHelperCartographer(t, dataDirectoryLocalizing, subAlgo, logger, false, true, 0, cartofacade.LocalizingMode) - }) - - t.Run("live sensor updating mode with IMU", func(t *testing.T) { - dataDirectoryMapping, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryMapping) - test.That(t, err, test.ShouldBeNil) - }() + // Run mapping test + internalState := testHelperCartographer(t, dataDirectory1, tt.subAlgo, logger, tt.replay, tt.imuEnabled, 1, cartofacade.MappingMode) - // do a mapping run - internalState := testHelperCartographer(t, dataDirectoryMapping, subAlgo, logger, true, true, 1, cartofacade.MappingMode) + // Return if in mapping mode as there are no further actions required + if tt.mode == cartofacade.MappingMode { + return + } - dataDirectoryUpdating, err := os.MkdirTemp("", "*") - test.That(t, err, test.ShouldBeNil) - defer func() { - err := os.RemoveAll(dataDirectoryUpdating) + // Prep second run directory + dataDirectory2, err := os.MkdirTemp("", "*") test.That(t, err, test.ShouldBeNil) - }() - - // save the internal state of the mapping run to a new datadir - saveInternalState(t, internalState, dataDirectoryUpdating) - // update from that internal state - testHelperCartographer(t, dataDirectoryUpdating, subAlgo, logger, false, true, 1, cartofacade.UpdatingMode) - }) -} - -func TestCartographerIntegration2D(t *testing.T) { - integrationTestHelperCartographer(t, viamcartographer.Dim2d) -} - -func TestCartographerIntegrationWithIMU2D(t *testing.T) { - integrationTestHelperCartographerWithIMU(t, viamcartographer.Dim2d) + defer func() { + err := os.RemoveAll(dataDirectory2) + test.That(t, err, test.ShouldBeNil) + }() + + // Save internal state + saveInternalState(t, internalState, dataDirectory2) + + // Run follow up updating or localizing test + if tt.mode == cartofacade.LocalizingMode { + mapRateSec = 0 + } + testHelperCartographer(t, dataDirectory2, tt.subAlgo, logger, tt.replay, tt.imuEnabled, mapRateSec, tt.mode) + }) + } } diff --git a/sensorprocess/imusensorprocess.go b/sensorprocess/imusensorprocess.go new file mode 100644 index 00000000..cdd9e688 --- /dev/null +++ b/sensorprocess/imusensorprocess.go @@ -0,0 +1,159 @@ +// Package sensorprocess contains the logic to add lidar or replay sensor readings to cartographer's cartofacade +package sensorprocess + +import ( + "context" + "errors" + "math" + "strings" + "time" + + replaymovementsensor "go.viam.com/rdk/components/movementsensor/replay" + + "github.com/viamrobotics/viam-cartographer/cartofacade" + "github.com/viamrobotics/viam-cartographer/sensors" +) + +// StartIMU polls the IMU to get the next sensor reading and adds it to the cartofacade. +// stops when the context is Done. +func (config *Config) StartIMU(ctx context.Context) bool { + for { + select { + case <-ctx.Done(): + return false + default: + if jobDone := config.addIMUReading(ctx); jobDone { + return true + } + } + } +} + +// addIMUReading adds a imu reading to the cartofacade, using the lidar's data rate to determine whether to run in +// offline or online mode. +func (config *Config) addIMUReading(ctx context.Context) bool { + if config.LidarDataRateMsec != 0 { + return config.addIMUReadingInOnline(ctx) + } + return config.addIMUReadingInOffline(ctx) +} + +// addIMUReadingInOnline ensure the most recent imu scan, after corresponding lidar scans, gets processed by cartographer. +func (config *Config) addIMUReadingInOnline(ctx context.Context) bool { + // get next imu data response + tsr, status, err := getTimedIMUSensorReading(ctx, config) + if err != nil { + return status + } + + // parse imu reading + sr := cartofacade.IMUReading{ + LinearAcceleration: tsr.LinearAcceleration, + AngularVelocity: tsr.AngularVelocity, + } + + // update stored imu time + config.updateMutexProtectedIMUData(tsr.ReadingTime, sr) + + // add imu data to cartographer and sleep remainder of time interval + timeToSleep := config.tryAddIMUReading(ctx, sr, tsr.ReadingTime) + time.Sleep(time.Duration(timeToSleep) * time.Millisecond) + config.Logger.Debugf("imu sleep for %vms", timeToSleep) + + return false +} + +// addIMUReadingInOffline ensures imu scans get added in a time ordered series with any desired lidar scans without skipping any. +func (config *Config) addIMUReadingInOffline(ctx context.Context) bool { + // extract current lidar reading time for ordering data ingestion + config.Mutex.Lock() + sensorProcessStartTime := config.sensorProcessStartTime + config.Mutex.Unlock() + + if sensorProcessStartTime != defaultTime && config.currentIMUData.time != defaultTime { + // skip adding measurement if imu data has been defined but occurs before first lidar data + if sensorProcessStartTime.Sub(config.currentIMUData.time).Milliseconds() >= 0 { + time.Sleep(10 * time.Millisecond) + return false + } + + // add IMU data + config.tryAddIMUReadingUntilSuccess(ctx, config.currentIMUData.data, config.currentIMUData.time) + } + + // get next imu data response + tsr, status, err := getTimedIMUSensorReading(ctx, config) + if err != nil { + return status + } + + // parse imu reading + sr := cartofacade.IMUReading{ + LinearAcceleration: tsr.LinearAcceleration, + AngularVelocity: tsr.AngularVelocity, + } + + // TODO: Remove dropping out of order imu readings after DATA-1812 has been complete + // JIRA Ticket: https://viam.atlassian.net/browse/DATA-1812 + // update current imu data and time + if config.currentIMUData.time.Sub(tsr.ReadingTime).Milliseconds() < 0 { + config.updateMutexProtectedIMUData(tsr.ReadingTime, sr) + } else { + config.Logger.Debugf("%v \t | IMU | Dropping data \t \t | %v \n", tsr.ReadingTime, tsr.ReadingTime.Unix()) + } + + return false +} + +// tryAddIMUReadingUntilSuccess adds a reading to the cartofacade and retries on error (offline mode). While add sensor reading +// fails, keep trying to add the same reading - in offline mode we want to process each reading so if we cannot acquire the lock +// we should try again. +func (config *Config) tryAddIMUReadingUntilSuccess(ctx context.Context, reading cartofacade.IMUReading, readingTime time.Time) { + for { + select { + case <-ctx.Done(): + return + default: + err := config.CartoFacade.AddIMUReading(ctx, config.Timeout, config.IMUName, reading, readingTime) + if err == nil { + config.Logger.Debugf("%v \t | IMU | Success \t \t | %v \n", readingTime, readingTime.Unix()) + return + } + if !errors.Is(err, cartofacade.ErrUnableToAcquireLock) { + config.Logger.Warnw("Retrying sensor reading due to error from cartofacade", "error", err) + } + config.Logger.Debugf("%v \t | IMU | Failure \t \t | %v \n", readingTime, readingTime.Unix()) + } + } +} + +// tryAddIMUReading adds a reading to the carto facade and does not retry (online). +func (config *Config) tryAddIMUReading(ctx context.Context, reading cartofacade.IMUReading, readingTime time.Time) int { + startTime := time.Now().UTC() + err := config.CartoFacade.AddIMUReading(ctx, config.Timeout, config.IMUName, reading, readingTime) + if err != nil { + config.Logger.Debugf("%v \t | IMU | Failure \t \t | %v \n", readingTime, readingTime.Unix()) + if errors.Is(err, cartofacade.ErrUnableToAcquireLock) { + config.Logger.Debugw("Skipping sensor reading due to lock contention in cartofacade", "error", err) + } else { + config.Logger.Warnw("Skipping sensor reading due to error from cartofacade", "error", err) + } + } + config.Logger.Debugf("%v \t | IMU | Success \t \t | %v \n", readingTime, readingTime.Unix()) + timeElapsedMs := int(time.Since(startTime).Milliseconds()) + return int(math.Max(0, float64(config.IMUDataRateMsec-timeElapsedMs))) +} + +// getTimedIMUSensorReading returns the next imu reading if available along with a status denoting if the end of dataset has been reached. +func getTimedIMUSensorReading(ctx context.Context, config *Config) (sensors.TimedIMUSensorReadingResponse, bool, error) { + tsr, err := config.IMU.TimedIMUSensorReading(ctx) + if err != nil { + config.Logger.Warn(err) + // only end the sensor process if we are in offline mode + if config.LidarDataRateMsec == 0 { + return tsr, strings.Contains(err.Error(), replaymovementsensor.ErrEndOfDataset.Error()), err + } + return tsr, false, err + } + return tsr, false, err +} diff --git a/sensorprocess/lidarsensorprocess.go b/sensorprocess/lidarsensorprocess.go new file mode 100644 index 00000000..3e17b8ea --- /dev/null +++ b/sensorprocess/lidarsensorprocess.go @@ -0,0 +1,154 @@ +// Package sensorprocess contains the logic to add lidar or replay sensor readings to cartographer's cartofacade +package sensorprocess + +import ( + "context" + "errors" + "math" + "strings" + "time" + + "go.viam.com/rdk/components/camera/replaypcd" + + "github.com/viamrobotics/viam-cartographer/cartofacade" + "github.com/viamrobotics/viam-cartographer/sensors" +) + +// StartLidar polls the lidar to get the next sensor reading and adds it to the cartofacade. +// stops when the context is Done. +func (config *Config) StartLidar(ctx context.Context) bool { + for { + select { + case <-ctx.Done(): + return false + default: + if jobDone := config.addLidarReading(ctx); jobDone { + config.Logger.Info("Beginning final optimization") + err := config.RunFinalOptimizationFunc(ctx, config.InternalTimeout) + if err != nil { + config.Logger.Error("Failed to finish processing all sensor readings: ", err) + } + return true + } + } + } +} + +// addLidarReading adds a lidar reading to the cartofacade, using the lidar's data rate to determine whether to run in +// offline or online mode. +func (config *Config) addLidarReading(ctx context.Context) bool { + if config.LidarDataRateMsec != 0 { + return config.addLidarReadingsInOnline(ctx) + } + return config.addLidarReadingsInOffline(ctx) +} + +// addLidarReadingsInOnline ensure the most recent lidar scan, after any corresponding imu scans, gets processed by cartographer. +func (config *Config) addLidarReadingsInOnline(ctx context.Context) bool { + // get next lidar data response + tsr, status, err := getTimedLidarSensorReading(ctx, config) + if err != nil { + return status + } + + // update stored lidar timestamp + config.updateMutexProtectedLidarData(tsr.ReadingTime, tsr.Reading) + + // add lidar data to cartographer and sleep remainder of time interval + timeToSleep := config.tryAddLidarReading(ctx, tsr.Reading, tsr.ReadingTime) + time.Sleep(time.Duration(timeToSleep) * time.Millisecond) + config.Logger.Debugf("lidar sleep for %vms", timeToSleep) + + return false +} + +// addLidarReadingsInOffline ensures lidar scans get added in a time ordered series with any desired imu scans without skipping any. +func (config *Config) addLidarReadingsInOffline(ctx context.Context) bool { + // Extract current imu reading time for ordering data ingestion + config.Mutex.Lock() + currentIMUTime := config.currentIMUData.time + config.Mutex.Unlock() + + // If an IMU exists, skip adding measurement until the current lidar time is after the current imu timestamp + if config.IMUName != "" && config.currentLidarData.time.Sub(currentIMUTime).Milliseconds() > 0 { + time.Sleep(10 * time.Millisecond) + return false + } + + // Add current lidar data if it is non-nil + if config.currentLidarData.data != nil { + config.tryAddLidarReadingUntilSuccess(ctx, config.currentLidarData.data, config.currentLidarData.time) + + config.Mutex.Lock() + if config.sensorProcessStartTime == defaultTime { + config.sensorProcessStartTime = config.currentLidarData.time + } + config.Mutex.Unlock() + } + + // get next lidar data response + tsr, status, err := getTimedLidarSensorReading(ctx, config) + if err != nil { + return status + } + + // update stored lidar timestamp + config.updateMutexProtectedLidarData(tsr.ReadingTime, tsr.Reading) + + return false +} + +// tryAddLidarReadingUntilSuccess adds a reading to the cartofacade and retries on error (offline mode). While add lidar +// reading fails, keep trying to add the same reading - in offline mode we want to process each reading so if we cannot +// acquire the lock we should try again. +func (config *Config) tryAddLidarReadingUntilSuccess(ctx context.Context, reading []byte, readingTime time.Time) { + for { + select { + case <-ctx.Done(): + return + default: + err := config.CartoFacade.AddLidarReading(ctx, config.Timeout, config.LidarName, reading, readingTime) + if err == nil { + config.Logger.Debugf("%v \t | LIDAR | Success \t \t | %v \n", readingTime, readingTime.Unix()) + return + } + if !errors.Is(err, cartofacade.ErrUnableToAcquireLock) { + config.Logger.Warnw("Retrying sensor reading due to error from cartofacade", "error", err) + } + config.Logger.Debugf("%v \t | LIDAR | Failure \t \t | %v \n", readingTime, readingTime.Unix()) + } + } +} + +// tryAddLidarReading adds a reading to the carto facade and does not retry (online). +func (config *Config) tryAddLidarReading(ctx context.Context, reading []byte, readingTime time.Time) int { + startTime := time.Now().UTC() + + err := config.CartoFacade.AddLidarReading(ctx, config.Timeout, config.LidarName, reading, readingTime) + if err != nil { + config.Logger.Debugf("%v \t | LIDAR | Failure \t \t | %v \n", readingTime, readingTime.Unix()) + if errors.Is(err, cartofacade.ErrUnableToAcquireLock) { + config.Logger.Debugw("Skipping lidar reading due to lock contention in cartofacade", "error", err) + } else { + config.Logger.Warnw("Skipping lidar reading due to error from cartofacade", "error", err) + } + } + config.Logger.Debugf("%v \t | LIDAR | Success \t \t | %v \n", readingTime, readingTime.Unix()) + timeElapsedMs := int(time.Since(startTime).Milliseconds()) + return int(math.Max(0, float64(config.LidarDataRateMsec-timeElapsedMs))) +} + +// getTimedLidarSensorReading returns the next lidar reading if available along with a status denoting if the end of dataset has been +// reached. +func getTimedLidarSensorReading(ctx context.Context, config *Config) (sensors.TimedLidarSensorReadingResponse, bool, error) { + tsr, err := config.Lidar.TimedLidarSensorReading(ctx) + if err != nil { + config.Logger.Warn(err) + // only end the sensor process if we are in offline mode + if config.LidarDataRateMsec == 0 { + return tsr, strings.Contains(err.Error(), replaypcd.ErrEndOfDataset.Error()), err + } + return tsr, false, err + } + return tsr, false, err +} diff --git a/sensorprocess/sensorprocess.go b/sensorprocess/sensorprocess.go index a66a9e18..d7a60c49 100644 --- a/sensorprocess/sensorprocess.go +++ b/sensorprocess/sensorprocess.go @@ -3,311 +3,67 @@ package sensorprocess import ( "context" - "errors" - "math" - "strings" + "sync" "time" "github.com/edaniels/golog" - "go.viam.com/rdk/components/camera/replaypcd" - replaymovementsensor "go.viam.com/rdk/components/movementsensor/replay" "github.com/viamrobotics/viam-cartographer/cartofacade" "github.com/viamrobotics/viam-cartographer/sensors" ) -var ( - undefinedIMU = cartofacade.IMUReading{} - defaultTime = time.Time{} -) +// defaultTime is used to check if timestamps have not been set yet. +var defaultTime = time.Time{} // Config holds config needed throughout the process of adding a sensor reading to the cartofacade. type Config struct { - CartoFacade cartofacade.Interface - Lidar sensors.TimedLidarSensor - LidarName string - LidarDataRateMsec int - IMU sensors.TimedIMUSensor - IMUName string - IMUDataRateMsec int + CartoFacade cartofacade.Interface + sensorProcessStartTime time.Time + + Lidar sensors.TimedLidarSensor + LidarName string + LidarDataRateMsec int + currentLidarData LidarData + + IMU sensors.TimedIMUSensor + IMUName string + IMUDataRateMsec int + currentIMUData IMUData + Timeout time.Duration InternalTimeout time.Duration Logger golog.Logger - nextLidarData nextLidarData - nextIMUData nextIMUData - firstLidarReadingTime time.Time - lastLidarReadingTime time.Time RunFinalOptimizationFunc func(context.Context, time.Duration) error -} -// nextData stores the next data to be added to cartographer along with its associated timestamp so that, -// in offline mode, data from multiple sensors can be added in order. -type nextLidarData struct { - time time.Time - data []byte + Mutex *sync.Mutex } -type nextIMUData struct { +// IMUData stores the next data to be added to cartographer along with its associated timestamp so that, +// in offline mode, data from multiple sensors can be added in order. +type IMUData struct { time time.Time data cartofacade.IMUReading } -// StartLidar polls the lidar to get the next sensor reading and adds it to the cartofacade. -// stops when the context is Done. -func (config *Config) StartLidar( - ctx context.Context, -) bool { - for { - select { - case <-ctx.Done(): - return false - default: - if jobDone := config.addLidarReading(ctx); jobDone { - config.lastLidarReadingTime = config.nextLidarData.time - config.Logger.Info("Beginning final optimization") - err := config.RunFinalOptimizationFunc(ctx, config.InternalTimeout) - if err != nil { - config.Logger.Error("Failed to finish processing all sensor readings: ", err) - } - return true - } - } - } -} - -// addLidarReading adds a lidar reading to the cartofacade. -func (config *Config) addLidarReading(ctx context.Context) bool { - /* - when the lidar data rate msec is non-zero, we assume the user wants to be in "online" - mode and ensure the most recent scan gets processed by cartographer. If data rate msec - is zero we process every scan in order - */ - if config.LidarDataRateMsec != 0 { - // get next lidar data response - tsr, status, err := getTimedLidarSensorReading(ctx, config) - if err != nil { - return status - } - - // sleep remainder of time interval - timeToSleep := tryAddLidarReading(ctx, tsr.Reading, tsr.ReadingTime, *config) - time.Sleep(time.Duration(timeToSleep) * time.Millisecond) - config.Logger.Debugf("sleep for %s milliseconds", time.Duration(timeToSleep)) - } else { - /* - In order for cartographer to build a correct map, the lidar and imu readings need to be processed in - order in offline mode. We only add the stored lidar data if we do not have any IMU data to add, or if - the next IMU data has a timestamp after the current lidar reading's timestamp. - */ - if config.IMUName == "" || config.nextLidarData.time.Sub(config.nextIMUData.time).Milliseconds() <= 0 { - if config.nextLidarData.data != nil { - tryAddLidarReadingUntilSuccess(ctx, config.nextLidarData.data, config.nextLidarData.time, *config) - if config.firstLidarReadingTime == defaultTime { - config.firstLidarReadingTime = config.nextLidarData.time - } - } - // get next lidar data response - tsr, status, err := getTimedLidarSensorReading(ctx, config) - if err != nil { - return status - } - - config.nextLidarData.time = tsr.ReadingTime - config.nextLidarData.data = tsr.Reading - } else { - time.Sleep(time.Millisecond) - } - } - return false -} - -// tryAddLidarReadingUntilSuccess adds a reading to the cartofacade and retries on error (offline mode). -func tryAddLidarReadingUntilSuccess(ctx context.Context, reading []byte, readingTime time.Time, config Config) { - /* - while add lidar reading fails, keep trying to add the same reading - in offline mode - we want to process each reading so if we cannot acquire the lock we should try again - */ - for { - select { - case <-ctx.Done(): - return - default: - err := config.CartoFacade.AddLidarReading(ctx, config.Timeout, config.LidarName, reading, readingTime) - if err == nil { - config.Logger.Debugf("%v \t | LIDAR | Success \t \t | %v \n", readingTime, readingTime.Unix()) - return - } - if !errors.Is(err, cartofacade.ErrUnableToAcquireLock) { - config.Logger.Warnw("Retrying sensor reading due to error from cartofacade", "error", err) - } - config.Logger.Debugf("%v \t | LIDAR | Failure \t \t | %v \n", readingTime, readingTime.Unix()) - } - } -} - -// tryAddLidarReading adds a reading to the carto facade and does not retry (online). -func tryAddLidarReading(ctx context.Context, reading []byte, readingTime time.Time, config Config) int { - startTime := time.Now() - err := config.CartoFacade.AddLidarReading(ctx, config.Timeout, config.LidarName, reading, readingTime) - if err != nil { - if errors.Is(err, cartofacade.ErrUnableToAcquireLock) { - config.Logger.Debugw("Skipping lidar reading due to lock contention in cartofacade", "error", err) - } else { - config.Logger.Warnw("Skipping lidar reading due to error from cartofacade", "error", err) - } - } - timeElapsedMs := int(time.Since(startTime).Milliseconds()) - return int(math.Max(0, float64(config.LidarDataRateMsec-timeElapsedMs))) -} - -// StartIMU polls the IMU to get the next sensor reading and adds it to the cartofacade. -// stops when the context is Done. -func (config *Config) StartIMU( - ctx context.Context, -) bool { - for { - select { - case <-ctx.Done(): - return false - default: - if config.lastLidarReadingTime != defaultTime && config.nextIMUData.time.Sub(config.lastLidarReadingTime) > 0 { - return true - } - if jobDone := config.addIMUReading(ctx); jobDone { - return true - } - } - } -} - -// addIMUReading adds an IMU reading to the cartofacade. -func (config *Config) addIMUReading( - ctx context.Context, -) bool { - /* - when the lidar data rate msec is non-zero, we assume the user wants to be in "online" - mode and ensure the most recent scan gets processed by cartographer. If data rate msec - is zero we process every scan in order - */ - if config.LidarDataRateMsec != 0 { - // get next imu data response - tsr, status, err := getTimedIMUSensorReading(ctx, config) - if err != nil { - return status - } - - // parse imu reading - sr := cartofacade.IMUReading{ - LinearAcceleration: tsr.LinearAcceleration, - AngularVelocity: tsr.AngularVelocity, - } - - // sleep remainder of time interval - timeToSleep := tryAddIMUReading(ctx, sr, tsr.ReadingTime, *config) - time.Sleep(time.Duration(timeToSleep) * time.Millisecond) - } else { - /* - In order for cartographer to build a correct map, the lidar and imu readings need to be processed in - order in offline mode. We only add the stored IMU data if the next lidar data has a timestamp after - the current IMU reading's timestamp. - */ - if config.nextIMUData.time.Sub(config.nextLidarData.time).Milliseconds() < 0 { - if config.nextIMUData.data != undefinedIMU && config.firstLidarReadingTime != defaultTime && - config.nextIMUData.time.Sub(config.firstLidarReadingTime) > 0 { - tryAddIMUReadingUntilSuccess(ctx, config.nextIMUData.data, config.nextIMUData.time, *config) - } - // get next imu data response - tsr, status, err := getTimedIMUSensorReading(ctx, config) - if err != nil { - return status - } - - // parse imu reading - sr := cartofacade.IMUReading{ - LinearAcceleration: tsr.LinearAcceleration, - AngularVelocity: tsr.AngularVelocity, - } - - // TODO: Remove dropping out of order imu readings after DATA-1812 has been complete - // JIRA Ticket: https://viam.atlassian.net/browse/DATA-1812 - // update current imu data and time - if config.nextIMUData.time.Sub(tsr.ReadingTime).Milliseconds() < 0 { - config.nextIMUData.time = tsr.ReadingTime - config.nextIMUData.data = sr - } else { - config.Logger.Debugf("%v \t | IMU | Dropping data \t \t | %v \n", tsr.ReadingTime, tsr.ReadingTime.Unix()) - } - } else { - time.Sleep(time.Millisecond) - } - } - return false -} - -// tryAddIMUReadingUntilSuccess adds a reading to the cartofacade and retries on error (offline mode). -func tryAddIMUReadingUntilSuccess(ctx context.Context, reading cartofacade.IMUReading, readingTime time.Time, config Config) { - /* - while add sensor reading fails, keep trying to add the same reading - in offline mode - we want to process each reading so if we cannot acquire the lock we should try again - */ - for { - select { - case <-ctx.Done(): - return - default: - err := config.CartoFacade.AddIMUReading(ctx, config.Timeout, config.IMUName, reading, readingTime) - if err == nil { - config.Logger.Debugf("%v \t | IMU | Success \t \t | %v \n", readingTime, readingTime.Unix()) - return - } - if !errors.Is(err, cartofacade.ErrUnableToAcquireLock) { - config.Logger.Warnw("Retrying sensor reading due to error from cartofacade", "error", err) - } - config.Logger.Debugf("%v \t | IMU | Failure \t \t | %v \n", readingTime, readingTime.Unix()) - } - } -} - -// tryAddIMUReading adds a reading to the carto facade and does not retry (online). -func tryAddIMUReading(ctx context.Context, reading cartofacade.IMUReading, readingTime time.Time, config Config) int { - startTime := time.Now() - err := config.CartoFacade.AddIMUReading(ctx, config.Timeout, config.IMUName, reading, readingTime) - if err != nil { - if errors.Is(err, cartofacade.ErrUnableToAcquireLock) { - config.Logger.Debugw("Skipping sensor reading due to lock contention in cartofacade", "error", err) - } else { - config.Logger.Warnw("Skipping sensor reading due to error from cartofacade", "error", err) - } - } - timeElapsedMs := int(time.Since(startTime).Milliseconds()) - return int(math.Max(0, float64(config.IMUDataRateMsec-timeElapsedMs))) +// LidarData stores the next data to be added to cartographer along with its associated timestamp so that, +// in offline mode, data from multiple sensors can be added in order. +type LidarData struct { + time time.Time + data []byte } -// getTimedIMUSensorReading returns the next imu reading if available along with a status denoting if the end of dataset has been reached. -func getTimedIMUSensorReading(ctx context.Context, config *Config) (sensors.TimedIMUSensorReadingResponse, bool, error) { - tsr, err := config.IMU.TimedIMUSensorReading(ctx) - if err != nil { - config.Logger.Warn(err) - // only end the sensor process if we are in offline mode - if config.LidarDataRateMsec == 0 { - return tsr, strings.Contains(err.Error(), replaymovementsensor.ErrEndOfDataset.Error()), err - } - return tsr, false, err - } - return tsr, false, err +// Update currentIMUData under a mutex lock. +func (config *Config) updateMutexProtectedIMUData(time time.Time, data cartofacade.IMUReading) { + config.Mutex.Lock() + config.currentIMUData.time = time + config.currentIMUData.data = data + config.Mutex.Unlock() } -// getTimedLidarSensorReading returns the next lidar reading if available along with a status denoting if the end of dataset has been -// reached. -func getTimedLidarSensorReading(ctx context.Context, config *Config) (sensors.TimedLidarSensorReadingResponse, bool, error) { - tsr, err := config.Lidar.TimedLidarSensorReading(ctx) - if err != nil { - config.Logger.Warn(err) - // only end the sensor process if we are in offline mode - if config.LidarDataRateMsec == 0 { - return tsr, strings.Contains(err.Error(), replaypcd.ErrEndOfDataset.Error()), err - } - return tsr, false, err - } - return tsr, false, err +// Update currentLidarData under a mutex lock. +func (config *Config) updateMutexProtectedLidarData(time time.Time, data []byte) { + config.Mutex.Lock() + config.currentLidarData.time = time + config.currentLidarData.data = data + config.Mutex.Unlock() } diff --git a/sensorprocess/sensorprocess_test.go b/sensorprocess/sensorprocess_test.go index b1f1afbf..37afc11d 100644 --- a/sensorprocess/sensorprocess_test.go +++ b/sensorprocess/sensorprocess_test.go @@ -3,6 +3,7 @@ package sensorprocess import ( "context" "errors" + "sync" "testing" "time" @@ -76,7 +77,7 @@ func TestAddLidarReadingOffline(t *testing.T) { ) error { return nil } - tryAddLidarReadingUntilSuccess(context.Background(), reading, readingTimestamp, config) + config.tryAddLidarReadingUntilSuccess(context.Background(), reading, readingTimestamp) }) t.Run("failure with UNABLE_TO_ACQUIRE_LOCK error and cancelled context, no infinite loop", func(t *testing.T) { @@ -92,7 +93,7 @@ func TestAddLidarReadingOffline(t *testing.T) { cancelCtx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - tryAddLidarReadingUntilSuccess(cancelCtx, reading, readingTimestamp, config) + config.tryAddLidarReadingUntilSuccess(cancelCtx, reading, readingTimestamp) }) t.Run("failure with a different error and cancelled context, no infinite loop", func(t *testing.T) { @@ -108,7 +109,7 @@ func TestAddLidarReadingOffline(t *testing.T) { cancelCtx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - tryAddLidarReadingUntilSuccess(cancelCtx, reading, readingTimestamp, config) + config.tryAddLidarReadingUntilSuccess(cancelCtx, reading, readingTimestamp) }) t.Run("failure with errors being hit a few times, a retry, and then success", func(t *testing.T) { @@ -138,7 +139,7 @@ func TestAddLidarReadingOffline(t *testing.T) { } return nil } - tryAddLidarReadingUntilSuccess(cancelCtx, reading, readingTimestamp, config) + config.tryAddLidarReadingUntilSuccess(cancelCtx, reading, readingTimestamp) test.That(t, len(calls), test.ShouldEqual, 4) for i, args := range calls { t.Logf("addLidarReadingArgsHistory %d", i) @@ -176,7 +177,7 @@ func TestAddIMUReadingOffline(t *testing.T) { ) error { return nil } - tryAddIMUReadingUntilSuccess(context.Background(), reading, readingTimestamp, config) + config.tryAddIMUReadingUntilSuccess(context.Background(), reading, readingTimestamp) }) t.Run("failure with UNABLE_TO_ACQUIRE_LOCK error and cancelled context, no infinite loop", func(t *testing.T) { @@ -192,7 +193,7 @@ func TestAddIMUReadingOffline(t *testing.T) { cancelCtx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - tryAddIMUReadingUntilSuccess(cancelCtx, reading, readingTimestamp, config) + config.tryAddIMUReadingUntilSuccess(cancelCtx, reading, readingTimestamp) }) t.Run("failure with a different error and cancelled context, no infinite loop", func(t *testing.T) { @@ -208,7 +209,7 @@ func TestAddIMUReadingOffline(t *testing.T) { cancelCtx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - tryAddIMUReadingUntilSuccess(cancelCtx, reading, readingTimestamp, config) + config.tryAddIMUReadingUntilSuccess(cancelCtx, reading, readingTimestamp) }) t.Run("failure with errors being hit a few times, a retry, and then success", func(t *testing.T) { @@ -238,7 +239,7 @@ func TestAddIMUReadingOffline(t *testing.T) { } return nil } - tryAddIMUReadingUntilSuccess(cancelCtx, reading, readingTimestamp, config) + config.tryAddIMUReadingUntilSuccess(cancelCtx, reading, readingTimestamp) test.That(t, len(calls), test.ShouldEqual, 4) for i, args := range calls { t.Logf("addIMUReadingArgsHistory %d", i) @@ -277,7 +278,7 @@ func TestAddLidarReadingOnline(t *testing.T) { return nil } - timeToSleep := tryAddLidarReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddLidarReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldEqual, 0) }) @@ -293,7 +294,7 @@ func TestAddLidarReadingOnline(t *testing.T) { return cartofacade.ErrUnableToAcquireLock } - timeToSleep := tryAddLidarReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddLidarReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldEqual, 0) }) @@ -310,7 +311,7 @@ func TestAddLidarReadingOnline(t *testing.T) { return errUnknown } - timeToSleep := tryAddLidarReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddLidarReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldEqual, 0) }) @@ -325,7 +326,7 @@ func TestAddLidarReadingOnline(t *testing.T) { return nil } - timeToSleep := tryAddLidarReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddLidarReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldBeGreaterThan, 0) test.That(t, timeToSleep, test.ShouldBeLessThanOrEqualTo, config.LidarDataRateMsec) }) @@ -342,7 +343,7 @@ func TestAddLidarReadingOnline(t *testing.T) { return cartofacade.ErrUnableToAcquireLock } - timeToSleep := tryAddLidarReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddLidarReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldBeGreaterThan, 0) test.That(t, timeToSleep, test.ShouldBeLessThanOrEqualTo, config.LidarDataRateMsec) }) @@ -359,7 +360,7 @@ func TestAddLidarReadingOnline(t *testing.T) { return errUnknown } - timeToSleep := tryAddLidarReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddLidarReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldBeGreaterThan, 0) test.That(t, timeToSleep, test.ShouldBeLessThanOrEqualTo, config.LidarDataRateMsec) }) @@ -394,7 +395,7 @@ func TestAddIMUReadingOnline(t *testing.T) { return nil } - timeToSleep := tryAddIMUReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddIMUReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldEqual, 0) }) @@ -410,7 +411,7 @@ func TestAddIMUReadingOnline(t *testing.T) { return cartofacade.ErrUnableToAcquireLock } - timeToSleep := tryAddIMUReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddIMUReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldEqual, 0) }) @@ -426,7 +427,7 @@ func TestAddIMUReadingOnline(t *testing.T) { return errUnknown } - timeToSleep := tryAddIMUReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddIMUReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldEqual, 0) }) @@ -441,7 +442,7 @@ func TestAddIMUReadingOnline(t *testing.T) { return nil } - timeToSleep := tryAddIMUReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddIMUReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldBeGreaterThan, 0) test.That(t, timeToSleep, test.ShouldBeLessThanOrEqualTo, config.IMUDataRateMsec) }) @@ -457,7 +458,7 @@ func TestAddIMUReadingOnline(t *testing.T) { return cartofacade.ErrUnableToAcquireLock } - timeToSleep := tryAddIMUReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddIMUReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldBeGreaterThan, 0) test.That(t, timeToSleep, test.ShouldBeLessThanOrEqualTo, config.IMUDataRateMsec) }) @@ -474,7 +475,7 @@ func TestAddIMUReadingOnline(t *testing.T) { return errUnknown } - timeToSleep := tryAddIMUReading(context.Background(), reading, readingTimestamp, config) + timeToSleep := config.tryAddIMUReading(context.Background(), reading, readingTimestamp) test.That(t, timeToSleep, test.ShouldBeGreaterThan, 0) test.That(t, timeToSleep, test.ShouldBeLessThanOrEqualTo, config.IMUDataRateMsec) }) @@ -545,7 +546,7 @@ func onlineModeLidarTestHelper( test.That(t, calls[0].readingTimestamp.Before(calls[1].readingTimestamp), test.ShouldBeTrue) test.That(t, calls[1].readingTimestamp.Before(calls[2].readingTimestamp), test.ShouldBeTrue) } else if cam == "replay_lidar" { - readingTime, err := time.Parse(time.RFC3339Nano, s.TestTime) + readingTime, err := time.Parse(time.RFC3339Nano, s.TestTimestamp) test.That(t, err, test.ShouldBeNil) test.That(t, calls[0].readingTimestamp.Equal(readingTime), test.ShouldBeTrue) } else { @@ -595,6 +596,8 @@ func onlineModeIMUTestHelper( // set lidar data rate to signify that we are in online mode config.LidarDataRateMsec = 10 + config.currentLidarData.time = time.Now().UTC().Add(-10 * time.Second) + config.sensorProcessStartTime = time.Time{}.Add(time.Millisecond) jobDone := config.addIMUReading(ctx) test.That(t, len(calls), test.ShouldEqual, 1) @@ -621,7 +624,7 @@ func onlineModeIMUTestHelper( test.That(t, calls[0].readingTimestamp.Before(calls[1].readingTimestamp), test.ShouldBeTrue) test.That(t, calls[1].readingTimestamp.Before(calls[2].readingTimestamp), test.ShouldBeTrue) } else if movementSensor == "replay_imu" { - readingTime, err := time.Parse(time.RFC3339Nano, s.TestTime) + readingTime, err := time.Parse(time.RFC3339Nano, s.TestTimestamp) test.That(t, err, test.ShouldBeNil) test.That(t, calls[0].readingTimestamp.Equal(readingTime), test.ShouldBeTrue) } else { @@ -721,6 +724,7 @@ func TestAddLidarReading(t *testing.T) { LidarDataRateMsec: 200, Timeout: 10 * time.Second, RunFinalOptimizationFunc: runFinalOptimizationFunc, + Mutex: &sync.Mutex{}, } ctx := context.Background() @@ -821,14 +825,7 @@ func TestAddLidarReading(t *testing.T) { runFinalOptimizationFunc = func(context.Context, time.Duration) error { return errors.New("test error") } - - config = Config{ - Logger: logger, - CartoFacade: &cf, - LidarDataRateMsec: 200, - Timeout: 10 * time.Second, - RunFinalOptimizationFunc: runFinalOptimizationFunc, - } + config.RunFinalOptimizationFunc = runFinalOptimizationFunc cam := "finished_replay_lidar" logger := golog.NewTestLogger(t) @@ -852,6 +849,7 @@ func TestAddIMUReading(t *testing.T) { CartoFacade: &cf, IMUDataRateMsec: 50, Timeout: 10 * time.Second, + Mutex: &sync.Mutex{}, } ctx := context.Background() @@ -868,7 +866,6 @@ func TestAddIMUReading(t *testing.T) { ) }) - // TODO: once test replay_imus exist https://viam.atlassian.net/browse/RSDK-4556 t.Run("returns error in online mode when replay sensor timestamp is invalid, doesn't try to add sensor data", func(t *testing.T) { movementsensor := "invalid_replay_imu" invalidIMUTestHelper( @@ -915,8 +912,8 @@ func TestAddIMUReading(t *testing.T) { config.IMUName = replayIMU.Name config.IMUDataRateMsec = 0 config.LidarDataRateMsec = 0 - config.nextLidarData.time = time.Now() - config.firstLidarReadingTime = time.Time{}.Add(time.Millisecond) + config.currentLidarData.time = time.Now().UTC().Add(-10 * time.Second) + config.sensorProcessStartTime = time.Time{}.Add(time.Millisecond) _ = config.addIMUReading(ctx) // first call gets data jobDone := config.addIMUReading(ctx) @@ -949,7 +946,7 @@ func TestAddIMUReading(t *testing.T) { config.IMU = replayIMU config.IMUDataRateMsec = 0 - config.nextLidarData.time = time.Now() + config.currentLidarData.time = time.Now().UTC().Add(-10 * time.Second) jobDone := config.addIMUReading(ctx) test.That(t, jobDone, test.ShouldBeTrue) @@ -966,6 +963,7 @@ func TestStartLidar(t *testing.T) { LidarDataRateMsec: 200, Timeout: 10 * time.Second, RunFinalOptimizationFunc: cf.RunFinalOptimization, + Mutex: &sync.Mutex{}, } cancelCtx, cancelFunc := context.WithCancel(context.Background()) @@ -1007,6 +1005,7 @@ func TestStartIMU(t *testing.T) { CartoFacade: &cf, IMUDataRateMsec: 50, Timeout: 10 * time.Second, + Mutex: &sync.Mutex{}, } cancelCtx, cancelFunc := context.WithCancel(context.Background()) @@ -1018,7 +1017,7 @@ func TestStartIMU(t *testing.T) { config.IMU = replaySensor config.IMUDataRateMsec = 0 - config.nextLidarData.time = time.Now() + config.currentLidarData.time = time.Now().UTC().Add(-10 * time.Second) jobDone := config.StartIMU(context.Background()) test.That(t, jobDone, test.ShouldBeTrue) @@ -1032,7 +1031,7 @@ func TestStartIMU(t *testing.T) { config.IMU = replaySensor config.IMUDataRateMsec = 0 - config.nextLidarData.time = time.Now() + config.currentLidarData.time = time.Now().UTC().Add(-10 * time.Second) cancelFunc() diff --git a/sensors/sensors_test.go b/sensors/sensors_test.go index 47b09395..9f88da45 100644 --- a/sensors/sensors_test.go +++ b/sensors/sensors_test.go @@ -187,6 +187,8 @@ func TestTimedLidarSensorReading(t *testing.T) { t.Run("when a live lidar succeeds, returns current time in UTC and the reading", func(t *testing.T) { beforeReading := time.Now().UTC() + time.Sleep(10 * time.Millisecond) + tsr, err := goodLidar.TimedLidarSensorReading(ctx) test.That(t, err, test.ShouldBeNil) test.That(t, tsr.Reading, test.ShouldNotBeNil) @@ -205,7 +207,11 @@ func TestTimedLidarSensorReading(t *testing.T) { tsr, err := goodReplayLidar.TimedLidarSensorReading(ctx) test.That(t, err, test.ShouldBeNil) test.That(t, tsr.Reading, test.ShouldNotBeNil) - test.That(t, tsr.ReadingTime, test.ShouldEqual, time.Date(2006, 1, 2, 15, 4, 5, 999900000, time.UTC)) + + readingTime, err := time.Parse(time.RFC3339Nano, s.TestTimestamp) + test.That(t, err, test.ShouldBeNil) + test.That(t, tsr.ReadingTime.Equal(readingTime), test.ShouldBeTrue) + test.That(t, tsr.Replay, test.ShouldBeTrue) }) } diff --git a/sensors/test_deps.go b/sensors/test_deps.go index c1947b4e..cb45277a 100644 --- a/sensors/test_deps.go +++ b/sensors/test_deps.go @@ -2,6 +2,7 @@ package sensors import ( "context" + "time" "github.com/golang/geo/r3" "github.com/pkg/errors" @@ -18,14 +19,12 @@ import ( "go.viam.com/rdk/utils/contextutils" ) -const ( - // TestTime can be used to test specific timestamps provided by a replay sensor. - TestTime = "2006-01-02T15:04:05.9999Z" - // BadTime can be used to represent something that should cause an error while parsing it as a time. - BadTime = "NOT A TIME" -) +// BadTime can be used to represent something that should cause an error while parsing it as a time. +const BadTime = "NOT A TIME" var ( + // TestTimestamp can be used to test specific timestamps provided by a replay sensor. + TestTimestamp = time.Now().UTC().Format("2006-01-02T15:04:05.999999Z") // LinAcc is the successful mock linear acceleration result used for testing. LinAcc = r3.Vector{X: 1, Y: 1, Z: 1} // AngVel is the successful mock angular velocity result used for testing. @@ -41,7 +40,7 @@ func SetupDeps(lidarName, imuName string) resource.Dependencies { case "warming_up_lidar": deps[camera.Named(lidarName)] = getWarmingUpLidar() case "replay_lidar": - deps[camera.Named(lidarName)] = getReplayLidar(TestTime) + deps[camera.Named(lidarName)] = getReplayLidar(TestTimestamp) case "invalid_replay_lidar": deps[camera.Named(lidarName)] = getReplayLidar(BadTime) case "lidar_with_erroring_functions": @@ -54,12 +53,11 @@ func SetupDeps(lidarName, imuName string) resource.Dependencies { deps[camera.Named(lidarName)] = getFinishedReplayLidar() } - // TODO: create setup deps for various replay_imu, see https://viam.atlassian.net/browse/RSDK-4556 switch imuName { case "good_imu": deps[movementsensor.Named(imuName)] = getGoodIMU() case "replay_imu": - deps[movementsensor.Named(imuName)] = getReplayIMU(TestTime) + deps[movementsensor.Named(imuName)] = getReplayIMU(TestTimestamp) case "invalid_replay_imu": deps[movementsensor.Named(imuName)] = getReplayIMU(BadTime) case "imu_with_erroring_functions": diff --git a/testhelper/integrationtesthelper.go b/testhelper/integrationtesthelper.go new file mode 100644 index 00000000..b8315423 --- /dev/null +++ b/testhelper/integrationtesthelper.go @@ -0,0 +1,331 @@ +// Package testhelper provides test helpers which don't depend on viamcartographer +package testhelper + +import ( + "bufio" + "bytes" + "context" + "fmt" + "os" + "path" + "regexp" + "strconv" + "sync" + "testing" + "time" + + "github.com/golang/geo/r3" + "github.com/pkg/errors" + replaylidar "go.viam.com/rdk/components/camera/replaypcd" + replaymovementsensor "go.viam.com/rdk/components/movementsensor/replay" + "go.viam.com/rdk/pointcloud" + "go.viam.com/rdk/spatialmath" + "go.viam.com/utils/artifact" + + sensors "github.com/viamrobotics/viam-cartographer/sensors" +) + +const ( + // NumPointClouds is the amount of mock lidar data saved in the mock_data/lidar slam artifact + // directory used for integration tests. + NumPointClouds = 10 + // NumIMUData is the amount of mock IMU data saved in the mock_data/imu/data.txt slam artifact + // file used for integration tests. + NumIMUData = 40 + // Path to slam mock data used for integration tests artifact path. + // artifact.MustPath("viam-cartographer/mock_lidar"). + mockDataPath = "viam-cartographer/mock_data" + sensorDataIngestionWaitTime = 50 * time.Millisecond +) + +var defaultTime = time.Time{} + +// TimeTracker stores the current and next timestamps for both IMU and lidar. These are used to manually +// set the timestamp of each set of data being sent to cartographer and ensure proper ordering between them. +// This allows for consistent testing. +type TimeTracker struct { + LidarTime time.Time + NextLidarTime time.Time + + ImuTime time.Time + NextImuTime time.Time + + LastLidarTime time.Time + LastImuTime time.Time + + Mu *sync.Mutex +} + +// IntegrationTimedLidarSensor returns a mock timed lidar sensor +// or an error if preconditions to build the mock are not met. +// It validates that all required mock lidar reading files are able to be found. +// When the mock is called, it returns the next mock lidar reading, with the +// ReadingTime incremented by the sensorReadingInterval. +// The Replay sensor field of the mock readings will match the replay parameter. +// When the end of the mock lidar readings is reached, the done channel +// is written to once so the caller can detect all lidar readings have been emitted +// from the mock. This is intended to match the same "end of dataset" behavior of a +// replay sensor. +// It is important to provide deterministic time information to cartographer to +// ensure test outputs of cartographer are deterministic. +func IntegrationTimedLidarSensor( + t *testing.T, + lidar string, + replay bool, + sensorReadingInterval time.Duration, + done chan struct{}, + timeTracker *TimeTracker, +) (sensors.TimedLidarSensor, error) { + // Check required amount of lidar data is present + err := mockLidarReadingsValid() + if err != nil { + return nil, err + } + + var i uint64 + + closed := false + ts := &sensors.TimedLidarSensorMock{} + ts.TimedLidarSensorReadingFunc = func(ctx context.Context) (sensors.TimedLidarSensorReadingResponse, error) { + defer timeTracker.Mu.Unlock() + /* + Holds the process until for all necessary IMU data has been sent to cartographer. Only applicable + when the IMU is present (timeTracker.NextImuTime has been defined) and is always true in the first iteration. + This and the manual definition of timestamps allow for consistent results. + */ + for { + timeTracker.Mu.Lock() + if timeTracker.ImuTime == defaultTime { + time.Sleep(sensorDataIngestionWaitTime) + break + } + + if i <= 1 || timeTracker.LidarTime.Sub(timeTracker.ImuTime) <= 0 { + time.Sleep(sensorDataIngestionWaitTime) + break + } + timeTracker.Mu.Unlock() + } + + // Communicate that all lidar readings have been sent to cartographer or if the last IMU reading has been sent, checks + // if LastLidarTime has been defined. If so, simulate endOfDataSet error. + t.Logf("TimedLidarSensorReading Mock i: %d, closed: %v, readingTime: %s\n", i, closed, timeTracker.LidarTime.String()) + if i >= NumPointClouds || timeTracker.LastImuTime != defaultTime { + // Sends a signal to the integration sensor's done channel the first time end of dataset has been sent + if !closed { + done <- struct{}{} + closed = true + timeTracker.LastLidarTime = timeTracker.LidarTime + } + + return sensors.TimedLidarSensorReadingResponse{}, replaylidar.ErrEndOfDataset + } + + // Get next lidar data + resp, err := createTimedLidarSensorReadingResponse(t, i, replay, timeTracker) + if err != nil { + return resp, err + } + + // Advance the data index and update time tracker (manual timestamps occurs here) + i++ + timeTracker.LidarTime = timeTracker.LidarTime.Add(sensorReadingInterval) + timeTracker.NextLidarTime = timeTracker.LidarTime.Add(sensorReadingInterval) + + return resp, nil + } + + return ts, nil +} + +// IntegrationTimedIMUSensor returns a mock timed IMU sensor. +// When the mock is called, it returns the next mock IMU readings, with the +// ReadingTime incremented by the sensorReadingInterval. +// The Replay sensor field of the mock readings will match the replay parameter. +// When the end of the mock IMU readings is reached, the done channel +// is written to once so the caller can detect all IMU readings have been emitted +// from the mock. This is intended to match the same "end of dataset" behavior of a +// replay sensor. +// It is important to provide deterministic time information to cartographer to +// ensure test outputs of cartographer are deterministic. +// Note that IMU replay sensors are not yet fully supported. +func IntegrationTimedIMUSensor( + t *testing.T, + imu string, + replay bool, + sensorReadingInterval time.Duration, + done chan struct{}, + timeTracker *TimeTracker, +) (sensors.TimedIMUSensor, error) { + // Return nil if IMU is not requested + if imu == "" { + return nil, nil + } + + // Check required amount of IMU data is present and creates mock dataset from provided mock data artifact file. + mockDataset, err := mockIMUReadingsValid(t) + if err != nil { + return nil, err + } + + var i uint64 + closed := false + ts := &sensors.TimedIMUSensorMock{} + ts.TimedIMUSensorReadingFunc = func(ctx context.Context) (sensors.TimedIMUSensorReadingResponse, error) { + defer timeTracker.Mu.Unlock() + /* + Holds the process until for all necessary lidar data has been sent to cartographer. Is always + true in the first iteration. This and the manual definition of timestamps allow for consistent + results. + */ + for { + timeTracker.Mu.Lock() + if i == 0 || timeTracker.ImuTime.Sub(timeTracker.NextLidarTime) < 0 { + time.Sleep(sensorDataIngestionWaitTime) + break + } + timeTracker.Mu.Unlock() + } + + // Communicate that all desired IMU readings have been sent or to cartographer or if the last lidar reading has been sent + // by, checks if LastLidarTime has been defined. If so, simulate endOfDataSet error. + t.Logf("TimedIMUSensorReading Mock i: %d, closed: %v, readingTime: %s\n", i, closed, timeTracker.ImuTime.String()) + if int(i) >= len(mockDataset) || timeTracker.LastLidarTime != defaultTime { + // Sends a signal to the integration sensor's done channel the first time end of dataset has been sent + if !closed { + done <- struct{}{} + closed = true + timeTracker.LastImuTime = timeTracker.ImuTime + } + return sensors.TimedIMUSensorReadingResponse{}, replaymovementsensor.ErrEndOfDataset + } + + // Get next IMU data + resp, err := createTimedIMUSensorReadingResponse(t, mockDataset[i], replay, timeTracker) + if err != nil { + return resp, err + } + + // Advance the data index and update time tracker (manual timestamps occurs here) + i++ + timeTracker.ImuTime = timeTracker.ImuTime.Add(sensorReadingInterval) + timeTracker.NextImuTime = timeTracker.ImuTime.Add(sensorReadingInterval) + + return resp, nil + } + + return ts, nil +} + +func createTimedLidarSensorReadingResponse(t *testing.T, i uint64, replay bool, timeTracker *TimeTracker, +) (sensors.TimedLidarSensorReadingResponse, error) { + file, err := os.Open(artifact.MustPath(mockDataPath + "/lidar/" + strconv.FormatUint(i, 10) + ".pcd")) + if err != nil { + t.Error("TEST FAILED TimedLidarSensorReading Mock failed to open pcd file") + return sensors.TimedLidarSensorReadingResponse{}, err + } + readingPc, err := pointcloud.ReadPCD(file) + if err != nil { + t.Error("TEST FAILED TimedLidarSensorReading Mock failed to read pcd") + return sensors.TimedLidarSensorReadingResponse{}, err + } + + buf := new(bytes.Buffer) + err = pointcloud.ToPCD(readingPc, buf, pointcloud.PCDBinary) + if err != nil { + t.Error("TEST FAILED TimedLidarSensorReading Mock failed to parse pcd") + return sensors.TimedLidarSensorReadingResponse{}, err + } + + resp := sensors.TimedLidarSensorReadingResponse{ + Reading: buf.Bytes(), + ReadingTime: timeTracker.LidarTime, + Replay: replay, + } + return resp, nil +} + +func createTimedIMUSensorReadingResponse(t *testing.T, line string, replay bool, timeTracker *TimeTracker, +) (sensors.TimedIMUSensorReadingResponse, error) { + re := regexp.MustCompile(`[-+]?\d*\.?\d+`) + matches := re.FindAllString(line, -1) + + linAccX, err1 := strconv.ParseFloat(matches[0], 64) + linAccY, err2 := strconv.ParseFloat(matches[1], 64) + linAccZ, err3 := strconv.ParseFloat(matches[2], 64) + if err1 != nil || err2 != nil || err3 != nil { + t.Error("TEST FAILED TimedIMUSensorReading Mock failed to parse linear acceleration") + return sensors.TimedIMUSensorReadingResponse{}, errors.New("error parsing linear acceleration from file") + } + linAcc := r3.Vector{X: linAccX, Y: linAccY, Z: linAccZ} + + angVelX, err1 := strconv.ParseFloat(matches[3], 64) + angVelY, err2 := strconv.ParseFloat(matches[4], 64) + angVelZ, err3 := strconv.ParseFloat(matches[5], 64) + if err1 != nil || err2 != nil || err3 != nil { + t.Error("TEST FAILED TimedIMUSensorReading Mock failed to parse angular velocity") + return sensors.TimedIMUSensorReadingResponse{}, errors.New("error parsing angular velocity from file") + } + angVel := spatialmath.AngularVelocity{X: angVelX, Y: angVelY, Z: angVelZ} + + resp := sensors.TimedIMUSensorReadingResponse{ + LinearAcceleration: linAcc, + AngularVelocity: angVel, + ReadingTime: timeTracker.ImuTime, + Replay: replay, + } + return resp, nil +} + +func mockLidarReadingsValid() error { + dirEntries, err := os.ReadDir(artifact.MustPath(mockDataPath + "/lidar")) + if err != nil { + return err + } + + var files []string + for _, f := range dirEntries { + if !f.IsDir() { + files = append(files, f.Name()) + } + } + if len(files) < NumPointClouds { + return errors.Errorf("expected at least %v lidar readings for integration test", NumPointClouds) + } + for i := 0; i < NumPointClouds; i++ { + found := false + expectedFile := fmt.Sprintf("%d.pcd", i) + for _, file := range files { + if file == expectedFile { + found = true + break + } + } + + if !found { + return errors.Errorf("expected %s to exist for integration test", path.Join(mockDataPath+"/lidar", expectedFile)) + } + } + return nil +} + +func mockIMUReadingsValid(t *testing.T) ([]string, error) { + file, err := os.Open(artifact.MustPath(mockDataPath + "/imu/data.txt")) + if err != nil { + t.Error("TEST FAILED TimedIMUSensorReading Mock failed to open data file") + return []string{}, err + } + + mockDatasetScanner := bufio.NewScanner(file) + mockDatasetScanner.Split(bufio.ScanLines) + var mockDataset []string + + for mockDatasetScanner.Scan() { + mockDataset = append(mockDataset, mockDatasetScanner.Text()) + } + + if len(mockDataset) < NumIMUData { + return []string{}, errors.Errorf("expected at least %v imu readings for integration test", NumIMUData) + } + return mockDataset, nil +} diff --git a/testhelper/testhelper.go b/testhelper/testhelper.go index e5902351..063e28b0 100644 --- a/testhelper/testhelper.go +++ b/testhelper/testhelper.go @@ -2,13 +2,9 @@ package testhelper import ( - "bytes" "context" - "fmt" "os" - "path" "path/filepath" - "strconv" "strings" "testing" "time" @@ -44,19 +40,12 @@ const ( // function for the while loop that attempts to grab data from the // sensor that is used in the GetAndSaveData function. SensorValidationIntervalSecForTest = 1 - testDialMaxTimeoutSec = 1 - // NumPointClouds is the number of pointclouds saved in artifact - // for the cartographer integration tests. - NumPointClouds = 15 // CartoFacadeTimeoutForTest is the timeout used for capi requests for tests. CartoFacadeTimeoutForTest = 5 * time.Second - // CartoFacadeInternalTimeoutForTest is the timeout used for internal capi - // requests for tests. + // CartoFacadeInternalTimeoutForTest is the timeout used for internal capi requests for tests. CartoFacadeInternalTimeoutForTest = 15 * time.Minute ) -var mockLidarPath = artifact.MustPath("viam-cartographer/mock_lidar") - // SetupStubDeps returns stubbed dependencies based on the camera // the stubs fail tests if called. func SetupStubDeps(cameraName, movementSensorName string, t *testing.T) resource.Dependencies { @@ -119,175 +108,6 @@ func getStubIMU(t *testing.T) *inject.MovementSensor { return imu } -func mockLidarReadingsValid() error { - dirEntries, err := os.ReadDir(mockLidarPath) - if err != nil { - return err - } - - var files []string - for _, f := range dirEntries { - if !f.IsDir() { - files = append(files, f.Name()) - } - } - if len(files) < NumPointClouds { - return errors.New("expected at least 15 lidar readings for integration test") - } - for i := 0; i < NumPointClouds; i++ { - found := false - expectedFile := fmt.Sprintf("%d.pcd", i) - for _, file := range files { - if file == expectedFile { - found = true - break - } - } - - if !found { - return errors.Errorf("expected %s to exist for integration test", path.Join(mockLidarPath, expectedFile)) - } - } - return nil -} - -// IntegrationTimedLidarSensor returns a mock timed lidar sensor -// or an error if preconditions to build the mock are not met. -// It validates that all required mock lidar reading files are able to be found. -// When the mock is called, it returns the next mock lidar reading, with the -// ReadingTime incremented by the sensorReadingInterval. -// The Replay sensor field of the mock readings will match the replay parameter. -// When the end of the mock lidar readings is reached, the done channel -// is written to once so the caller can detect all lidar readings have been emitted -// from the mock. This is intended to match the same "end of dataset" behavior of a -// replay sensor. -// It is important to provide deterministic time information to cartographer to -// ensure test outputs of cartographer are deterministic. -func IntegrationTimedLidarSensor( - t *testing.T, - lidar string, - replay bool, - sensorReadingInterval time.Duration, - done chan struct{}, -) (s.TimedLidarSensor, error) { - err := mockLidarReadingsValid() - if err != nil { - return nil, err - } - - var i uint64 - closed := false - - ts := &s.TimedLidarSensorMock{} - readingTime := time.Date(2021, 8, 15, 14, 30, 45, 100, time.UTC) - - ts.TimedLidarSensorReadingFunc = func(ctx context.Context) (s.TimedLidarSensorReadingResponse, error) { - readingTime = readingTime.Add(sensorReadingInterval) - t.Logf("TimedLidarSensorReading Mock i: %d, closed: %v, readingTime: %s\n", i, closed, readingTime.String()) - if i >= NumPointClouds { - // communicate to the test that all lidar readings have been written - if !closed { - done <- struct{}{} - closed = true - } - return s.TimedLidarSensorReadingResponse{}, errors.New("end of dataset") - } - - file, err := os.Open(artifact.MustPath("viam-cartographer/mock_lidar/" + strconv.FormatUint(i, 10) + ".pcd")) - if err != nil { - t.Error("TEST FAILED TimedLidarSensorReading Mock failed to open pcd file") - return s.TimedLidarSensorReadingResponse{}, err - } - readingPc, err := pointcloud.ReadPCD(file) - if err != nil { - t.Error("TEST FAILED TimedLidarSensorReading Mock failed to read pcd") - return s.TimedLidarSensorReadingResponse{}, err - } - - buf := new(bytes.Buffer) - err = pointcloud.ToPCD(readingPc, buf, pointcloud.PCDBinary) - if err != nil { - t.Error("TEST FAILED TimedLidarSensorReading Mock failed to parse pcd") - return s.TimedLidarSensorReadingResponse{}, err - } - - i++ - return s.TimedLidarSensorReadingResponse{Reading: buf.Bytes(), ReadingTime: readingTime, Replay: replay}, nil - } - - return ts, nil -} - -// IntegrationTimedIMUSensor returns a mock timed IMU sensor. -// When the mock is called, it returns the next mock IMU readings, with the -// ReadingTime incremented by the sensorReadingInterval. -// The Replay sensor field of the mock readings will match the replay parameter. -// When the end of the mock IMU readings is reached, the done channel -// is written to once so the caller can detect all IMU readings have been emitted -// from the mock. This is intended to match the same "end of dataset" behavior of a -// replay sensor. -// It is important to provide deterministic time information to cartographer to -// ensure test outputs of cartographer are deterministic. -// Note that IMU replay sensors are not yet fully supported. -func IntegrationTimedIMUSensor( - t *testing.T, - imu string, - replay bool, - sensorReadingInterval time.Duration, - done chan struct{}, -) (s.TimedIMUSensor, error) { - if imu == "" { - return nil, nil - } - var i uint64 - closed := false - - ts := &s.TimedIMUSensorMock{} - mockLinearAccelerationData := []r3.Vector{ - {X: 1, Y: 1, Z: 1}, - {X: 2, Y: 1, Z: 1}, - {X: 1, Y: 2, Z: 1}, - {X: 1, Y: 1, Z: 2}, - {X: 1, Y: 1, Z: 3}, - {X: 1, Y: 3, Z: 1}, - {X: 3, Y: 1, Z: 1}, - {X: 2, Y: 1, Z: 1}, - } - mockAngularVelocityData := []spatialmath.AngularVelocity{ - {X: 1, Y: 2, Z: 1}, - {X: 1, Y: 2, Z: 1}, - {X: 1, Y: 2, Z: 1}, - {X: 1, Y: 1, Z: 2}, - {X: 1, Y: 1, Z: 2}, - {X: 1, Y: 1, Z: 2}, - {X: 5, Y: 1, Z: 1}, - {X: 5, Y: 1, Z: 1}, - } - - readingTime := time.Date(2021, 8, 15, 14, 30, 45, 100, time.UTC) - - ts.TimedIMUSensorReadingFunc = func(ctx context.Context) (s.TimedIMUSensorReadingResponse, error) { - readingTime = readingTime.Add(sensorReadingInterval) - t.Logf("TimedIMUSensorReading Mock i: %d, closed: %v, readingTime: %s\n", i, closed, readingTime.String()) - if int(i) >= len(mockLinearAccelerationData) { - // communicate to the test that all imu readings have been written - if !closed { - // This will be added back once synced mock lidar and IMU data is collected, - // see https://viam.atlassian.net/browse/RSDK-4495 - // done <- struct{}{} - closed = true - } - return s.TimedIMUSensorReadingResponse{}, errors.New("end of dataset") - } - linAcc := mockLinearAccelerationData[i] - angVel := mockAngularVelocityData[i] - i++ - return s.TimedIMUSensorReadingResponse{LinearAcceleration: linAcc, AngularVelocity: angVel, ReadingTime: readingTime, Replay: replay}, nil - } - - return ts, nil -} - // ClearDirectory deletes the contents in the path directory // without deleting path itself. func ClearDirectory(t *testing.T, path string) { diff --git a/viam-cartographer/src/carto_facade/carto_facade.cc b/viam-cartographer/src/carto_facade/carto_facade.cc index eb246815..7ecc7de0 100644 --- a/viam-cartographer/src/carto_facade/carto_facade.cc +++ b/viam-cartographer/src/carto_facade/carto_facade.cc @@ -864,8 +864,12 @@ void CartoFacade::AddIMUReading(const viam_carto_imu_reading *sr) { VLOG(1) << "AddSensorData timestamp: " << measurement.time << " Sensor type: IMU "; map_builder.AddSensorData(kIMUSensorId.id, measurement); - VLOG(1) << "Data added is: " << measurement.linear_acceleration - << " and " << measurement.angular_velocity; + VLOG(1) << "Data added is: (" << measurement.linear_acceleration[0] + << ", " << measurement.linear_acceleration[1] << ", " + << measurement.linear_acceleration[2] << ") and (" + << measurement.angular_velocity[0] << ", " + << measurement.angular_velocity[1] << ", " + << measurement.angular_velocity[2] << ")"; LOG(INFO) << "Added IMU data to Cartographer"; tmp_global_pose = map_builder.GetGlobalPose(); map_builder_mutex.unlock(); diff --git a/viam_cartographer.go b/viam_cartographer.go index 04572bf0..76831b43 100644 --- a/viam_cartographer.go +++ b/viam_cartographer.go @@ -45,8 +45,6 @@ const ( defaultDialMaxTimeoutSec = 30 defaultSensorValidationMaxTimeoutSec = 30 defaultSensorValidationIntervalSec = 1 - parsePortMaxTimeoutSec = 60 - localhost0 = "localhost:0" defaultCartoFacadeTimeout = 5 * time.Second defaultCartoFacadeInternalTimeout = 15 * time.Minute chunkSizeBytes = 1 * 1024 * 1024 @@ -135,6 +133,7 @@ func initSensorProcesses(cancelCtx context.Context, cartoSvc *CartographerServic InternalTimeout: cartoSvc.cartoFacadeInternalTimeout, Logger: cartoSvc.logger, RunFinalOptimizationFunc: cartoSvc.cartofacade.RunFinalOptimization, + Mutex: &sync.Mutex{}, } cartoSvc.sensorProcessWorkers.Add(1) @@ -699,6 +698,8 @@ func (cartoSvc *CartographerService) Close(ctx context.Context) error { return nil } + cartoSvc.logger.Info("Closing cartographer module") + defer cartoSvc.mu.Unlock() if cartoSvc.closed { cartoSvc.logger.Warn("Close() called multiple times") @@ -718,6 +719,8 @@ func (cartoSvc *CartographerService) Close(ctx context.Context) error { cartoSvc.cancelCartoFacadeFunc() cartoSvc.cartoFacadeWorkers.Wait() cartoSvc.closed = true + + cartoSvc.logger.Info("Closing complete") return nil }