Skip to content

This project builds a real-time object detection system using a Raspberry Pi and a camera. It captures live video, processes it with a TensorFlow Lite model to detect specific objects, and saves important events as video files.

License

Notifications You must be signed in to change notification settings

dafmontenegro/pi-tensorflow-lite-object-detection

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

pi-tensorflow-lite-object-detection

This project builds a real-time object detection system using a Raspberry Pi and a camera. It captures live video, processes it with a TensorFlow Lite model to detect specific objects, and saves important events as video files. To handle these tasks efficiently, it uses parallel computing with threading. The system provides visual alerts through LEDs and sound alerts via a buzzer, showcasing its capability to control real-world actions, which can also be extended to manage circuits or send signals. A local network server is created for real-time monitoring, enabling users to view live video, access saved recordings, and review logs seamlessly.

  • Real-Time Object Detection: Detects objects in real-time using a TensorFlow Lite model.
  • Parallel Computing: Use threading for efficient processing and real-time performance.
  • LED and Buzzer Integration: Provides visual and auditory notifications for detected events and can be extended to control circuits or send signals.
  • Safe Zone Detection: Configures a virtual safe zone within the camera's field of view. When an object enters or leaves this predefined area, the system triggers alerts and records the event.
  • Event Recording: Records video segments when an object is detected.
  • Storage Management: Monitors and manages storage capacity, automatically deleting old events when necessary.
  • Web Interface: Creates a local network server to stream live video, view saved recordings, and access event logs. Users can monitor live footage, browse archived video segments, and view logs through the web interface.

An alternative name of this project is FLDSMDFR:

  • F: Flask
  • L: Lightweight
  • D: Detection
  • S: System
  • M: Machine Learning
  • D: Device
  • F: Fast
  • R: Recognition

Table Of Contents

Author

1. General Specifications

This repository presents a demonstration of real-time object detection on a Raspberry Pi using TensorFlow Lite. Inspired by the official TensorFlow examples library [1] and the video tutorials by Paul McWhorter [2], this project provides a hands-on exploration of object detection capabilities on resource-constrained devices like the Raspberry Pi platform.

1.1 Requirements

  • Python 3.x
  • OpenCV
  • Flask
  • TensorFlow Lite
    • Model: efficientdet_lite0.tflite

1.2 Software:

  • OS: Debian Bullseye

1.3 Hardware:

  • Board: Raspberry Pi 4 Model B Rev 1.2
  • Webcam: Logitech C270 HD

2. Usage

2.1 Run the System

Execute the script with the following command:

python pi_tensorflow_lite_object_detection.py --folder-name "events" --log-file "logfile.log"
  • --folder-name: Name of the folder to store events (default: "events").
  • --log-file: Name of the log file (default: "logfile.log").
  • --reset-events: Reset events folder.
  • --reset-logs: Reset log file.

2.2 Access the Web Interface

Open a web browser and navigate to http://<your_raspberry_pi_ip>/ or http://<your_hostname>/. The following endpoints are available:

  • /: Streams real-time video from the camera.
  • /logs/: Displays the log file.
  • /events/: Shows a list of recorded events.
  • /play/<path:video_path>: Plays a specific recorded video.

3. Demo

Based on Google AI for Developers [3], we will reference a section that provides further details about the model used in the project.

"The EfficientDet-Lite0 model uses an EfficientNet-Lite0 backbone with a 320x320 input size and BiFPN feature network. The model was trained with the COCO dataset, a large-scale object detection dataset that contains 1.5 million object instances and 80 object labels." [3]

What this means is that the project can recognize up to 80 different types of objects, so to better demonstrate this, videos were recorded featuring some of them. You can watch them for a better understanding:

3.1 Human detector

👤 Click here to be redirected to the video.

3.2 Banana detector

🍌 Click here to be redirected to the video

3.3 Cat detector

😸 Click here to be redirected to the video

4. Code

4.1 class LEDRGB and class LEDSRGB

