diff --git a/viam-cartographer.go b/viam-cartographer.go index d5ca753b..8a1be92f 100644 --- a/viam-cartographer.go +++ b/viam-cartographer.go @@ -38,6 +38,8 @@ import ( var ( Model = resource.NewModel("viam", "slam", "cartographer") cartoLib cartofacade.CartoLib + // ErrClosed denotes that the slam service method was called on a closed slam resource. + ErrClosed = errors.Errorf("resource (%s) is closed", Model.String()) ) const ( @@ -436,6 +438,8 @@ func initCartoGrpcServer(ctx, cancelCtx context.Context, cartoSvc *cartographerS type cartographerService struct { resource.Named resource.AlwaysRebuild + mu sync.Mutex + closed bool primarySensorName string lidar lidar.Lidar executableName string @@ -496,6 +500,10 @@ type cartographerService struct { func (cartoSvc *cartographerService) GetPosition(ctx context.Context) (spatialmath.Pose, string, error) { ctx, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetPosition") defer span.End() + if cartoSvc.closed { + cartoSvc.logger.Warn("GetPosition called after closed") + return nil, "", ErrClosed + } if cartoSvc.modularizationV2Enabled { return cartoSvc.getPositionModularizationV2(ctx) @@ -539,6 +547,11 @@ func (cartoSvc *cartographerService) GetPointCloudMap(ctx context.Context) (func ctx, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetPointCloudMap") defer span.End() + if cartoSvc.closed { + cartoSvc.logger.Warn("GetPointCloudMap called after closed") + return nil, ErrClosed + } + if !cartoSvc.localizationMode { cartoSvc.mapTimestamp = time.Now().UTC() } @@ -551,6 +564,11 @@ func (cartoSvc *cartographerService) GetInternalState(ctx context.Context) (func ctx, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetInternalState") defer span.End() + if cartoSvc.closed { + cartoSvc.logger.Warn("GetInternalState called after closed") + return nil, ErrClosed + } + return grpchelper.GetInternalStateCallback(ctx, cartoSvc.Name().ShortName(), cartoSvc.clientAlgo) } @@ -560,6 +578,11 @@ func (cartoSvc *cartographerService) GetLatestMapInfo(ctx context.Context) (time _, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetLatestMapInfo") defer span.End() + if cartoSvc.closed { + cartoSvc.logger.Warn("GetLatestMapInfo called after closed") + return time.Time{}, ErrClosed + } + return cartoSvc.mapTimestamp, nil } @@ -645,12 +668,21 @@ func (cartoSvc *cartographerService) getNextDataPoint(ctx context.Context, lidar } func (cartoSvc *cartographerService) DoCommand(ctx context.Context, req map[string]interface{}) (map[string]interface{}, error) { + if cartoSvc.closed { + cartoSvc.logger.Warn("DoCommand called after closed") + return nil, ErrClosed + } return nil, viamgrpc.UnimplementedError } // Close out of all slam related processes. func (cartoSvc *cartographerService) Close(ctx context.Context) error { - // TODO: Make this atomic & idempotent + cartoSvc.mu.Lock() + defer cartoSvc.mu.Unlock() + if cartoSvc.closed { + cartoSvc.logger.Warn("Close() called multiple times") + return nil + } if cartoSvc.modularizationV2Enabled { // stop sensor process workers cartoSvc.cancelSensorProcessFunc() @@ -665,6 +697,7 @@ func (cartoSvc *cartographerService) Close(ctx context.Context) error { // stop carto facade workers cartoSvc.cancelCartoFacadeFunc() cartoSvc.cartoFacadeWorkers.Wait() + cartoSvc.closed = true return nil } @@ -690,6 +723,7 @@ func (cartoSvc *cartographerService) Close(ctx context.Context) error { return errors.Wrap(err, "error occurred during closeout of process") } cartoSvc.activeBackgroundWorkers.Wait() + cartoSvc.closed = true return nil } diff --git a/viam-cartographer_test.go b/viam-cartographer_test.go index 11339e47..acecb92c 100644 --- a/viam-cartographer_test.go +++ b/viam-cartographer_test.go @@ -83,7 +83,6 @@ func TestNew(t *testing.T) { t.Run("Successful creation of cartographer slam service with no sensor", func(t *testing.T) { grpcServer, port := setupTestGRPCServer(t) - test.That(t, err, test.ShouldBeNil) attrCfg := &vcConfig.Config{ Sensors: []string{}, ConfigParams: map[string]string{"mode": "2d"}, @@ -101,7 +100,6 @@ func TestNew(t *testing.T) { t.Run("Failed creation of cartographer slam service with more than one sensor", func(t *testing.T) { grpcServer, port := setupTestGRPCServer(t) - test.That(t, err, test.ShouldBeNil) attrCfg := &vcConfig.Config{ Sensors: []string{"lidar", "one-too-many"}, ConfigParams: map[string]string{"mode": "2d"}, @@ -627,6 +625,102 @@ func TestSLAMProcess(t *testing.T) { internaltesthelper.ClearDirectory(t, dataDir) } +func TestClose(t *testing.T) { + logger := golog.NewTestLogger(t) + ctx := context.Background() + + t.Run("is idempotent and makes all endpoints return closed errors", func(t *testing.T) { + dataDir, err := internaltesthelper.CreateTempFolderArchitecture(logger) + test.That(t, err, test.ShouldBeNil) + + grpcServer, port := setupTestGRPCServer(t) + test.That(t, err, test.ShouldBeNil) + attrCfg := &vcConfig.Config{ + Sensors: []string{}, + ConfigParams: map[string]string{"mode": "2d"}, + DataDirectory: dataDir, + Port: "localhost:" + strconv.Itoa(port), + UseLiveData: &_false, + } + + svc, err := internaltesthelper.CreateSLAMService(t, attrCfg, logger, false, testExecutableName) + test.That(t, err, test.ShouldBeNil) + + grpcServer.Stop() + // call twice, assert result is the same to prove idempotence + test.That(t, svc.Close(context.Background()), test.ShouldBeNil) + test.That(t, svc.Close(context.Background()), test.ShouldBeNil) + + pose, componentRef, err := svc.GetPosition(ctx) + test.That(t, pose, test.ShouldBeNil) + test.That(t, componentRef, test.ShouldBeEmpty) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + + gpcmF, err := svc.GetPointCloudMap(ctx) + test.That(t, gpcmF, test.ShouldBeNil) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + + gisF, err := svc.GetInternalState(ctx) + test.That(t, gisF, test.ShouldBeNil) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + + mapTime, err := svc.GetLatestMapInfo(ctx) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + test.That(t, mapTime, test.ShouldResemble, time.Time{}) + + cmd := map[string]interface{}{} + resp, err := svc.DoCommand(ctx, cmd) + test.That(t, resp, test.ShouldBeNil) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + }) + + t.Run("is idempotent and makes all endpoints return closed errors when feature flag enabled", func(t *testing.T) { + termFunc := initTestCL(t, logger) + defer termFunc() + + dataDirectory, err := os.MkdirTemp("", "*") + test.That(t, err, test.ShouldBeNil) + + attrCfg := &vcConfig.Config{ + ModularizationV2Enabled: &_true, + Sensors: []string{"replay_sensor"}, + ConfigParams: map[string]string{"mode": "2d"}, + DataDirectory: dataDirectory, + UseLiveData: &_false, + MapRateSec: &testMapRateSec, + } + + svc, err := internaltesthelper.CreateSLAMService(t, attrCfg, logger, false, testExecutableName) + test.That(t, err, test.ShouldBeNil) + + // call twice, assert result is the same to prove idempotence + test.That(t, svc.Close(context.Background()), test.ShouldBeNil) + test.That(t, svc.Close(context.Background()), test.ShouldBeNil) + + pose, componentRef, err := svc.GetPosition(ctx) + test.That(t, pose, test.ShouldBeNil) + test.That(t, componentRef, test.ShouldBeEmpty) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + + gpcmF, err := svc.GetPointCloudMap(ctx) + test.That(t, gpcmF, test.ShouldBeNil) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + + gisF, err := svc.GetInternalState(ctx) + test.That(t, gisF, test.ShouldBeNil) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + + mapTime, err := svc.GetLatestMapInfo(ctx) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + test.That(t, mapTime, test.ShouldResemble, time.Time{}) + + cmd := map[string]interface{}{} + resp, err := svc.DoCommand(ctx, cmd) + test.That(t, resp, test.ShouldBeNil) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed) + }) +} + func TestDoCommand(t *testing.T) { logger := golog.NewTestLogger(t) dataDir, err := internaltesthelper.CreateTempFolderArchitecture(logger)