diff --git a/ai_models/posenet_resnet50_stride16.tflite b/ai_models/posenet_resnet50_stride16.tflite
new file mode 100644
index 00000000..dbb322ab
Binary files /dev/null and b/ai_models/posenet_resnet50_stride16.tflite differ
diff --git a/build/requirements.txt b/build/requirements.txt
index 53a93da0..f2d74308 100644
--- a/build/requirements.txt
+++ b/build/requirements.txt
@@ -31,6 +31,7 @@ requests-oauthlib>=1.0.0
 responses>=0.9.0
 simplegeneric>=0.8.1
 simplejson>=3.16.0
+scipy==1.4.1
 webencodings>=0.5.1
 Werkzeug>=0.15.3
 concurrent-log-handler>=0.9.19
diff --git a/src/ambianic/pipeline/ai/fall_detect_resnet50.py b/src/ambianic/pipeline/ai/fall_detect_resnet50.py
new file mode 100755
index 00000000..ca2a2b55
--- /dev/null
+++ b/src/ambianic/pipeline/ai/fall_detect_resnet50.py
@@ -0,0 +1,501 @@
+"""Fall detection pipe element."""
+from ambianic.pipeline.ai.tf_detect import TFDetectionModel
+from ambianic.pipeline.ai.pose_engine_resnet50 import PoseEngine
+from ambianic import DEFAULT_DATA_DIR
+import logging
+import math
+import time
+from PIL import Image, ImageDraw
+from pathlib import Path
+
+log = logging.getLogger(__name__)
+
+
+class FallDetector(TFDetectionModel):
+
+    """Detects falls by comparing two images spaced about 1-2 seconds apart."""
+    def __init__(self,
+                 model=None,
+                 confidence_threshold=0.15,
+                 **kwargs
+                 ):
+        """Initialize detector with config parameters.
+        :Parameters:
+        ----------
+        model: dict
+        {
+            'tflite':
+                'ai_models/posenet_resnet50_stride16.tflite'
+            'edgetpu':
+                'ai_models/posenet_mobilenet_v1_075_721_1281_quant_decoder_edgetpu.tflite'
+        }
+        """
+        super().__init__(model=model,
+                         confidence_threshold=confidence_threshold,
+                         **kwargs)
+
+        if self.context:
+            self._sys_data_dir = self.context.data_dir
+        else:
+            self._sys_data_dir = DEFAULT_DATA_DIR
+        self._sys_data_dir = Path(self._sys_data_dir)
+
+        # previous pose detection information for the frames at time t-1
+        # and t-2 to compare pose changes against
+        self._prev_data = [None] * 2
+
+        # Lookup constants for data of previous frames
+        self.POSE_VAL = '_prev_pose_dix'
+        self.TIMESTAMP = '_prev_time'
+        self.THUMBNAIL = '_prev_thumbnail'
+        self.LEFT_ANGLE_WITH_YAXIS = '_prev_left_angle_with_yaxis'
+        self.RIGHT_ANGLE_WITH_YAXIS = '_prev_right_angle_with_yaxis'
+        self.BODY_VECTOR_SCORE = '_prev_body_vector_score'
+
+        _dix = {self.POSE_VAL: [],
+                self.TIMESTAMP: time.monotonic(),
+                self.THUMBNAIL: None,
+                self.LEFT_ANGLE_WITH_YAXIS: None,
+                self.RIGHT_ANGLE_WITH_YAXIS: None,
+                self.BODY_VECTOR_SCORE: 0
+                }
+
+        # self._prev_data[0] : data of the frame at t-2
+        # self._prev_data[1] : data of the frame at t-1
+        self._prev_data[0] = self._prev_data[1] = _dix
+
+        self._pose_engine = PoseEngine(self._tfengine, context=self.context)
+        self._fall_factor = 60
+        self.confidence_threshold = confidence_threshold
+        log.debug(f"Initializing FallDetector with confidence threshold: \
+                  {self.confidence_threshold}")
+
+        # Require a minimum amount of time between two video frames in
+        # seconds. Otherwise, on high-performing hardware the poses could
+        # be too close to each other and show a negligible difference
+        # for fall detection purposes.
+        self.min_time_between_frames = 1
+        # Require that the time distance between two video frames does not
+        # exceed a certain limit in seconds.
+        # Otherwise there could be data noise which could lead to
+        # false positive detections.
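+        # Illustrative example (not executed): with the defaults here,
+        # min_time_between_frames=1 and max_time_between_frames=10,
+        # a frame arriving 0.2s after the previous one is ignored for
+        # comparison, a frame 2s later is compared for fall motion, and a
+        # frame 30s later only resets the comparison baseline.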
+ self.max_time_between_frames = 10 + + # keypoint lookup constants + self.LEFT_SHOULDER = 'left shoulder' + self.LEFT_HIP = 'left hip' + self.RIGHT_SHOULDER = 'right shoulder' + self.RIGHT_HIP = 'right hip' + + self.fall_detect_corr = [self.LEFT_SHOULDER, self.LEFT_HIP, + self.RIGHT_SHOULDER, self.RIGHT_HIP] + + def process_sample(self, **sample): + """Detect objects in sample image.""" + log.debug("%s received new sample", self.__class__.__name__) + if not sample: + # pass through empty samples to next element + yield None + else: + try: + image = sample['image'] + inference_result, thumbnail = self.fall_detect(image=image) + inference_result = self.convert_inference_result( + inference_result) + inf_meta = { + 'display': 'Fall Detection', + } + # pass on the results to the next connected pipe element + processed_sample = { + 'image': image, + 'thumbnail': thumbnail, + 'inference_result': inference_result, + 'inference_meta': inf_meta + } + yield processed_sample + except Exception as e: + log.exception('Error "%s" while processing sample. ' + 'Dropping sample: %s', + str(e), + str(sample)) + + def calculate_angle(self, p): + ''' + Calculate angle b/w two lines such as + left shoulder-hip with prev frame's left shoulder-hip or + right shoulder-hip with prev frame's right shoulder-hip or + shoulder-hip line with vertical axis + ''' + + x1, y1 = p[0][0] + x2, y2 = p[0][1] + + x3, y3 = p[1][0] + x4, y4 = p[1][1] + + theta1 = math.degrees(math.atan2(y2-y1, x1-x2)) + theta2 = math.degrees(math.atan2(y4-y3, x3-x4)) + + angle = abs(theta1-theta2) + return angle + + def is_body_line_motion_downward(self, left_angle_with_yaxis, + rigth_angle_with_yaxis, inx): + + test = False + + l_angle = left_angle_with_yaxis and \ + self._prev_data[inx][self.LEFT_ANGLE_WITH_YAXIS] \ + and left_angle_with_yaxis > \ + self._prev_data[inx][self.LEFT_ANGLE_WITH_YAXIS] + r_angle = rigth_angle_with_yaxis and \ + self._prev_data[inx][self.RIGHT_ANGLE_WITH_YAXIS] \ + and rigth_angle_with_yaxis > \ + self._prev_data[inx][self.RIGHT_ANGLE_WITH_YAXIS] + + if l_angle or r_angle: + test = True + + return test + + def find_keypoints(self, image): + + # this score value should be related to the configuration \ + # confidence_threshold parameter + min_score = self.confidence_threshold + rotations = [Image.ROTATE_270, Image.ROTATE_90] + angle = 0 + pose = None + poses, thumbnail, _ = self._pose_engine.detect_poses(image) + width, height = thumbnail.size + # if no pose detected with high confidence, + # try rotating the image +/- 90' to find a fallen person + # currently only looking at pose[0] because we are focused \ + # on a lone person falls + # while (not poses or poses[0].score < min_score) and rotations: + spinal_vector_score, pose_dix = self.estimate_spinal_vector_score( + poses[0]) + while spinal_vector_score < min_score and rotations: + angle = rotations.pop() + transposed = image.transpose(angle) + # we are interested in the poses but not the rotated thumbnail + poses, _, _ = self._pose_engine.detect_poses(transposed) + spinal_vector_score, pose_dix = self.estimate_spinal_vector_score( + poses[0]) + + if poses and poses[0]: + pose = poses[0] + + # lets check if we found a good pose candidate + + if (pose and spinal_vector_score >= min_score): + # if the image was rotated, we need to rotate back to the original\ + # image coordinates + # before comparing with poses in other frames. + if angle == Image.ROTATE_90: + # ROTATE_90 rotates 90' counter clockwise \ + # from ^ to < orientation. 
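+                # Under the convention used here (keypoint.yx holds [x, y]),
+                # a keypoint at (x, y) in the rotated image maps back to
+                # (width - y, x) in the original image, which is what the
+                # swap below implements.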
+                for _, keypoint in pose.keypoints.items():
+                    # keypoint.yx[0] is the x coordinate in an image
+                    # keypoint.yx[1] is the y coordinate in an image,
+                    # with 0,0 in the upper left corner (not lower left).
+                    tmp_swap = keypoint.yx[0]
+                    keypoint.yx[0] = width-keypoint.yx[1]
+                    keypoint.yx[1] = tmp_swap
+            elif angle == Image.ROTATE_270:
+                # ROTATE_270 rotates 90 degrees clockwise
+                # from ^ to > orientation.
+                for _, keypoint in pose.keypoints.items():
+                    tmp_swap = keypoint.yx[0]
+                    keypoint.yx[0] = keypoint.yx[1]
+                    keypoint.yx[1] = height-tmp_swap
+            log.info(f"""A pose detected with
+                spinal_vector_score={spinal_vector_score} >= {min_score}
+                confidence threshold.
+                Pose keypoints: {pose_dix}
+                """)
+        else:
+            # we could not detect a pose with sufficient confidence
+            pose = None
+
+        return pose, thumbnail, spinal_vector_score, pose_dix
+
+    def find_changes_in_angle(self, pose_dix, inx):
+        '''
+        Find the change in angle of the shoulder-hip lines
+        between the current and previous frame.
+        '''
+
+        prev_leftLine_corr_exist = all(
+            e in self._prev_data[inx][self.POSE_VAL]
+            for e in [self.LEFT_SHOULDER, self.LEFT_HIP])
+        curr_leftLine_corr_exist = all(
+            e in pose_dix for e in [self.LEFT_SHOULDER, self.LEFT_HIP])
+
+        prev_rightLine_corr_exist = all(
+            e in self._prev_data[inx][self.POSE_VAL]
+            for e in [self.RIGHT_SHOULDER, self.RIGHT_HIP])
+        curr_rightLine_corr_exist = all(
+            e in pose_dix for e in [self.RIGHT_SHOULDER, self.RIGHT_HIP])
+
+        left_angle = right_angle = 0
+
+        if prev_leftLine_corr_exist and curr_leftLine_corr_exist:
+            temp_left_vector = [
+                [self._prev_data[inx][self.POSE_VAL][self.LEFT_SHOULDER],
+                 self._prev_data[inx][self.POSE_VAL][self.LEFT_HIP]],
+                [pose_dix[self.LEFT_SHOULDER], pose_dix[self.LEFT_HIP]]]
+            left_angle = self.calculate_angle(temp_left_vector)
+            log.debug("Left shoulder-hip angle: %r", left_angle)
+
+        if prev_rightLine_corr_exist and curr_rightLine_corr_exist:
+            temp_right_vector = [
+                [self._prev_data[inx][self.POSE_VAL][self.RIGHT_SHOULDER],
+                 self._prev_data[inx][self.POSE_VAL][self.RIGHT_HIP]],
+                [pose_dix[self.RIGHT_SHOULDER], pose_dix[self.RIGHT_HIP]]]
+            right_angle = self.calculate_angle(temp_right_vector)
+            log.debug("Right shoulder-hip angle: %r", right_angle)
+
+        angle_change = max(left_angle, right_angle)
+        return math.ceil(angle_change)
+
+    def assign_prev_records(self, pose_dix, left_angle_with_yaxis,
+                            rigth_angle_with_yaxis, now, thumbnail,
+                            current_body_vector_score):
+
+        curr_data = {self.POSE_VAL: pose_dix,
+                     self.TIMESTAMP: now,
+                     self.THUMBNAIL: thumbnail,
+                     self.LEFT_ANGLE_WITH_YAXIS: left_angle_with_yaxis,
+                     self.RIGHT_ANGLE_WITH_YAXIS: rigth_angle_with_yaxis,
+                     self.BODY_VECTOR_SCORE: current_body_vector_score}
+
+        self._prev_data[-2] = self._prev_data[-1]
+        self._prev_data[-1] = curr_data
+
+    def draw_lines(self, thumbnail, pose_dix, score):
+        """Draw body lines if available.
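+        Used only for debugging: the annotated thumbnail is also saved
+        as a JPEG under the pipeline data directory.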
+        Return number of lines drawn."""
+        # save an image with drawn lines for debugging
+        draw = ImageDraw.Draw(thumbnail)
+        body_lines_drawn = 0
+
+        if pose_dix is None:
+            return body_lines_drawn
+
+        if pose_dix.keys() >= {self.LEFT_SHOULDER, self.LEFT_HIP}:
+            body_line = [tuple(pose_dix[self.LEFT_SHOULDER]),
+                         tuple(pose_dix[self.LEFT_HIP])]
+            draw.line(body_line, fill='red')
+            body_lines_drawn += 1
+
+        if pose_dix.keys() >= {self.RIGHT_SHOULDER, self.RIGHT_HIP}:
+            body_line = [tuple(pose_dix[self.RIGHT_SHOULDER]),
+                         tuple(pose_dix[self.RIGHT_HIP])]
+            draw.line(body_line, fill='red')
+            body_lines_drawn += 1
+
+        # save a thumbnail for debugging
+        timestr = int(time.monotonic()*1000)
+        debug_image_file_name = \
+            f'tmp-fall-detect-thumbnail-{timestr}-score-{score}.jpg'
+        thumbnail.save(
+            Path(self._sys_data_dir, debug_image_file_name),
+            format='JPEG')
+        return body_lines_drawn
+
+    def get_line_angles_with_yaxis(self, pose_dix):
+        '''
+        Find the angle between each shoulder-hip line and the y axis.
+        '''
+        y_axis_corr = [[0, 0], [0, self._pose_engine._tensor_image_height]]
+
+        leftLine_corr_exist = all(
+            e in pose_dix for e in [self.LEFT_SHOULDER, self.LEFT_HIP])
+        rightLine_corr_exist = all(
+            e in pose_dix for e in [self.RIGHT_SHOULDER, self.RIGHT_HIP])
+
+        l_angle = r_angle = 0
+
+        if leftLine_corr_exist:
+            l_angle = self.calculate_angle([y_axis_corr,
+                                            [pose_dix[self.LEFT_SHOULDER],
+                                             pose_dix[self.LEFT_HIP]]])
+
+        if rightLine_corr_exist:
+            r_angle = self.calculate_angle([y_axis_corr,
+                                            [pose_dix[self.RIGHT_SHOULDER],
+                                             pose_dix[self.RIGHT_HIP]]])
+
+        return (l_angle, r_angle)
+
+    def estimate_spinal_vector_score(self, pose):
+        pose_dix = {}
+        is_leftVector = is_rightVector = False
+
+        # Calculate leftVectorScore & rightVectorScore
+        leftVectorScore = min(pose.keypoints[self.LEFT_SHOULDER].score,
+                              pose.keypoints[self.LEFT_HIP].score)
+        rightVectorScore = min(pose.keypoints[self.RIGHT_SHOULDER].score,
+                               pose.keypoints[self.RIGHT_HIP].score)
+
+        if leftVectorScore > self.confidence_threshold:
+            is_leftVector = True
+            pose_dix[self.LEFT_SHOULDER] = \
+                pose.keypoints[self.LEFT_SHOULDER].yx
+            pose_dix[self.LEFT_HIP] = pose.keypoints[self.LEFT_HIP].yx
+
+        if rightVectorScore > self.confidence_threshold:
+            is_rightVector = True
+            pose_dix[self.RIGHT_SHOULDER] = \
+                pose.keypoints[self.RIGHT_SHOULDER].yx
+            pose_dix[self.RIGHT_HIP] = pose.keypoints[self.RIGHT_HIP].yx
+
+        def find_spinalLine():
+            left_spinal_x1 = (pose_dix[self.LEFT_SHOULDER][0] +
+                              pose_dix[self.RIGHT_SHOULDER][0]) / 2
+            left_spinal_y1 = (pose_dix[self.LEFT_SHOULDER][1] +
+                              pose_dix[self.RIGHT_SHOULDER][1]) / 2
+
+            right_spinal_x1 = (pose_dix[self.LEFT_HIP][0] +
+                               pose_dix[self.RIGHT_HIP][0]) / 2
+            right_spinal_y1 = (pose_dix[self.LEFT_HIP][1] +
+                               pose_dix[self.RIGHT_HIP][1]) / 2
+
+            return (left_spinal_x1, left_spinal_y1), \
+                (right_spinal_x1, right_spinal_y1)
+
+        if is_leftVector and is_rightVector:
+            spinalVectorEstimate = find_spinalLine()
+            spinalVectorScore = (leftVectorScore + rightVectorScore) / 2.0
+        elif is_leftVector:
+            spinalVectorEstimate = pose_dix[self.LEFT_SHOULDER], \
+                pose_dix[self.LEFT_HIP]
+            # 10% confidence score penalty as only the
+            # left shoulder-hip line is detected
+            spinalVectorScore = leftVectorScore * 0.9
+        elif is_rightVector:
+            spinalVectorEstimate = pose_dix[self.RIGHT_SHOULDER], \
+                pose_dix[self.RIGHT_HIP]
+            # 10% confidence score penalty as only the
+            # right shoulder-hip line is detected
+            spinalVectorScore = rightVectorScore * 0.9
+        else:
+            spinalVectorScore = 0
+
+        log.debug(f"Estimated spinal vector score: {spinalVectorScore}")
+        return spinalVectorScore, pose_dix
+
+    def fall_detect(self, image=None):
+
+        assert image
+        log.debug("Calling TF engine for inference")
+        start_time = time.monotonic()
+
+        now = time.monotonic()
+        lapse = now - self._prev_data[-1][self.TIMESTAMP]
+
+        if self._prev_data[-1][self.POSE_VAL] \
+                and lapse < self.min_time_between_frames:
+            log.debug("Received an image frame too soon after the previous \
+                frame. Only %.2f seconds apart. \
+                Minimum %.2f seconds required between frames \
+                for fall detection.",
+                      lapse, self.min_time_between_frames)
+
+            inference_result = None
+            thumbnail = self._prev_data[-1][self.THUMBNAIL]
+        else:
+            # Detection using the tensorflow posenet module
+            pose, thumbnail, spinal_vector_score, pose_dix = \
+                self.find_keypoints(image)
+
+            inference_result = None
+            if not pose:
+                log.debug(f"No pose detected or detection score does not \
+                    meet the confidence threshold of \
+                    {self.confidence_threshold}.")
+            else:
+                inference_result = []
+
+                current_body_vector_score = spinal_vector_score
+
+                # Find the body line angles with the vertical axis
+                left_angle_with_yaxis, rigth_angle_with_yaxis = \
+                    self.get_line_angles_with_yaxis(pose_dix)
+
+                # save an image with drawn lines for debugging
+                if log.getEffectiveLevel() <= logging.DEBUG:
+                    # development mode
+                    self.draw_lines(thumbnail, pose_dix, spinal_vector_score)
+
+                for t in [-1, -2]:
+                    lapse = now - self._prev_data[t][self.TIMESTAMP]
+
+                    if not self._prev_data[t][self.POSE_VAL] or \
+                            lapse > self.max_time_between_frames:
+                        log.debug("No recent pose to compare to. Will save \
+                            this frame pose for subsequent comparison.")
+
+                    elif not self.is_body_line_motion_downward(
+                            left_angle_with_yaxis,
+                            rigth_angle_with_yaxis,
+                            inx=t):
+                        log.debug("The body-line angle with the vertical \
+                            axis is not increasing relative to the \
+                            previous frame. Not likely to be a fall.")
+
+                    else:
+                        leaning_angle = self.find_changes_in_angle(pose_dix,
+                                                                   inx=t)
+                        # Derive leaning_probability by comparing
+                        # leaning_angle against the fall_factor angle
+                        # threshold.
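+                        # Illustrative example (not executed): with
+                        # fall_factor=60, a leaning_angle of 75 degrees
+                        # yields leaning_probability 1, while 40 degrees
+                        # yields 0.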
+ leaning_probability = 1 \ + if leaning_angle >= self._fall_factor else 0 + + # Calculate fall score using average of current and \ + # previous frame's body vector score with \ + # leaning_probability + fall_score = leaning_probability * \ + (self._prev_data[t][self.BODY_VECTOR_SCORE] + + current_body_vector_score) / 2 + + if fall_score >= self.confidence_threshold: + inference_result.append(('FALL', fall_score, + leaning_angle, pose_dix)) + log.info("Fall detected: %r", inference_result) + break + else: + log.debug(f"No fall detected due to low \ + confidence score: \ + {fall_score} < {self.confidence_threshold} \ + min threshold.Inference result: {inference_result}") + + log.debug("Saving pose for subsequent comparison.") + self.assign_prev_records(pose_dix, left_angle_with_yaxis, + rigth_angle_with_yaxis, now, + thumbnail, + current_body_vector_score) + + # log.debug("Logging stats") + + self.log_stats(start_time=start_time) + log.debug("thumbnail: %r", thumbnail) + return inference_result, thumbnail + + def convert_inference_result(self, inference_result): + inf_json = [] + + if inference_result: + for inf in inference_result: + label, confidence, leaning_angle, keypoint_corr = inf + log.info('label: %s , confidence: %.0f, leaning_angle: %.0f, \ + keypoint_corr: %s', + label, + confidence, + leaning_angle, + keypoint_corr) + one_inf = { + 'label': label, + 'confidence': confidence, + 'leaning_angle': leaning_angle, + 'keypoint_corr': { + 'left shoulder': keypoint_corr.get('left shoulder', + None), + 'left hip': keypoint_corr.get('left hip', None), + 'right shoulder': keypoint_corr.get('right shoulder', + None), + 'right hip': keypoint_corr.get('right hip', None) + } + } + inf_json.append(one_inf) + + return inf_json diff --git a/src/ambianic/pipeline/ai/pose_engine_resnet50.py b/src/ambianic/pipeline/ai/pose_engine_resnet50.py new file mode 100644 index 00000000..93c688d7 --- /dev/null +++ b/src/ambianic/pipeline/ai/pose_engine_resnet50.py @@ -0,0 +1,198 @@ +from ambianic.pipeline.ai.pose_engine_utils import decode_multiple_poses +from ambianic.pipeline.ai.tf_detect import TFDetectionModel +from ambianic import DEFAULT_DATA_DIR +import logging +import time +import numpy as np +from PIL import Image, ImageDraw +from pathlib import Path + + +log = logging.getLogger(__name__) + +KEYPOINTS = ( + 'nose', + 'left eye', + 'right eye', + 'left ear', + 'right ear', + 'left shoulder', + 'right shoulder', + 'left elbow', + 'right elbow', + 'left wrist', + 'right wrist', + 'left hip', + 'right hip', + 'left knee', + 'right knee', + 'left ankle', + 'right ankle' +) + + +class Keypoint: + __slots__ = ['k', 'yx', 'score'] + + def __init__(self, k, yx, score=None): + self.k = k + self.yx = yx + self.score = score + + def __repr__(self): + return 'Keypoint(<{}>, {}, {})'.format(self.k, self.yx, self.score) + + +class Pose: + __slots__ = ['keypoints', 'score'] + + def __init__(self, keypoints, score=None): + assert len(keypoints) == len(KEYPOINTS) + self.keypoints = keypoints + self.score = score + + def __repr__(self): + return 'Pose({}, {})'.format(self.keypoints, self.score) + + +class PoseEngine: + """Engine used for pose tasks.""" + def __init__(self, tfengine=None, context=None): + """Creates a PoseEngine wrapper around an initialized tfengine. 
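+        A minimal usage sketch (pil_image stands for any PIL.Image;
+        tfengine is an already initialized TF engine instance):
+
+            engine = PoseEngine(tfengine)
+            poses, thumbnail, score = engine.detect_poses(pil_image)
+            nose = poses[0].keypoints['nose']  # Keypoint with .yx and .score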
+ """ + if context: + self._sys_data_dir = context.data_dir + else: + self._sys_data_dir = DEFAULT_DATA_DIR + self._sys_data_dir = Path(self._sys_data_dir) + assert tfengine is not None + self._tfengine = tfengine + + self._input_tensor_shape = self.get_input_tensor_shape() + + _, self._tensor_image_height, self._tensor_image_width, self._tensor_image_depth = \ + self.get_input_tensor_shape() + + self.confidence_threshold = self._tfengine.confidence_threshold + log.debug(f"Initializing PoseEngine with confidence threshold \ + {self.confidence_threshold}") + + def get_input_tensor_shape(self): + """Get the shape of the input tensor structure. + Gets the shape required for the input tensor. + For models trained for image classification / detection, the shape is + always [1, height, width, channels]. + To be used as input for :func:`run_inference`, + this tensor shape must be flattened into a 1-D array with size + ``height * width * channels``. To instead get that 1-D array size, use + :func:`required_input_array_size`. + Returns: + A 1-D array (:obj:`numpy.ndarray`) representing the required input + tensor shape. + """ + return self._tfengine.input_details[0]['shape'] + + def parse_output(self, heatmap_data, offset_data, threshold): + joint_num = heatmap_data.shape[-1] + pose_kps = np.zeros((joint_num, 4), np.float32) + + for i in range(heatmap_data.shape[-1]): + + joint_heatmap = heatmap_data[..., i] + max_val_pos = np.squeeze( + np.argwhere(joint_heatmap == np.max(joint_heatmap))) + remap_pos = np.array(max_val_pos/8*self._tensor_image_height, + dtype=np.int32) + pose_kps[i, 0] = int(remap_pos[0] + offset_data[max_val_pos[0], + max_val_pos[1], i]) + pose_kps[i, 1] = int(remap_pos[1] + offset_data[max_val_pos[0], + max_val_pos[1], i+joint_num]) + max_prob = np.max(joint_heatmap) + pose_kps[i, 3] = max_prob + if max_prob > threshold: + if pose_kps[i, 0] < self._tensor_image_height and \ + pose_kps[i, 1] < self._tensor_image_width: + pose_kps[i, 2] = 1 + + return pose_kps + + def sigmoid(self, x): + return 1 / (1 + np.exp(-x)) + + def tf_interpreter(self): + return self._tfengine._tf_interpreter + + def detect_poses(self, img): + """ + Detects poses in a given image. + :Parameters: + ---------- + img : PIL.Image + Input Image for AI model detection. + :Returns: + ------- + poses: + A list of Pose objects with keypoints and confidence scores + PIL.Image + Resized image fitting the AI model input tensor. 
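+        pose_score:
+            Overall confidence score of the detected pose
+            (keypoint score average).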
+ """ + + image_net_mean = [-123.15, -115.90, -103.06] + + _tensor_input_size = (self._tensor_image_width, + self._tensor_image_height) + + # thumbnail is a proportionately resized image + thumbnail = TFDetectionModel.thumbnail(image=img, + desired_size=_tensor_input_size) + # convert thumbnail into an image with the exact size + # as the input tensor preserving proportions by padding with + # a solid color as needed + template_image = TFDetectionModel.resize(image=thumbnail, + desired_size=_tensor_input_size) + + template_image = np.expand_dims(template_image.copy(), axis=0) + template_input = template_image + image_net_mean + + template_input = template_input.astype(np.float32) + + self.tf_interpreter().\ + set_tensor(self._tfengine.input_details[0]['index'], + template_input) + self.tf_interpreter().invoke() + + heatmap_result = self.tf_interpreter().\ + get_tensor(self._tfengine.output_details[0]['index']) + offsets_result = self.tf_interpreter().\ + get_tensor(self._tfengine.output_details[1]['index']) + displacement_bwd_result = self.tf_interpreter().\ + get_tensor(self._tfengine.output_details[2]['index']) + displacement_fwd_result = self.tf_interpreter().\ + get_tensor(self._tfengine.output_details[3]['index']) + + heatmap_result = self.sigmoid(heatmap_result) + + pose_score, keypoint_scores, keypoint_coords = decode_multiple_poses( + heatmap_result.squeeze(axis=0), + offsets_result.squeeze(axis=0), + displacement_fwd_result.squeeze(axis=0), + displacement_bwd_result.squeeze(axis=0), + output_stride=16) + + pose_score = pose_score[0] + keypoint_scores = keypoint_scores[0] + keypoint_coords = keypoint_coords[0] + + poses = [] + keypoint_dict = {} + + for point_i in range(len(keypoint_coords)): + x, y = keypoint_coords[point_i, 1], keypoint_coords[point_i, 0] + keypoint = Keypoint(KEYPOINTS[point_i], [x, y], + keypoint_scores[point_i]) + keypoint_dict[KEYPOINTS[point_i]] = keypoint + + log.debug(f"Overall pose score (keypoint score average): {pose_score}") + poses.append(Pose(keypoint_dict, pose_score)) + + return poses, thumbnail, pose_score diff --git a/src/ambianic/pipeline/ai/pose_engine_utils.py b/src/ambianic/pipeline/ai/pose_engine_utils.py new file mode 100644 index 00000000..e2dd2ffb --- /dev/null +++ b/src/ambianic/pipeline/ai/pose_engine_utils.py @@ -0,0 +1,181 @@ +import scipy.ndimage as ndi +import numpy as np + + +def within_nms_radius_fast(pose_coords, squared_nms_radius, point): + if not pose_coords.shape[0]: + return False + return np.any(np.sum((pose_coords - point) ** 2, axis=1) <= squared_nms_radius) + + +def build_part_with_score_fast(score_threshold, local_max_radius, scores): + parts = [] + num_keypoints = scores.shape[2] + lmd = 2 * local_max_radius + 1 + + for keypoint_id in range(num_keypoints): + kp_scores = scores[:, :, keypoint_id].copy() + kp_scores[kp_scores < score_threshold] = 0. 
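+        # Keep only local maxima of the score heatmap: a candidate keypoint
+        # survives if its score equals the maximum within a
+        # (2 * local_max_radius + 1) square window around it.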
+ max_vals = ndi.maximum_filter(kp_scores, size=lmd, mode='constant') + max_loc = np.logical_and(kp_scores == max_vals, kp_scores > 0) + max_loc_idx = max_loc.nonzero() + for y, x in zip(*max_loc_idx): + parts.append(( + scores[y, x, keypoint_id], + keypoint_id, + np.array((y, x)) + )) + + return parts + + +def decode_multiple_poses(scores, offsets, displacements_fwd, + displacements_bwd, output_stride, + max_pose_detections=1, score_threshold=0.5, + nms_radius=20, min_pose_score=0.4): + + NUM_KEYPOINTS = 17 + LOCAL_MAXIMUM_RADIUS = 1 + + pose_count = 0 + pose_scores = np.zeros(max_pose_detections) + pose_keypoint_scores = np.zeros((max_pose_detections, NUM_KEYPOINTS)) + pose_keypoint_coords = np.zeros((max_pose_detections, NUM_KEYPOINTS, 2)) + + squared_nms_radius = nms_radius ** 2 + + scored_parts = build_part_with_score_fast(score_threshold, + LOCAL_MAXIMUM_RADIUS, scores) + scored_parts = sorted(scored_parts, key=lambda x: x[0], reverse=True) + + height = scores.shape[0] + width = scores.shape[1] + offsets = offsets.reshape(height, width, 2, -1).swapaxes(2, 3) + displacements_fwd = displacements_fwd.reshape(height, width, 2, -1).\ + swapaxes(2, 3) + displacements_bwd = displacements_bwd.reshape(height, width, 2, -1).\ + swapaxes(2, 3) + + for root_score, root_id, root_coord in scored_parts: + root_image_coords = root_coord * output_stride + offsets[ + root_coord[0], root_coord[1], root_id] + + if within_nms_radius_fast( + pose_keypoint_coords[:pose_count, root_id, :], + squared_nms_radius, root_image_coords): + continue + + keypoint_scores, keypoint_coords = decode_pose( + root_score, root_id, root_image_coords, + scores, offsets, output_stride, + displacements_fwd, displacements_bwd) + + pose_score = get_instance_score_fast( + pose_keypoint_coords[:pose_count, :, :], squared_nms_radius, + keypoint_scores, keypoint_coords) + + if min_pose_score == 0. 
or pose_score >= min_pose_score: + pose_scores[pose_count] = pose_score + pose_keypoint_scores[pose_count, :] = keypoint_scores + pose_keypoint_coords[pose_count, :, :] = keypoint_coords + pose_count += 1 + + if pose_count >= max_pose_detections: + break + + return pose_scores, pose_keypoint_scores, pose_keypoint_coords + + +def decode_pose(root_score, root_id, root_image_coord, scores, offsets, + output_stride, displacements_fwd, displacements_bwd): + + PART_NAMES = [ + "nose", "leftEye", "rightEye", "leftEar", "rightEar", "leftShoulder", + "rightShoulder", "leftElbow", "rightElbow", "leftWrist", "rightWrist", + "leftHip", "rightHip", "leftKnee", "rightKnee", "leftAnkle", + "rightAnkle" + ] + + PART_IDS = {pn: pid for pid, pn in enumerate(PART_NAMES)} + + POSE_CHAIN = [ + ("nose", "leftEye"), ("leftEye", "leftEar"), ("nose", "rightEye"), + ("rightEye", "rightEar"), ("nose", "leftShoulder"), + ("leftShoulder", "leftElbow"), ("leftElbow", "leftWrist"), + ("leftShoulder", "leftHip"), ("leftHip", "leftKnee"), + ("leftKnee", "leftAnkle"), ("nose", "rightShoulder"), + ("rightShoulder", "rightElbow"), ("rightElbow", "rightWrist"), + ("rightShoulder", "rightHip"), ("rightHip", "rightKnee"), + ("rightKnee", "rightAnkle") + ] + + PARENT_CHILD_TUPLES = [(PART_IDS[parent], PART_IDS[child]) for parent, child in POSE_CHAIN] + + num_parts = scores.shape[2] + num_edges = len(PARENT_CHILD_TUPLES) + + instance_keypoint_scores = np.zeros(num_parts) + instance_keypoint_coords = np.zeros((num_parts, 2)) + instance_keypoint_scores[root_id] = root_score + instance_keypoint_coords[root_id] = root_image_coord + + for edge in reversed(range(num_edges)): + target_keypoint_id, source_keypoint_id = PARENT_CHILD_TUPLES[edge] + if (instance_keypoint_scores[source_keypoint_id] > 0.0 and + instance_keypoint_scores[target_keypoint_id] == 0.0): + score, coords = traverse_to_targ_keypoint( + edge, + instance_keypoint_coords[source_keypoint_id], + target_keypoint_id, + scores, offsets, output_stride, displacements_bwd) + instance_keypoint_scores[target_keypoint_id] = score + instance_keypoint_coords[target_keypoint_id] = coords + + for edge in range(num_edges): + source_keypoint_id, target_keypoint_id = PARENT_CHILD_TUPLES[edge] + if (instance_keypoint_scores[source_keypoint_id] > 0.0 and + instance_keypoint_scores[target_keypoint_id] == 0.0): + score, coords = traverse_to_targ_keypoint( + edge, + instance_keypoint_coords[source_keypoint_id], + target_keypoint_id, + scores, offsets, output_stride, displacements_fwd) + instance_keypoint_scores[target_keypoint_id] = score + instance_keypoint_coords[target_keypoint_id] = coords + + return instance_keypoint_scores, instance_keypoint_coords + + +def get_instance_score_fast( + exist_pose_coords, + squared_nms_radius, + keypoint_scores, keypoint_coords): + + if exist_pose_coords.shape[0]: + s = np.sum((exist_pose_coords - keypoint_coords) ** 2, axis=2) > squared_nms_radius + not_overlapped_scores = np.sum(keypoint_scores[np.all(s, axis=0)]) + else: + not_overlapped_scores = np.sum(keypoint_scores) + return not_overlapped_scores / len(keypoint_scores) + + +def traverse_to_targ_keypoint(edge_id, source_keypoint, target_keypoint_id, + scores, offsets, output_stride, displacements): + height = scores.shape[0] + width = scores.shape[1] + + source_keypoint_indices = np.clip( + np.round(source_keypoint / output_stride), a_min=0, a_max=[height - 1, width - 1]).astype(np.int32) + + displaced_point = source_keypoint + displacements[ + source_keypoint_indices[0], source_keypoint_indices[1], 
edge_id] + + displaced_point_indices = np.clip( + np.round(displaced_point / output_stride), a_min=0, a_max=[height - 1, width - 1]).astype(np.int32) + + score = scores[displaced_point_indices[0], displaced_point_indices[1], target_keypoint_id] + + image_coord = displaced_point_indices * output_stride + offsets[ + displaced_point_indices[0], displaced_point_indices[1], target_keypoint_id] + + return score, image_coord diff --git a/tests/pipeline/ai/posenet_resnet50_stride16.tflite b/tests/pipeline/ai/posenet_resnet50_stride16.tflite new file mode 100644 index 00000000..dbb322ab Binary files /dev/null and b/tests/pipeline/ai/posenet_resnet50_stride16.tflite differ diff --git a/tests/pipeline/ai/test_fall_detect_resnet50.py b/tests/pipeline/ai/test_fall_detect_resnet50.py new file mode 100755 index 00000000..01f35df7 --- /dev/null +++ b/tests/pipeline/ai/test_fall_detect_resnet50.py @@ -0,0 +1,729 @@ +"""Test fall detection pipe element.""" +from ambianic.pipeline.ai.fall_detect_resnet50 import FallDetector +from ambianic.pipeline.ai.object_detect import ObjectDetector +from ambianic.pipeline import PipeElement +import os +import time +from PIL import Image + + +def _fall_detect_config(): + + _dir = os.path.dirname(os.path.abspath(__file__)) + _good_tflite_model = os.path.join( + _dir, + 'posenet_resnet50_stride16.tflite' + ) + _good_edgetpu_model = os.path.join( + _dir, + 'posenet_mobilenet_v1_075_721_1281_quant_decoder_edgetpu.tflite' + ) + _good_labels = os.path.join(_dir, 'pose_labels.txt') + config = { + 'model': { + 'tflite': _good_tflite_model, + 'edgetpu': _good_edgetpu_model, + }, + 'labels': _good_labels, + 'top_k': 3, + 'confidence_threshold': 0.6, + } + return config + + +def _get_image(file_name=None): + assert file_name + _dir = os.path.dirname(os.path.abspath(__file__)) + image_file = os.path.join(_dir, file_name) + img = Image.open(image_file) + return img + + +class _OutPipeElement(PipeElement): + + def __init__(self, sample_callback=None): + super().__init__() + assert sample_callback + self._sample_callback = sample_callback + + def receive_next_sample(self, **sample): + self._sample_callback(**sample) + + +def test_model_inputs(): + """Verify against known model inputs.""" + config = _fall_detect_config() + fall_detector = FallDetector(**config) + tfe = fall_detector._tfengine + + samples = tfe.input_details[0]['shape'][0] + assert samples == 1 + height = tfe.input_details[0]['shape'][1] + assert height == 256 + width = tfe.input_details[0]['shape'][2] + assert width == 256 + colors = tfe.input_details[0]['shape'][3] + assert colors == 3 + + +def test_fall_detection_thumbnail_present(): + """Expected to receive thumnail in result if image is provided \ + and poses are detected.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, thumbnail=None, inference_result=None, + **kwargs): + nonlocal result + result = image is not None and thumbnail is not None and \ + inference_result is not None + + fall_detector = FallDetector(**config) + output = _OutPipeElement(sample_callback=sample_callback) + fall_detector.connect_to_next_element(output) + img_1 = _get_image(file_name='fall_img_1.png') + fall_detector.receive_next_sample(image=img_1) + assert result is True + + +def test_fall_detection_case_1(): + """Expected to detect a fall as key-points are detected by rotating second image.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = 
inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_1.png') + + # The frame represents a person completely falls. + img_2 = _get_image(file_name='fall_img_3.png') + + fall_detector.receive_next_sample(image=img_1) + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert result + assert len(result) == 1 + category = result[0]['label'] + confidence = result[0]['confidence'] + angle = result[0]['leaning_angle'] + keypoint_corr = result[0]['keypoint_corr'] + + assert keypoint_corr + assert category == 'FALL' + assert confidence > 0.7 + assert angle > 60 + + +def test_fall_detection_case_2_1(): + """Expected to not detect a fall even though key-points are detected + and the angle criteria is met. However the time distance between + frames is too short.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_1.png') + + # The frame represents a person falls. + img_2 = _get_image(file_name='fall_img_2.png') + + start_time = time.monotonic() + fall_detector.receive_next_sample(image=img_1) + end_time = time.monotonic() + safe_min = end_time-start_time+1 + # set min time to a sufficiently big number to ensure test passes + # on slow environments + # the goal is to simulate two frames that are too close in time + # to be considered for a fall detection sequence + fall_detector.min_time_between_frames = safe_min + fall_detector.receive_next_sample(image=img_2) + + assert not result + + +def test_fall_detection_case_2_2(): + """Expected to detect a fall because key-points are detected, + the angle criteria is met and the time distance between + frames is not too short.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_1.png') + + # The frame represents a person falls. 
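+    # Same image pair as test_fall_detection_case_2_1, but here
+    # min_time_between_frames is relaxed below, so the two frames qualify
+    # for fall comparison.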
+ img_2 = _get_image(file_name='fall_img_2.png') + + fall_detector.receive_next_sample(image=img_1) + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert result + assert len(result) == 1 + category = result[0]['label'] + confidence = result[0]['confidence'] + angle = result[0]['leaning_angle'] + keypoint_corr = result[0]['keypoint_corr'] + + assert keypoint_corr + assert category == 'FALL' + assert confidence > 0.7 + assert angle >= 60 + + +def test_fall_detection_case_3_1(): + """Expect to detect a fall as key-points are detected by + rotating the image clockwise.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_11.png') + + # The frame represents a person completely falls. + img_2 = _get_image(file_name='fall_img_12.png') + + fall_detector.receive_next_sample(image=img_1) + # set min time to a small number to speed up testing + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert result + assert len(result) == 1 + + category = result[0]['label'] + confidence = result[0]['confidence'] + angle = result[0]['leaning_angle'] + keypoint_corr = result[0]['keypoint_corr'] + + assert keypoint_corr + assert category == 'FALL' + assert confidence > 0.3 + assert angle > 60 + + +def test_fall_detection_case_3_2(): + """Expect to detect a fall as key-points are detected + by rotating the image counter clockwise.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_11_flip.png') + + # The frame represents a person completely falls. + img_2 = _get_image(file_name='fall_img_12_flip.png') + + fall_detector.receive_next_sample(image=img_1) + # set min time to a small number to speed up testing + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert result + assert len(result) == 1 + + category = result[0]['label'] + confidence = result[0]['confidence'] + angle = result[0]['leaning_angle'] + keypoint_corr = result[0]['keypoint_corr'] + + assert keypoint_corr + assert category == 'FALL' + assert confidence > 0.3 + assert angle > 60 + + +def test_fall_detection_case_4(): + """No Fall""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. 
+ img_1 = _get_image(file_name='fall_img_1.png') + + # The frame represents a person who is in a standing position. + img_2 = _get_image(file_name='fall_img_4.png') + + fall_detector.receive_next_sample(image=img_1) + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert not result + + +def test_fall_detection_case_5(): + """Expected to not detect a fall even the angle criteria is met + because image 2 is standing up rather than fall""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person falls. + img_1 = _get_image(file_name='fall_img_2.png') + + # The frame represents a person who is in a standing position. + img_2 = _get_image(file_name='fall_img_1.png') + + fall_detector.receive_next_sample(image=img_1) + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert not result + + +def test_fall_detection_case_6(): + """Expect to not detect a fall as in 1st image key-points are detected + but not in 2nd""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_5.png') + + # No person in a frame + img_2 = _get_image(file_name='fall_img_6.png') + + fall_detector.receive_next_sample(image=img_1) + # set min time to a small number to speed up testing + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert not result + + +def test_fall_detection_case_7(): + """Expect to not detect a fall""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_5.png') + + # The frame represents a person who is in a standing position. 
+ img_2 = _get_image(file_name='fall_img_7.png') + + fall_detector.receive_next_sample(image=img_1) + # set min time to a small number to speed up testing + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert not result + + +def test_fall_detection_case_8(): + """Expect to not detect a fall""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + + fall_detector.connect_to_next_element(output) + + # No person in a frame + img_1 = _get_image(file_name='fall_img_6.png') + + # The frame represents a person who is in a standing position. + img_2 = _get_image(file_name='fall_img_7.png') + + fall_detector.receive_next_sample(image=img_1) + # set min time to a small number to speed up testing + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert not result + + +def test_background_image(): + """Expect to not detect anything interesting in a background image.""" + config = _fall_detect_config() + result = None + + def sample_callback(image=None, thumbnail=None, inference_result=None, + **kwargs): + nonlocal result + result = image is not None and thumbnail is not None and \ + not inference_result + fall_detector = FallDetector(**config) + output = _OutPipeElement(sample_callback=sample_callback) + fall_detector.connect_to_next_element(output) + img = _get_image(file_name='background.jpg') + fall_detector.receive_next_sample(image=img) + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + img = _get_image(file_name='background.jpg') + fall_detector.receive_next_sample(image=img) + assert result is True + + +def test_no_sample(): + """Expect element to pass empty sample to next element.""" + config = _fall_detect_config() + result = False + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = image is None and inference_result is None + fall_detector = FallDetector(**config) + output = _OutPipeElement(sample_callback=sample_callback) + fall_detector.connect_to_next_element(output) + fall_detector.receive_next_sample() + assert result is True + + +def test_bad_sample_good_sample(): + """One bad sample should not prevent good samples from being processed.""" + config = _fall_detect_config() + result = 'nothing passed to me' + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + object_detector = ObjectDetector(**config) + output = _OutPipeElement(sample_callback=sample_callback) + object_detector.connect_to_next_element(output) + # bad sample + object_detector.receive_next_sample(image=None) + assert result == 'nothing passed to me' + + # good sample + fall_detector = FallDetector(**config) + fall_detector.connect_to_next_element(output) + + # The frame represents a person who is in a standing position. + img_1 = _get_image(file_name='fall_img_1.png') + + # The frame represents a person falls. 
+ img_2 = _get_image(file_name='fall_img_2.png') + + fall_detector.receive_next_sample(image=img_1) + fall_detector.min_time_between_frames = 0.01 + time.sleep(fall_detector.min_time_between_frames) + fall_detector.receive_next_sample(image=img_2) + + assert result + assert len(result) == 1 + + category = result[0]['label'] + confidence = result[0]['confidence'] + angle = result[0]['leaning_angle'] + keypoint_corr = result[0]['keypoint_corr'] + + assert keypoint_corr + assert category == 'FALL' + assert confidence > 0.7 + assert angle >= 60 + + +def test_draw_line_0(): + """No body lines passed to draw. No image should be saved.""" + config = _fall_detect_config() + + fall_detector = FallDetector(**config) + + image = _get_image(file_name='fall_img_1.png') + pose_dix = None + lines_drawn = fall_detector.draw_lines(image, pose_dix, 0.5) + assert lines_drawn == 0 + + pose_dix = {} + lines_drawn = fall_detector.draw_lines(image, pose_dix, 0.5) + assert lines_drawn == 0 + + +def test_draw_line_1(): + """One body line passed to draw. Image with one line should be saved.""" + config = _fall_detect_config() + + fall_detector = FallDetector(**config) + + image = _get_image(file_name='fall_img_1.png') + pose_dix = {fall_detector.LEFT_SHOULDER: [0, 0], + fall_detector.LEFT_HIP: [0, 1]} + lines_drawn = fall_detector.draw_lines(image, pose_dix, 0.5) + assert lines_drawn == 1 + + +def test_draw_line_1_1(): + """One keypoing but no full body line. No image should be saved.""" + config = _fall_detect_config() + + fall_detector = FallDetector(**config) + + image = _get_image(file_name='fall_img_1.png') + pose_dix = {fall_detector.LEFT_SHOULDER: [0, 0]} + lines_drawn = fall_detector.draw_lines(image, pose_dix, 0.5) + assert lines_drawn == 0 + + +def test_draw_line_2(): + """Two body lines passed to draw. Image with two lines should be saved.""" + config = _fall_detect_config() + + fall_detector = FallDetector(**config) + + # The frame represents a person who is in a standing position. + image = _get_image(file_name='fall_img_1.png') + pose_dix = {fall_detector.LEFT_SHOULDER: [0, 0], + fall_detector.LEFT_HIP: [0, 1], + fall_detector.RIGHT_SHOULDER: [1, 0], + fall_detector.RIGHT_HIP: [1, 1]} + lines_drawn = fall_detector.draw_lines(image, pose_dix, 0.5) + assert lines_drawn == 2 + + +def test_fall_detection_2_frame_back_case_1(): + """ + Expected to detect a fall using frame[t] and frame[t-1]. + frame[t-2] : A person is in standing position. + frame[t-1] : A person is almost in standing position as he is walking. + frame[t] : A person is fall down. + """ + + config = _fall_detect_config() + result = None + + def sample_callback(image=None, inference_result=None, **kwargs): + nonlocal result + result = inference_result + + fall_detector = FallDetector(**config) + + output = _OutPipeElement(sample_callback=sample_callback) + fall_detector.connect_to_next_element(output) + + # A frame at t-2 timestamp when person is in standing position. + img_1 = _get_image(file_name='fall_img_1.png') + + # A frame at t-1 timestamp when person is almost in standing position \ + # as he is walking. + img_2 = _get_image(file_name='fall_img_1_1.png') + + # A frame at t timestamp when person falls down. 
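+    # The detector compares frame[t] against both frame[t-1] and frame[t-2];
+    # in this case the fall should register against frame[t-1].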
+    img_3 = _get_image(file_name='fall_img_2.png')
+
+    fall_detector.min_time_between_frames = 0.01
+
+    fall_detector.receive_next_sample(image=img_1)
+    time.sleep(fall_detector.min_time_between_frames)
+
+    fall_detector.receive_next_sample(image=img_2)
+    time.sleep(fall_detector.min_time_between_frames)
+
+    assert not result
+
+    fall_detector.receive_next_sample(image=img_3)
+
+    assert result
+    assert len(result) == 1
+
+    category = result[0]['label']
+    confidence = result[0]['confidence']
+    angle = result[0]['leaning_angle']
+    keypoint_corr = result[0]['keypoint_corr']
+
+    assert keypoint_corr
+    assert category == 'FALL'
+    assert confidence > 0.7
+    assert angle > 60
+
+
+def test_fall_detection_2_frame_back_case_2():
+    """
+    Expected to detect a fall using frame[t] and frame[t-2].
+    frame[t-2] : The person is in a standing position.
+    frame[t-1] : The person is midway through a fall.
+    frame[t] : The person has fallen down.
+    """
+    config = _fall_detect_config()
+    result = None
+
+    def sample_callback(image=None, inference_result=None, **kwargs):
+        nonlocal result
+        result = inference_result
+
+    fall_detector = FallDetector(**config)
+
+    output = _OutPipeElement(sample_callback=sample_callback)
+    fall_detector.connect_to_next_element(output)
+
+    # A frame at t-2 timestamp when the person is in a standing position.
+    img_1 = _get_image(file_name='fall_img_1.png')
+
+    # A frame at t-1 timestamp when the person is midway through a fall.
+    img_2 = _get_image(file_name='fall_img_2_2.png')
+
+    # A frame at t timestamp when the person has fallen down.
+    img_3 = _get_image(file_name='fall_img_2.png')
+
+    fall_detector.min_time_between_frames = 0.01
+    fall_detector.max_time_between_frames = 15
+
+    fall_detector.receive_next_sample(image=img_1)
+    time.sleep(fall_detector.min_time_between_frames)
+
+    fall_detector.receive_next_sample(image=img_2)
+    time.sleep(fall_detector.min_time_between_frames)
+
+    assert not result
+
+    fall_detector.receive_next_sample(image=img_3)
+
+    assert result
+    assert len(result) == 1
+
+    category = result[0]['label']
+    confidence = result[0]['confidence']
+    angle = result[0]['leaning_angle']
+    keypoint_corr = result[0]['keypoint_corr']
+
+    assert keypoint_corr
+    assert category == 'FALL'
+    assert confidence > 0.7
+    assert angle >= 60
+
+
+def test_fall_detection_2_frame_back_case_3():
+    """
+    Expected to not detect a fall using frame[t], frame[t-1] and frame[t-2].
+    frame[t-2] : The person is in a walking position.
+    frame[t-1] : The person is in a walking position.
+    frame[t] : The person is leaning slightly but has not fallen.
+    """
+
+    config = _fall_detect_config()
+    result = None
+
+    def sample_callback(image=None, inference_result=None, **kwargs):
+        nonlocal result
+        result = inference_result
+
+    fall_detector = FallDetector(**config)
+
+    output = _OutPipeElement(sample_callback=sample_callback)
+    fall_detector.connect_to_next_element(output)
+
+    # A frame at t-2 timestamp when the person is in a walking position.
+    img_1 = _get_image(file_name='fall_img_15.png')
+
+    # A frame at t-1 timestamp when the person is in a walking position.
+    img_2 = _get_image(file_name='fall_img_16.png')
+
+    # A frame at t timestamp when the person is leaning slightly
+    # but has not fallen.
+ img_3 = _get_image(file_name='fall_img_17.png') + + fall_detector.min_time_between_frames = 0.01 + + fall_detector.receive_next_sample(image=img_1) + time.sleep(fall_detector.min_time_between_frames) + + fall_detector.receive_next_sample(image=img_2) + time.sleep(fall_detector.min_time_between_frames) + + assert not result + + fall_detector.receive_next_sample(image=img_3) + + assert not result