class LEDRGB:
    colors = {
        "red": (1, 0, 0),
        "green": (0, 1, 0),
        "blue": (0, 0, 1),
        "yellow": (1, 1, 0),
        "magenta": (1, 0, 1),
        "cyan": (0, 1, 1),
        "white": (1, 1, 1),
        "off": (0, 0, 0)
    }

    def __init__(self, red_led, green_led, blue_led):
        GPIO.setmode(GPIO.BCM)
        self.red_led = red_led
        self.green_led = green_led
        self.blue_led = blue_led
        GPIO.setup(self.red_led, GPIO.OUT)
        GPIO.setup(self.green_led, GPIO.OUT)
        GPIO.setup(self.blue_led, GPIO.OUT)
    
    def _set_color(self, color_name):
        color = self.colors.get(color_name.lower(), self.colors["off"])
        GPIO.output(self.red_led, color[0])
        GPIO.output(self.green_led, color[1])
        GPIO.output(self.blue_led, color[2])

    def __getattr__(self, color_name):
        return lambda: self._set_color(color_name)

class LEDSRGB:
    def __init__(self, leds):
        self.leds = [LEDRGB(*led) for led in leds]
    
    def set_color(self, color_names):
        if isinstance(color_names, str):
            for i, led in enumerate(self.leds):
                getattr(led, color_names)()
        elif isinstance(color_names, list) and len(color_names) == len(self.leds):
            for (led, color) in zip(self.leds, color_names):
                getattr(led, color)()
    
    def __getattr__(self, color_name):
        return lambda: self.set_color(color_name)

4.2 class Buzzer

class Buzzer:
    def __init__(self, pin=12, frequency=5000, duty_cycle=50):
        self.pin = pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin, GPIO.OUT)
        self.frequency = frequency
        self.pwm = GPIO.PWM(self.pin, self.frequency)
        self.duty_cycle = duty_cycle
        self.active = False

    def auto_stop(self, cycles=3, duration=0.1):
        if not self.active:
            self.active = True
            for _ in range(cycles):
                self.start()
                time.sleep(duration)
                self.stop()
                time.sleep(duration)
            self.active = False
    
    def start(self):
        self.pwm.start(self.duty_cycle)
    
    def stop(self):
        self.pwm.stop()

4.3 class ObjectDetector and class Camera

class ObjectDetector:
    def __init__(self, model_name="efficientdet_lite0.tflite", num_threads=4, score_threshold=0.3, max_results=1, category_name_allowlist=["person"]):
        base_options = core.BaseOptions(file_name=model_name, use_coral=False, num_threads=num_threads)
        detection_options = processor.DetectionOptions(max_results=max_results, score_threshold=score_threshold, category_name_allowlist=category_name_allowlist)
        options = vision.ObjectDetectorOptions(base_options=base_options, detection_options=detection_options)
        self.detector = vision.ObjectDetector.create_from_options(options)

    def detections(self, image):
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return self.detector.detect(vision.TensorImage.create_from_array(rgb_image)).detections

class Camera:
    def __init__(self, frame_width=1280, frame_height=720, camera_number=0):
        self.video_capture = cv2.VideoCapture(camera_number)
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)

    def frame(self):
        _, frame = self.video_capture.read()
        return frame

4.4 class RealTimeObjectDetection

