-
Notifications
You must be signed in to change notification settings - Fork 2
/
app.py
121 lines (95 loc) · 4.45 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from flask import Flask, render_template, request, url_for
import os
import cv2
from ultralytics import YOLO
import supervision as sv
import pyresearch
# Initialize Flask app
app = Flask(__name__)
# Load YOLO model
model = YOLO("last.pt")
# Set upload folder and allowed extensions
UPLOAD_FOLDER = 'uploads'
OUTPUT_FOLDER = 'static/outputs'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'mp4', 'avi', 'mov'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['OUTPUT_FOLDER'] = OUTPUT_FOLDER
# Function to check allowed file types
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# Image processing function
def process_image(input_image_path: str, output_image_path: str):
image = cv2.imread(input_image_path)
if image is None:
print("Error: Unable to read the image.")
return
# Resize the image
resized = cv2.resize(image, (640, 640))
# Perform detection
detections = sv.Detections.from_ultralytics(model(resized)[0])
# Annotate the image
annotated = sv.BoundingBoxAnnotator().annotate(scene=resized, detections=detections)
annotated = sv.LabelAnnotator().annotate(scene=annotated, detections=detections)
# Save the annotated image
cv2.imwrite(output_image_path, annotated)
print(f"Processed and saved: {output_image_path}")
# Video processing function
def process_video(input_video_path: str, output_video_path: str):
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
print("Error: Unable to open the video.")
return
# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
# Define codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
while True:
ret, frame = cap.read()
if not ret:
break
# Resize the frame
resized = cv2.resize(frame, (640, 640))
# Perform detection
detections = sv.Detections.from_ultralytics(model(resized)[0])
# Annotate the frame
annotated = sv.BoundingBoxAnnotator().annotate(scene=resized, detections=detections)
annotated = sv.LabelAnnotator().annotate(scene=annotated, detections=detections)
# Write the processed frame to the output video
out.write(annotated)
cap.release()
out.release()
print(f"Processed and saved video: {output_video_path}")
# Route to handle uploads
@app.route('/', methods=['GET', 'POST'])
def upload_files():
processed_files = [] # List to store URLs of processed files
if request.method == 'POST':
files = request.files.getlist('files') # Get multiple files from the form
for file in files:
if file and allowed_file(file.filename):
# Save the uploaded file
filename = os.path.join(app.config['UPLOAD_FOLDER'], file.filename)
file.save(filename)
# Check if the file is an image or video
file_extension = file.filename.rsplit('.', 1)[1].lower()
if file_extension in {'png', 'jpg', 'jpeg'}:
# Define output image path
output_filename = os.path.join(app.config['OUTPUT_FOLDER'], 'annotated_' + file.filename)
process_image(filename, output_filename)
processed_file_url = url_for('static', filename=f'outputs/annotated_{file.filename}')
elif file_extension in {'mp4', 'avi', 'mov'}:
# Define output video path
output_filename = os.path.join(app.config['OUTPUT_FOLDER'], 'annotated_' + file.filename)
process_video(filename, output_filename)
processed_file_url = url_for('static', filename=f'outputs/annotated_{file.filename}')
# Add the processed file URL to the list
processed_files.append(processed_file_url)
return render_template('index.html', processed_files=processed_files)
if __name__ == "__main__":
# Create upload and output folders if they don't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
app.run(debug=True)