diff --git a/postprocess/postprocess.go b/postprocess/postprocess.go new file mode 100644 index 00000000..dd14248b --- /dev/null +++ b/postprocess/postprocess.go @@ -0,0 +1,230 @@ +// Package postprocess contains functionality to postprocess pointcloud maps +package postprocess + +import ( + "bytes" + "errors" + "image/color" + + "github.com/golang/geo/r3" + "go.viam.com/rdk/pointcloud" +) + +// Instruction describes the action of the postprocess step. +type Instruction int + +const ( + // Add is the instruction for adding points. + Add Instruction = iota + // Remove is the instruction for removing points. + Remove = iota +) + +const ( + fullConfidence = 100 + removalRadius = 100 // mm + xKey = "X" + yKey = "Y" + + // ToggleCommand can be used to turn postprocessing on and off. + ToggleCommand = "postprocess_toggle" + // AddCommand can be used to add points to the pointcloud map. + AddCommand = "postprocess_add" + // RemoveCommand can be used to remove points from the pointcloud map. + RemoveCommand = "postprocess_remove" + // UndoCommand can be used to undo last postprocessing step. + UndoCommand = "postprocess_undo" +) + +var ( + errPointsNotASlice = errors.New("could not parse provided points as a slice") + errPointNotAMap = errors.New("could not parse provided point as a map") + errXNotProvided = errors.New("could X not provided") + errXNotFloat64 = errors.New("could not parse provided X as a float64") + errYNotProvided = errors.New("could X not provided") + errYNotFloat64 = errors.New("could not parse provided X as a float64") + errRemovingPoints = errors.New("unexpected number of points after removal") + errNilUpdatedData = errors.New("cannot provide nil updated data") +) + +// Task can be used to construct a postprocessing step. +type Task struct { + Instruction Instruction + Points []r3.Vector +} + +// ParseDoCommand parses postprocessing DoCommands into Tasks. +func ParseDoCommand( + unstructuredPoints interface{}, + instruction Instruction, +) (Task, error) { + pointSlice, ok := unstructuredPoints.([]interface{}) + if !ok { + return Task{}, errPointsNotASlice + } + + task := Task{Instruction: instruction} + for _, point := range pointSlice { + pointMap, ok := point.(map[string]interface{}) + if !ok { + return Task{}, errPointNotAMap + } + + x, ok := pointMap[xKey] + if !ok { + return Task{}, errXNotProvided + } + + xFloat, ok := x.(float64) + if !ok { + return Task{}, errXNotFloat64 + } + + y, ok := pointMap[yKey] + if !ok { + return Task{}, errYNotProvided + } + + yFloat, ok := y.(float64) + if !ok { + return Task{}, errXNotFloat64 + } + + task.Points = append(task.Points, r3.Vector{X: xFloat, Y: yFloat}) + } + return task, nil +} + +/* +UpdatePointCloud iterated through a list of tasks and adds or removes points from data +and writes the updated pointcloud to updatedData. +*/ +func UpdatePointCloud( + data []byte, + updatedData *[]byte, + tasks []Task, +) error { + if updatedData == nil { + return errNilUpdatedData + } + + *updatedData = append(*updatedData, data...) + + // iterate through tasks and add or remove points + for _, task := range tasks { + switch task.Instruction { + case Add: + err := updatePointCloudWithAddedPoints(updatedData, task.Points) + if err != nil { + return err + } + case Remove: + err := updatePointCloudWithRemovedPoints(updatedData, task.Points) + if err != nil { + return err + } + } + } + return nil +} + +func updatePointCloudWithAddedPoints(updatedData *[]byte, points []r3.Vector) error { + if updatedData == nil { + return errNilUpdatedData + } + + reader := bytes.NewReader(*updatedData) + pc, err := pointcloud.ReadPCD(reader) + if err != nil { + return err + } + + for _, point := range points { + /* + Viam expects pointcloud data with fields "x y z" or "x y z rgb", and for + this to be specified in the pointcloud header in the FIELDS entry. If color + data is included in the pointcloud, Viam's services assume that the color + value encodes a confidence score for that data point. Viam expects the + confidence score to be encoded in the blue parameter of the RGB value, on a + scale from 1-100. + */ + err := pc.Set(point, pointcloud.NewColoredData(color.NRGBA{B: fullConfidence})) + if err != nil { + return err + } + } + + var buf bytes.Buffer + err = pointcloud.ToPCD(pc, &buf, pointcloud.PCDBinary) + if err != nil { + return err + } + + // Initialize updatedData with new points + *updatedData = make([]byte, buf.Len()) + updatedReader := bytes.NewReader(buf.Bytes()) + _, err = updatedReader.Read(*updatedData) + if err != nil { + return err + } + + return nil +} + +func updatePointCloudWithRemovedPoints(updatedData *[]byte, points []r3.Vector) error { + if updatedData == nil { + return errNilUpdatedData + } + + reader := bytes.NewReader(*updatedData) + pc, err := pointcloud.ReadPCD(reader) + if err != nil { + return err + } + + updatedPC := pointcloud.NewWithPrealloc(pc.Size() - len(points)) + pointsVisited := 0 + + filterRemovedPoints := func(p r3.Vector, d pointcloud.Data) bool { + pointsVisited++ + // Always return true so iteration continues + + for _, point := range points { + // remove all points within the removalRadius from the removed points + if point.Distance(p) <= removalRadius { + return true + } + } + + err := updatedPC.Set(p, d) + // end early if point cannot be set + return err == nil + } + + pc.Iterate(0, 0, filterRemovedPoints) + + // confirm iterate did not have to end early + if pc.Size() != pointsVisited { + /* + Note: this condition will occur if some error occurred while copying valid points + and will be how we can tell that this error occurred: err := updatedPC.Set(p, d) + */ + return errRemovingPoints + } + + buf := bytes.Buffer{} + err = pointcloud.ToPCD(updatedPC, &buf, pointcloud.PCDBinary) + if err != nil { + return err + } + + // Overwrite updatedData with new points + *updatedData = make([]byte, buf.Len()) + updatedReader := bytes.NewReader(buf.Bytes()) + _, err = updatedReader.Read(*updatedData) + if err != nil { + return err + } + + return nil +} diff --git a/postprocess/postprocess_test.go b/postprocess/postprocess_test.go new file mode 100644 index 00000000..d42ac4a2 --- /dev/null +++ b/postprocess/postprocess_test.go @@ -0,0 +1,174 @@ +package postprocess + +import ( + "bytes" + "errors" + "fmt" + "image/color" + "testing" + + "github.com/golang/geo/r3" + "go.viam.com/rdk/pointcloud" + "go.viam.com/test" +) + +type TestCase struct { + msg string + cmd interface{} + err error +} + +func TestParseDoCommand(t *testing.T) { + for _, tc := range []TestCase{ + { + msg: "errors if unstructuredPoints is not a slice", + cmd: "hello", + err: errPointsNotASlice, + }, + { + msg: "errors if unstructuredPoints is not a slice of maps", + cmd: []interface{}{1}, + err: errPointNotAMap, + }, + { + msg: "errors if unstructuredPoints contains a point where X is not float64", + cmd: []interface{}{map[string]interface{}{"Y": float64(2)}}, + err: errXNotProvided, + }, + { + msg: "errors if unstructuredPoints contains a point where X is not float64", + cmd: []interface{}{map[string]interface{}{"X": 1, "Y": float64(2)}}, + err: errXNotFloat64, + }, + { + msg: "errors if unstructuredPoints contains a point where Y is not provided", + cmd: []interface{}{map[string]interface{}{"X": float64(1)}}, + err: errYNotProvided, + }, + { + msg: "errors if unstructuredPoints contains a point where Y is not float64", + cmd: []interface{}{map[string]interface{}{"X": float64(1), "Y": 2}}, + err: errYNotFloat64, + }, + } { + t.Run(fmt.Sprintf("%s for Add task", tc.msg), func(t *testing.T) { + task, err := ParseDoCommand(tc.cmd, Add) + test.That(t, err, test.ShouldBeError, tc.err) + test.That(t, task, test.ShouldResemble, Task{}) + }) + + t.Run(fmt.Sprintf("%s for Remove task", tc.msg), func(t *testing.T) { + task, err := ParseDoCommand(tc.cmd, Remove) + test.That(t, err, test.ShouldBeError, tc.err) + test.That(t, task, test.ShouldResemble, Task{}) + }) + } + + t.Run("succeeds if unstructuredPoints is a slice of maps with float64 values", func(t *testing.T) { + expectedPoint := r3.Vector{X: 1, Y: 2} + task, err := ParseDoCommand([]interface{}{map[string]interface{}{"X": float64(1), "Y": float64(2)}}, Add) + test.That(t, err, test.ShouldBeNil) + test.That(t, task, test.ShouldResemble, Task{Instruction: Add, Points: []r3.Vector{expectedPoint}}) + }) +} + +func TestUpdatePointCloudWithAddedPoints(t *testing.T) { + t.Run("errors if byte slice cannot be converted to PCD", func(t *testing.T) { + originalPointsBytes := []byte("hello") + err := updatePointCloudWithAddedPoints(&originalPointsBytes, []r3.Vector{{X: 2, Y: 2}, {X: 3, Y: 3}}) + test.That(t, err, test.ShouldBeError, errors.New("error reading header line 0: EOF")) + }) + + t.Run("successfully returns point cloud with postprocessed points", func(t *testing.T) { + originalPoints := []r3.Vector{{X: 0, Y: 0}, {X: 1, Y: 1}} + var originalPointsBytes []byte + err := vecSliceToBytes(originalPoints, &originalPointsBytes) + test.That(t, err, test.ShouldBeNil) + + postprocessedPoints := []r3.Vector{{X: 0, Y: 0}, {X: 1, Y: 1}, {X: 2, Y: 2}, {X: 3, Y: 3}} + var postprocessedPointsBytes []byte + err = vecSliceToBytes(postprocessedPoints, &postprocessedPointsBytes) + test.That(t, err, test.ShouldBeNil) + + // update original byte slice with new points + err = updatePointCloudWithAddedPoints(&originalPointsBytes, []r3.Vector{{X: 2, Y: 2}, {X: 3, Y: 3}}) + test.That(t, err, test.ShouldBeNil) + test.That(t, postprocessedPointsBytes, test.ShouldResemble, originalPointsBytes) + }) +} + +func TestUpdatePointCloudWithRemovedPoints(t *testing.T) { + t.Run("errors if byte slice cannot be converted to PCD", func(t *testing.T) { + originalPointsBytes := []byte("hello") + err := updatePointCloudWithRemovedPoints(&originalPointsBytes, []r3.Vector{{X: 2, Y: 2}, {X: 3, Y: 3}}) + test.That(t, err, test.ShouldBeError, errors.New("error reading header line 0: EOF")) + }) + + t.Run("successfully returns point cloud with postprocessed points", func(t *testing.T) { + originalPoints := []r3.Vector{{X: 0, Y: 0}, {X: 1000, Y: 1000}, {X: 2000, Y: 2000}, {X: 2020, Y: 2020}, {X: 3000, Y: 3000}} + var originalPointsBytes []byte + err := vecSliceToBytes(originalPoints, &originalPointsBytes) + test.That(t, err, test.ShouldBeNil) + + postprocessedPoints := []r3.Vector{{X: 0, Y: 0}, {X: 1000, Y: 1000}} + var postprocessedPointsBytes []byte + err = vecSliceToBytes(postprocessedPoints, &postprocessedPointsBytes) + test.That(t, err, test.ShouldBeNil) + + // update original byte slice with new points + err = updatePointCloudWithRemovedPoints(&originalPointsBytes, []r3.Vector{{X: 2000, Y: 2000}, {X: 3000, Y: 3000}}) + test.That(t, err, test.ShouldBeNil) + test.That(t, postprocessedPointsBytes, test.ShouldResemble, originalPointsBytes) + }) +} + +func TestUpdatePointCloud(t *testing.T) { + originalPoints := []r3.Vector{{X: 0, Y: 0}, {X: 1000, Y: 1000}, {X: 2000, Y: 2000}, {X: 3000, Y: 3000}} + var originalPointsBytes []byte + err := vecSliceToBytes(originalPoints, &originalPointsBytes) + test.That(t, err, test.ShouldBeNil) + + postprocessedPoints := []r3.Vector{ + {X: 0, Y: 0}, + {X: 1000, Y: 1000}, + {X: 3000, Y: 3000}, + {X: 5000, Y: 5000}, + } + var postprocessedPointsBytes []byte + err = vecSliceToBytes(postprocessedPoints, &postprocessedPointsBytes) + test.That(t, err, test.ShouldBeNil) + + tasks := []Task{ + { + Instruction: Add, + Points: []r3.Vector{{X: 4000, Y: 4000}, {X: 5000, Y: 5000}}, + }, + { + Instruction: Remove, + Points: []r3.Vector{{X: 2000, Y: 2000}, {X: 4000, Y: 4000}}, + }, + } + var updatedData []byte + err = UpdatePointCloud(originalPointsBytes, &updatedData, tasks) + test.That(t, err, test.ShouldBeNil) + test.That(t, updatedData, test.ShouldResemble, postprocessedPointsBytes) +} + +func vecSliceToBytes(points []r3.Vector, outputData *[]byte) error { + pc := pointcloud.NewWithPrealloc(len(points)) + for _, p := range points { + pc.Set(p, pointcloud.NewColoredData(color.NRGBA{B: fullConfidence})) + } + + buf := bytes.Buffer{} + err := pointcloud.ToPCD(pc, &buf, pointcloud.PCDBinary) + if err != nil { + return err + } + + // Initialize updatedData with new points + *outputData = make([]byte, buf.Len()) + updatedReader := bytes.NewReader(buf.Bytes()) + updatedReader.Read(*outputData) + return nil +} diff --git a/viam_cartographer.go b/viam_cartographer.go index d1dea80f..a2695888 100644 --- a/viam_cartographer.go +++ b/viam_cartographer.go @@ -22,6 +22,7 @@ import ( "github.com/viamrobotics/viam-cartographer/cartofacade" vcConfig "github.com/viamrobotics/viam-cartographer/config" + "github.com/viamrobotics/viam-cartographer/postprocess" "github.com/viamrobotics/viam-cartographer/sensorprocess" s "github.com/viamrobotics/viam-cartographer/sensors" ) @@ -34,6 +35,10 @@ var ( ErrClosed = errors.Errorf("resource (%s) is closed", Model.String()) // ErrUseCloudSlamEnabled denotes that the slam service method was called while use_cloud_slam was set to true. ErrUseCloudSlamEnabled = errors.Errorf("resource (%s) unavailable, configured with use_cloud_slam set to true", Model.String()) + // ErrNoPostprocessingToUndo denotes that the points have not been properly formatted. + ErrNoPostprocessingToUndo = errors.New("there are no postprocessing tasks to undo") + // ErrBadPostprocessingPointsFormat denotest that the postprocesing points have not been correctly provided. + ErrBadPostprocessingPointsFormat = errors.New("invalid postprocessing points format") ) const ( @@ -43,6 +48,13 @@ const ( defaultCartoFacadeTimeout = 5 * time.Minute defaultCartoFacadeInternalTimeout = 15 * time.Minute chunkSizeBytes = 1 * 1024 * 1024 + + // JobDoneCommand is the string that needs to be sent to DoCommand to find out if the job has finished. + JobDoneCommand = "job_done" + // SuccessMessage is sent back after a successful DoCommand request. + SuccessMessage = "success" + // PostprocessToggleResponseKey is the key sent back for the toggle postprocess command. + PostprocessToggleResponseKey = "postprocessed" ) var defaultCartoAlgoCfg = cartofacade.CartoAlgoConfig{ @@ -483,8 +495,10 @@ type CartographerService struct { sensorProcessWorkers sync.WaitGroup cartoFacadeWorkers sync.WaitGroup - mapTimestamp time.Time - jobDone atomic.Bool + mapTimestamp time.Time + jobDone atomic.Bool + postprocessed atomic.Bool + postprocessingTasks []postprocess.Task useCloudSlam bool enableMapping bool @@ -541,6 +555,17 @@ func (cartoSvc *CartographerService) PointCloudMap(ctx context.Context) (func() if err != nil { return nil, err } + + if cartoSvc.postprocessed.Load() { + var updatedPc []byte + err = postprocess.UpdatePointCloud(pc, &updatedPc, cartoSvc.postprocessingTasks) + if err != nil { + return nil, err + } + + return toChunkedFunc(updatedPc), nil + } + return toChunkedFunc(pc), nil } @@ -615,8 +640,44 @@ func (cartoSvc *CartographerService) DoCommand(ctx context.Context, req map[stri return nil, ErrClosed } - if _, ok := req["job_done"]; ok { - return map[string]interface{}{"job_done": cartoSvc.jobDone.Load()}, nil + if _, ok := req[JobDoneCommand]; ok { + return map[string]interface{}{JobDoneCommand: cartoSvc.jobDone.Load()}, nil + } + + if _, ok := req[postprocess.ToggleCommand]; ok { + cartoSvc.postprocessed.Store(!cartoSvc.postprocessed.Load()) + return map[string]interface{}{PostprocessToggleResponseKey: cartoSvc.postprocessed.Load()}, nil + } + + if points, ok := req[postprocess.AddCommand]; ok { + task, err := postprocess.ParseDoCommand(points, postprocess.Add) + if err != nil { + return nil, errors.Wrap(ErrBadPostprocessingPointsFormat, err.Error()) + } + + cartoSvc.postprocessingTasks = append(cartoSvc.postprocessingTasks, task) + cartoSvc.postprocessed.Store(true) + return map[string]interface{}{postprocess.AddCommand: SuccessMessage}, nil + } + + if points, ok := req[postprocess.RemoveCommand]; ok { + task, err := postprocess.ParseDoCommand(points, postprocess.Remove) + if err != nil { + return nil, errors.Wrap(ErrBadPostprocessingPointsFormat, err.Error()) + } + + cartoSvc.postprocessingTasks = append(cartoSvc.postprocessingTasks, task) + cartoSvc.postprocessed.Store(true) + return map[string]interface{}{postprocess.RemoveCommand: SuccessMessage}, nil + } + + if _, ok := req[postprocess.UndoCommand]; ok { + if len(cartoSvc.postprocessingTasks) == 0 { + return nil, ErrNoPostprocessingToUndo + } + + cartoSvc.postprocessingTasks = cartoSvc.postprocessingTasks[:len(cartoSvc.postprocessingTasks)-1] + return map[string]interface{}{postprocess.UndoCommand: SuccessMessage}, nil } return nil, viamgrpc.UnimplementedError diff --git a/viam_cartographer_test.go b/viam_cartographer_test.go index 54572258..a6736f16 100644 --- a/viam_cartographer_test.go +++ b/viam_cartographer_test.go @@ -20,6 +20,7 @@ import ( viamcartographer "github.com/viamrobotics/viam-cartographer" "github.com/viamrobotics/viam-cartographer/cartofacade" vcConfig "github.com/viamrobotics/viam-cartographer/config" + "github.com/viamrobotics/viam-cartographer/postprocess" s "github.com/viamrobotics/viam-cartographer/sensors" "github.com/viamrobotics/viam-cartographer/testhelper" ) @@ -356,5 +357,110 @@ func TestDoCommand(t *testing.T) { test.That(t, err, test.ShouldEqual, viamgrpc.UnimplementedError) test.That(t, resp, test.ShouldBeNil) }) + t.Run("returns false when given 'job_done'", func(t *testing.T) { + cmd := map[string]interface{}{viamcartographer.JobDoneCommand: ""} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, test.ShouldResemble, + map[string]interface{}{viamcartographer.JobDoneCommand: false}, + ) + }) + t.Run("changes postprocess bool after 'postprocess_toggle'", func(t *testing.T) { + cmd := map[string]interface{}{postprocess.ToggleCommand: ""} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, test.ShouldResemble, + map[string]interface{}{viamcartographer.PostprocessToggleResponseKey: true}, + ) + + cmd = map[string]interface{}{postprocess.ToggleCommand: ""} + resp, err = svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, test.ShouldResemble, + map[string]interface{}{viamcartographer.PostprocessToggleResponseKey: false}, + ) + }) + t.Run( + "errors if 'postprocess_undo' is called before any postprocessing has occurred", + func(t *testing.T) { + cmd := map[string]interface{}{postprocess.UndoCommand: ""} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeError, viamcartographer.ErrNoPostprocessingToUndo) + test.That(t, resp, test.ShouldBeNil) + }) + t.Run( + "succeeds if 'postprocess_undo' is called after any postprocessing has occurred", + func(t *testing.T) { + point := map[string]interface{}{"X": float64(1), "Y": float64(1)} + cmd := map[string]interface{}{postprocess.AddCommand: []interface{}{point}} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, + test.ShouldResemble, + map[string]interface{}{postprocess.AddCommand: viamcartographer.SuccessMessage}, + ) + + cmd = map[string]interface{}{postprocess.UndoCommand: ""} + resp, err = svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, + test.ShouldResemble, + map[string]interface{}{postprocess.UndoCommand: viamcartographer.SuccessMessage}, + ) + }) + t.Run( + "success if 'postprocess_add' is called correctly", + func(t *testing.T) { + point := map[string]interface{}{"X": float64(1), "Y": float64(1)} + cmd := map[string]interface{}{postprocess.AddCommand: []interface{}{point}} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, + test.ShouldResemble, + map[string]interface{}{postprocess.AddCommand: viamcartographer.SuccessMessage}, + ) + }) + t.Run( + "errors if 'postprocess_add' is called with incorrect format", + func(t *testing.T) { + cmd := map[string]interface{}{postprocess.AddCommand: "hello"} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err.Error(), test.ShouldContainSubstring, viamcartographer.ErrBadPostprocessingPointsFormat.Error()) + test.That(t, resp, test.ShouldBeNil) + }) + t.Run( + "success if 'postprocess_remove' is called correctly", + func(t *testing.T) { + point := map[string]interface{}{"X": float64(1), "Y": float64(1)} + cmd := map[string]interface{}{postprocess.RemoveCommand: []interface{}{point}} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err, test.ShouldBeNil) + test.That( + t, + resp, + test.ShouldResemble, + map[string]interface{}{postprocess.RemoveCommand: viamcartographer.SuccessMessage}, + ) + }) + t.Run( + "errors if 'postprocess_remove' is called with incorrect format", + func(t *testing.T) { + cmd := map[string]interface{}{postprocess.RemoveCommand: "hello"} + resp, err := svc.DoCommand(context.Background(), cmd) + test.That(t, err.Error(), test.ShouldContainSubstring, viamcartographer.ErrBadPostprocessingPointsFormat.Error()) + test.That(t, resp, test.ShouldBeNil) + }) test.That(t, svc.Close(context.Background()), test.ShouldBeNil) }