class RealTimeObjectDetection:
    def __init__(self, frame_width=1280, frame_height=720, camera_number=0, model_name="efficientdet_lite0.tflite", num_threads=4, score_threshold=0.3, max_results=1, category_name_allowlist=["person"], 
                 folder_name="events", storage_capacity=32, led_pines=[(13, 19, 26), (21, 20, 16)], pin_buzzer=12, frequency=5000, duty_cycle=50, fps_frame_count= 30, safe_zone=((0, 0), (1280, 720))):
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.camera = Camera(frame_width, frame_height, camera_number)
        self.frame = self.camera.frame()
        self.object_detector = ObjectDetector(model_name, num_threads, score_threshold, max_results, category_name_allowlist)
        self.folder_name = folder_name
        self.storage_manager = StorageManager(folder_name, storage_capacity)
        self.storage_manager.supervise_folder_capacity()
        self.leds_rgb = LEDSRGB(led_pines)
        self.buzzer = Buzzer(pin_buzzer, frequency, duty_cycle)
        self.safe_zone_start, self.safe_zone_end = safe_zone
        self.fps_frame_count = fps_frame_count
        self.last_detection_timestamp = None
        self.frame_buffer = []
        self.frame_times = []
        self.output = {}
        self.events = 0
        self.fps = 24

    def guard(self, min_video_duration=1, max_video_duration=60, max_detection_delay=10, event_check_interval=10, safe_zone=False):
        try:
            self.buzzer.auto_stop()
            self.leds_rgb.set_color(["off", "green"])
            while self.isOpened():
                security_breach, time_localtime = self.process_frame((0, 0, 255), 1, 2, cv2.FONT_HERSHEY_SIMPLEX, safe_zone)
                if security_breach:
                    if not self.frame_buffer:
                        self.output["file_name"] = time.strftime("%B%d_%Hhr_%Mmin%Ssec", time_localtime)
                        self.output["day"], self.output["hours"], self.output["mins"] = self.output["file_name"].split("_")
                        self.output["path"] = os.path.join(self.folder_name, self.output["day"], self.output["hours"], f"{self.output['file_name']}.mp4")
                    elif len(self.frame_buffer) == int(self.fps):
                        buzzer_thread = threading.Thread(target=self.buzzer.auto_stop)
                        buzzer_thread.start()
                        self.leds_rgb.red()
                    self.last_detection_timestamp = time.time()
                    self.frame_buffer.append(self.frame)
                else:
                    if self.last_detection_timestamp and ((time.time() - self.last_detection_timestamp) >= max_detection_delay):
                        if len(self.frame_buffer) >= self.fps*min_video_duration:
                            self.save_frame_buffer(self.output["path"], event_check_interval)
                        self.leds_rgb.set_color(["off", "green"])
                        self.last_detection_timestamp = None
                        self.frame_buffer = []
                        self.output = {}
                    elif len(self.frame_buffer) >= self.fps*max_video_duration:
                        self.save_frame_buffer(self.output["path"], event_check_interval)
        except Exception as e:
            logging.error(e, exc_info=True)
            GPIO.cleanup()
            self.close()
            os._exit(0)
    
    def save_frame_buffer(self, path, event_check_interval=10):
        output_seconds = int(len(self.frame_buffer)/self.fps)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"avc1"), self.fps, (self.frame_width, self.frame_height))
        logging.warning(f"EVENT: {output_seconds} seconds {path}")
        for frame in self.frame_buffer:
            out.write(frame)
        out.release()
        self.events += 1
        if self.events % event_check_interval == 0:
            storage_thread = threading.Thread(target=self.storage_manager.supervise_folder_capacity)
            storage_thread.start()
    
    def _safe_zone_invasion(self, rect_start, rect_end):
        if self.safe_zone_start[0] > rect_end[0] or self.safe_zone_end[0] < rect_start[0]:
            return False
        if self.safe_zone_start[1] > rect_end[1] or self.safe_zone_end[1] < rect_start[1]:
            return False
        return True

    def process_frame(self, color=(0, 0, 255), font_size=1, font_thickness=2, font=cv2.FONT_HERSHEY_SIMPLEX, safe_zone=False):
        security_breach = False
        start_time = time.time()
        frame = self.camera.frame()
        time_localtime = time.localtime()
        detections = self.object_detector.detections(frame)
        for detection in detections:
            box = detection.bounding_box
            rect_start = (box.origin_x, box.origin_y)
            rect_end = (box.origin_x+box.width, box.origin_y+box.height)
            category_name = detection.categories[0].category_name
            text_position = (7+box.origin_x, 21+box.origin_y)
            cv2.putText(frame, category_name, text_position, font, font_size, color, font_thickness)
            cv2.rectangle(frame, rect_start, rect_end, color, font_thickness)
            security_breach = self._safe_zone_invasion(rect_start, rect_end)
        cv2.putText(frame, time.strftime("%B%d/%Y %H:%M:%S", time_localtime), (21, 42), cv2.FONT_HERSHEY_SIMPLEX, font_size, color, font_thickness)
        if safe_zone:
            cv2.rectangle(frame, self.safe_zone_start, self.safe_zone_end, (0, 255, 255), font_thickness)
        self.frame = frame
        self.frame_times.append(time.time() - start_time)
        if self.fps_frame_count == len(self.frame_times):
            average_frame_time = sum(self.frame_times) / len(self.frame_times)
            self.fps = round(1/average_frame_time, 2)
            self.frame_times = []
        return security_breach, time_localtime

    def isOpened(self):
        return self.camera.video_capture.isOpened()
    
    def close(self):
        self.camera.video_capture.release()

