Skip to content

Commit

Permalink
[RSDK-2854] make-close-atomic-and-idempotent (#190)
Browse files Browse the repository at this point in the history
Co-authored-by: kim-mishra <121991867+kim-mishra@users.noreply.github.com>
  • Loading branch information
nicksanford and kim-mishra authored Jul 12, 2023
1 parent f1406a6 commit 8faded5
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 3 deletions.
36 changes: 35 additions & 1 deletion viam-cartographer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
var (
Model = resource.NewModel("viam", "slam", "cartographer")
cartoLib cartofacade.CartoLib
// ErrClosed denotes that the slam service method was called on a closed slam resource.
ErrClosed = errors.Errorf("resource (%s) is closed", Model.String())
)

const (
Expand Down Expand Up @@ -436,6 +438,8 @@ func initCartoGrpcServer(ctx, cancelCtx context.Context, cartoSvc *cartographerS
type cartographerService struct {
resource.Named
resource.AlwaysRebuild
mu sync.Mutex
closed bool
primarySensorName string
lidar lidar.Lidar
executableName string
Expand Down Expand Up @@ -496,6 +500,10 @@ type cartographerService struct {
func (cartoSvc *cartographerService) GetPosition(ctx context.Context) (spatialmath.Pose, string, error) {
ctx, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetPosition")
defer span.End()
if cartoSvc.closed {
cartoSvc.logger.Warn("GetPosition called after closed")
return nil, "", ErrClosed
}

if cartoSvc.modularizationV2Enabled {
return cartoSvc.getPositionModularizationV2(ctx)
Expand Down Expand Up @@ -539,6 +547,11 @@ func (cartoSvc *cartographerService) GetPointCloudMap(ctx context.Context) (func
ctx, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetPointCloudMap")
defer span.End()

if cartoSvc.closed {
cartoSvc.logger.Warn("GetPointCloudMap called after closed")
return nil, ErrClosed
}

if !cartoSvc.localizationMode {
cartoSvc.mapTimestamp = time.Now().UTC()
}
Expand All @@ -551,6 +564,11 @@ func (cartoSvc *cartographerService) GetInternalState(ctx context.Context) (func
ctx, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetInternalState")
defer span.End()

if cartoSvc.closed {
cartoSvc.logger.Warn("GetInternalState called after closed")
return nil, ErrClosed
}

return grpchelper.GetInternalStateCallback(ctx, cartoSvc.Name().ShortName(), cartoSvc.clientAlgo)
}

Expand All @@ -560,6 +578,11 @@ func (cartoSvc *cartographerService) GetLatestMapInfo(ctx context.Context) (time
_, span := trace.StartSpan(ctx, "viamcartographer::cartographerService::GetLatestMapInfo")
defer span.End()

if cartoSvc.closed {
cartoSvc.logger.Warn("GetLatestMapInfo called after closed")
return time.Time{}, ErrClosed
}

return cartoSvc.mapTimestamp, nil
}

Expand Down Expand Up @@ -645,12 +668,21 @@ func (cartoSvc *cartographerService) getNextDataPoint(ctx context.Context, lidar
}

func (cartoSvc *cartographerService) DoCommand(ctx context.Context, req map[string]interface{}) (map[string]interface{}, error) {
if cartoSvc.closed {
cartoSvc.logger.Warn("DoCommand called after closed")
return nil, ErrClosed
}
return nil, viamgrpc.UnimplementedError
}

// Close out of all slam related processes.
func (cartoSvc *cartographerService) Close(ctx context.Context) error {
// TODO: Make this atomic & idempotent
cartoSvc.mu.Lock()
defer cartoSvc.mu.Unlock()
if cartoSvc.closed {
cartoSvc.logger.Warn("Close() called multiple times")
return nil
}
if cartoSvc.modularizationV2Enabled {
// stop sensor process workers
cartoSvc.cancelSensorProcessFunc()
Expand All @@ -665,6 +697,7 @@ func (cartoSvc *cartographerService) Close(ctx context.Context) error {
// stop carto facade workers
cartoSvc.cancelCartoFacadeFunc()
cartoSvc.cartoFacadeWorkers.Wait()
cartoSvc.closed = true
return nil
}

Expand All @@ -690,6 +723,7 @@ func (cartoSvc *cartographerService) Close(ctx context.Context) error {
return errors.Wrap(err, "error occurred during closeout of process")
}
cartoSvc.activeBackgroundWorkers.Wait()
cartoSvc.closed = true
return nil
}

Expand Down
98 changes: 96 additions & 2 deletions viam-cartographer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestNew(t *testing.T) {

t.Run("Successful creation of cartographer slam service with no sensor", func(t *testing.T) {
grpcServer, port := setupTestGRPCServer(t)
test.That(t, err, test.ShouldBeNil)
attrCfg := &vcConfig.Config{
Sensors: []string{},
ConfigParams: map[string]string{"mode": "2d"},
Expand All @@ -101,7 +100,6 @@ func TestNew(t *testing.T) {

t.Run("Failed creation of cartographer slam service with more than one sensor", func(t *testing.T) {
grpcServer, port := setupTestGRPCServer(t)
test.That(t, err, test.ShouldBeNil)
attrCfg := &vcConfig.Config{
Sensors: []string{"lidar", "one-too-many"},
ConfigParams: map[string]string{"mode": "2d"},
Expand Down Expand Up @@ -627,6 +625,102 @@ func TestSLAMProcess(t *testing.T) {
internaltesthelper.ClearDirectory(t, dataDir)
}

func TestClose(t *testing.T) {
logger := golog.NewTestLogger(t)
ctx := context.Background()

t.Run("is idempotent and makes all endpoints return closed errors", func(t *testing.T) {
dataDir, err := internaltesthelper.CreateTempFolderArchitecture(logger)
test.That(t, err, test.ShouldBeNil)

grpcServer, port := setupTestGRPCServer(t)
test.That(t, err, test.ShouldBeNil)
attrCfg := &vcConfig.Config{
Sensors: []string{},
ConfigParams: map[string]string{"mode": "2d"},
DataDirectory: dataDir,
Port: "localhost:" + strconv.Itoa(port),
UseLiveData: &_false,
}

svc, err := internaltesthelper.CreateSLAMService(t, attrCfg, logger, false, testExecutableName)
test.That(t, err, test.ShouldBeNil)

grpcServer.Stop()
// call twice, assert result is the same to prove idempotence
test.That(t, svc.Close(context.Background()), test.ShouldBeNil)
test.That(t, svc.Close(context.Background()), test.ShouldBeNil)

pose, componentRef, err := svc.GetPosition(ctx)
test.That(t, pose, test.ShouldBeNil)
test.That(t, componentRef, test.ShouldBeEmpty)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)

gpcmF, err := svc.GetPointCloudMap(ctx)
test.That(t, gpcmF, test.ShouldBeNil)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)

gisF, err := svc.GetInternalState(ctx)
test.That(t, gisF, test.ShouldBeNil)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)

mapTime, err := svc.GetLatestMapInfo(ctx)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)
test.That(t, mapTime, test.ShouldResemble, time.Time{})

cmd := map[string]interface{}{}
resp, err := svc.DoCommand(ctx, cmd)
test.That(t, resp, test.ShouldBeNil)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)
})

t.Run("is idempotent and makes all endpoints return closed errors when feature flag enabled", func(t *testing.T) {
termFunc := initTestCL(t, logger)
defer termFunc()

dataDirectory, err := os.MkdirTemp("", "*")
test.That(t, err, test.ShouldBeNil)

attrCfg := &vcConfig.Config{
ModularizationV2Enabled: &_true,
Sensors: []string{"replay_sensor"},
ConfigParams: map[string]string{"mode": "2d"},
DataDirectory: dataDirectory,
UseLiveData: &_false,
MapRateSec: &testMapRateSec,
}

svc, err := internaltesthelper.CreateSLAMService(t, attrCfg, logger, false, testExecutableName)
test.That(t, err, test.ShouldBeNil)

// call twice, assert result is the same to prove idempotence
test.That(t, svc.Close(context.Background()), test.ShouldBeNil)
test.That(t, svc.Close(context.Background()), test.ShouldBeNil)

pose, componentRef, err := svc.GetPosition(ctx)
test.That(t, pose, test.ShouldBeNil)
test.That(t, componentRef, test.ShouldBeEmpty)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)

gpcmF, err := svc.GetPointCloudMap(ctx)
test.That(t, gpcmF, test.ShouldBeNil)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)

gisF, err := svc.GetInternalState(ctx)
test.That(t, gisF, test.ShouldBeNil)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)

mapTime, err := svc.GetLatestMapInfo(ctx)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)
test.That(t, mapTime, test.ShouldResemble, time.Time{})

cmd := map[string]interface{}{}
resp, err := svc.DoCommand(ctx, cmd)
test.That(t, resp, test.ShouldBeNil)
test.That(t, err, test.ShouldBeError, viamcartographer.ErrClosed)
})
}

func TestDoCommand(t *testing.T) {
logger := golog.NewTestLogger(t)
dataDir, err := internaltesthelper.CreateTempFolderArchitecture(logger)
Expand Down

0 comments on commit 8faded5

Please sign in to comment.