4.5 class StorageManager

class StorageManager:
    def __init__(self, events_folder="events", storage_capacity=32):
        self.events_folder = events_folder
        self.storage_capacity = storage_capacity

    @staticmethod
    def folder_size_gb(folder_path):
        total_size_bytes = 0
        for dirpath, _, filenames in os.walk(folder_path):
            for filename in filenames:
                file_path = os.path.join(dirpath, filename)
                total_size_bytes += os.path.getsize(file_path)
        return total_size_bytes / (1024 ** 3)
    
    @staticmethod
    def delete_folder(folder_path):
        folder_size = StorageManager.folder_size_gb(folder_path)
        shutil.rmtree(folder_path)
        logging.warning(f"STORAGE: '{folder_path}' was deleted (-{folder_size:.4f} GB)")
        return folder_size

    def supervise_folder_capacity(self):
        events_folder_size = StorageManager.folder_size_gb(self.events_folder)
        logging.info(f"STORAGE: '{self.events_folder}' is ({events_folder_size:.4f} GB)")
        while events_folder_size > self.storage_capacity:
            folder_to_delete = os.path.join(self.events_folder, min(os.listdir(self.events_folder)))
            events_folder_size -= StorageManager.delete_folder(folder_to_delete)

4.6 Initial Configuration

remote_camera = RealTimeObjectDetection(
    frame_width=1280,
    frame_height=720,
    camera_number=0,
    model_name="efficientdet_lite0.tflite",
    num_threads=4,
    score_threshold=0.5,
    max_results=3, 
    category_name_allowlist=["person", "dog", "cat", "umbrella"],
    folder_name=folder_name,
    storage_capacity=32,
    led_pines=[(13, 19, 26), (16, 20, 21)],
    pin_buzzer=12,
    frequency=5000,
    duty_cycle=50,
    fps_frame_count=30,
    safe_zone=((0, 180), (1280, 720))
)

guard_thread = threading.Thread(target=remote_camera.guard, kwargs={
    "min_video_duration": 1,
    "max_video_duration": 60,
    "max_detection_delay": 10,
    "event_check_interval": 10,
    "safe_zone": True
})

guard_thread.start()

5. Contributing

Contributions to FLDSMDFR (the alternative name for the project) are welcome from developers, researchers, and enthusiasts interested in real-time object detection, parallel computing, and Raspberry Pi applications. We encourage collaborative efforts to enhance the system's features, optimize performance, and improve usability.

6. References

[1] Tensorflow. (s. f.). examples/lite/examples/object_detection/raspberry_pi at master · tensorflow/examples. GitHub. https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi

[2] Paul McWhorter. (2023, 25 mayo). Raspberry Pi LESSON 63: Object Detection on Raspberry Pi Using Tensorflow Lite [Vídeo]. YouTube. https://www.youtube.com/watch?v=yE7Ve3U5Slw

[3] Object detection task guide. (s. f.). Google AI For Developers. https://ai.google.dev/edge/mediapipe/solutions/vision/object_detector#configurations_options

About

This project builds a real-time object detection system using a Raspberry Pi and a camera. It captures live video, processes it with a TensorFlow Lite model to detect specific objects, and saves important events as video files.

Topics

Resources

License

Stars

Watchers

Forks

Languages