Files
videocut/video_cleaner.py
2026-01-31 16:00:38 +08:00

207 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

import cv2
import numpy as np
import torch

# Try to import YOLO
try:
    from ultralytics import YOLO
except ImportError:
    print("Error: 'ultralytics' library not found. Please install it using: pip install ultralytics")
    sys.exit(1)
class VideoCleaner:
    """Sort aged video files into "keep" (moved) and "discard" (deleted).

    A video older than ``age_days`` is sampled roughly once every two seconds.
    It is kept (moved under the ``processed`` directory, with an annotated
    preview image) as soon as a detection of class 0 is found on a frame that
    is both bright and colourful; otherwise the file is deleted.
    """

    def __init__(self, model_path='yolo26n.pt', brightness_threshold=25, age_days=30, workers=4,
                 debug=True):
        """Store the configuration; the model itself is loaded lazily.

        Args:
            model_path: Path or name handed to ``YOLO(...)``.
            brightness_threshold: Minimum mean gray level for a frame to count
                as "lights on".
            age_days: Files modified more recently than this are left alone.
            workers: Number of worker processes used by ``scan_and_process``.
            debug: When true, every frame that triggers a "keep" decision is
                also written to ``debug_detections/`` in the current working
                directory. Defaults to ``True`` to match the previous
                hard-coded behaviour.
        """
        self.model_path = model_path
        self.brightness_threshold = brightness_threshold
        self.age_limit = timedelta(days=age_days)
        self.workers = workers
        self.debug = debug
        self.supported_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv')
        # The model is created inside each worker process on first use, so the
        # instance stays picklable when methods are submitted to the pool.
        self.model = None

    def _get_model(self):
        """Return the YOLO model, creating it on first use in this process.

        The model is moved to CUDA when ``torch`` reports it as available.
        Any failure while loading is printed and terminates the process via
        ``sys.exit(1)``.
        """
        if self.model is None:
            try:
                self.model = YOLO(self.model_path)
                # Ensure it's on GPU if available
                if torch.cuda.is_available():
                    self.model.to('cuda')
            except Exception as e:
                print(f"Failed to load YOLO model in process {os.getpid()}: {e}")
                sys.exit(1)
        return self.model

    def is_frame_color_and_bright(self, frame):
        """Return True when ``frame`` looks like a lit, colour image.

        Args:
            frame: Image array passed to ``cv2.cvtColor`` with BGR conversion
                codes.

        Returns:
            ``False`` if the mean gray level is below
            ``self.brightness_threshold`` or the 95th percentile of the HSV
            saturation channel is below 20; ``True`` otherwise.
        """
        # 1. Brightness check
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if np.mean(gray) < self.brightness_threshold:
            return False
        # 2. Color (saturation) check
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        sat_95th = np.percentile(hsv[:, :, 1], 95)
        if sat_95th < 20:  # Infrared/grayscale mode
            return False
        return True

    @staticmethod
    def _has_motion(prev_gray, gray):
        """Return True when two blurred gray frames differ enough to run detection."""
        frame_delta = cv2.absdiff(prev_gray, gray)
        thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
        # NOTE(review): thresh contains 0/255 values, so this ratio is a mean on
        # a 0-255 scale rather than a fraction of changed pixels. The 0.005
        # cut-off is therefore very sensitive; confirm this is intended before
        # changing it, since it affects which videos get deleted.
        return (np.sum(thresh) / thresh.size) >= 0.005

    def _save_detection_images(self, result, video_path, frame_index, save_preview_path):
        """Write the annotated detection frame to the debug dir and/or preview path."""
        if not (self.debug or save_preview_path):
            return
        # Draw boxes on the frame for visualization
        annotated_frame = result.plot()
        if self.debug:
            debug_dir = Path("debug_detections")
            debug_dir.mkdir(exist_ok=True)
            debug_filename = debug_dir / f"detected_{video_path.stem}_frame_{frame_index}.jpg"
            cv2.imwrite(str(debug_filename), annotated_frame)
            print(f"DEBUG: Saved detection image to {debug_filename}")
        if save_preview_path:
            save_preview_path.parent.mkdir(parents=True, exist_ok=True)
            cv2.imwrite(str(save_preview_path), annotated_frame)
            print(f"INFO: Saved preview image to {save_preview_path}")

    def should_keep_video(self, video_path, save_preview_path=None):
        """Scan a video and decide whether it should be kept.

        The video is kept only if some sampled frame is in bright/colour mode
        AND the model reports at least one class-0 detection on that frame.

        Args:
            video_path: Path of the video to analyse.
            save_preview_path: Optional path where the annotated frame that
                triggered the "keep" decision is written.

        Returns:
            A ``(keep, reason)`` tuple, where ``keep`` is a bool and
            ``reason`` is a human-readable explanation.
        """
        video_path = Path(video_path)
        if save_preview_path is not None:
            save_preview_path = Path(save_preview_path)

        has_shown_color = False
        cap = cv2.VideoCapture(str(video_path))
        # try/finally guarantees the capture handle is released even when
        # decoding or inference raises part-way through the scan.
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if fps <= 0 or frame_count <= 0:
                return False, "Invalid video"

            # Sample one frame for every two seconds of footage.
            step = max(1, int(fps * 2))
            print(f"Analyzing {video_path.name}...")
            prev_gray = None
            for i in range(0, frame_count, step):
                cap.set(cv2.CAP_PROP_POS_FRAMES, i)
                ret, frame = cap.read()
                if not ret:
                    break

                # Check if this frame is 'Normal Mode' (lights on)
                is_color = self.is_frame_color_and_bright(frame)
                if is_color:
                    has_shown_color = True

                # --- MOTION DETECTION ---
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                gray = cv2.GaussianBlur(gray, (21, 21), 0)
                if prev_gray is not None and not self._has_motion(prev_gray, gray):
                    # The reference frame is deliberately left unchanged when
                    # a sample is skipped for lack of motion.
                    continue
                prev_gray = gray

                # --- AI HUMAN DETECTION ---
                model = self._get_model()
                # Pass the original frame directly to avoid a CPU-side resize;
                # enable half precision and set the inference size.
                results = model(frame, classes=[0], verbose=False, conf=0.7, half=True, imgsz=640)
                # Detections on dark/infrared frames do not count.
                if len(results[0].boxes) > 0 and is_color:
                    self._save_detection_images(results[0], video_path, i, save_preview_path)
                    return True, "Human detected in color mode"
        finally:
            cap.release()

        # Final decision after scanning the whole video:
        if has_shown_color:
            # Lights were on at some point, but no people were found anywhere.
            return False, "Lights were on, but no humans detected"
        return False, "Entirely night/IR mode or no humans"

    def process_video_file(self, video_path, processed_base_dir, input_base_dir):
        """Keep or delete one video, provided it is old enough.

        Args:
            video_path: Video file to process.
            processed_base_dir: Root directory that kept videos are moved to.
            input_base_dir: Root directory being scanned; the video's path
                relative to it is mirrored under ``processed_base_dir``.

        Returns:
            ``False`` when the file was skipped because it is newer than the
            age limit, ``True`` when it was processed (moved or deleted).
        """
        video_path = Path(video_path).resolve()
        processed_base_dir = Path(processed_base_dir)
        input_base_dir = Path(input_base_dir)

        mtime = datetime.fromtimestamp(os.path.getmtime(video_path))
        if datetime.now() - mtime < self.age_limit:
            return False  # Not processed: the file is too recent

        # Calculate target path for preview image
        rel_path = video_path.relative_to(input_base_dir)
        preview_path = (processed_base_dir / rel_path).with_suffix('.jpg')

        keep, reason = self.should_keep_video(video_path, save_preview_path=preview_path)
        if keep:
            print(f"Action: KEEP {video_path.name} - Reason: {reason}")
            self.move_to_processed(video_path, processed_base_dir, input_base_dir)
        else:
            print(f"Action: DELETE {video_path.name} - Reason: {reason}")
            os.remove(video_path)
        return True  # Processed

    def move_to_processed(self, video_path, processed_base_dir, input_base_dir):
        """Move ``video_path`` under ``processed_base_dir``, mirroring its relative path.

        An existing file at the destination is overwritten.
        """
        video_path = Path(video_path)
        rel_path = video_path.relative_to(input_base_dir)
        output_path = Path(processed_base_dir) / rel_path
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # os.replace overwrites an existing destination in one step, so a
        # failed move can no longer leave the old destination already deleted.
        os.replace(video_path, output_path)

    def scan_and_process(self, input_dir):
        """Find every supported video under ``input_dir`` and process it in parallel.

        Kept videos go to a ``processed`` directory that is a sibling of
        ``input_dir``. Sub-directories named ``processed`` inside the tree are
        not descended into. Returns ``None``; does nothing if ``input_dir``
        does not exist.
        """
        input_path = Path(input_dir).resolve()
        if not input_path.exists():
            return
        processed_dir = input_path.parent / "processed"
        print(f"Scanning {input_path}...")

        all_videos = []
        for root, dirs, files in os.walk(input_path):
            # Pruning in place stops os.walk from entering the directory.
            if "processed" in dirs:
                dirs.remove("processed")
            all_videos.extend(
                Path(root) / file
                for file in files
                if file.lower().endswith(self.supported_extensions)
            )

        if not all_videos:
            print("No videos found.")
            return

        # Sort by mtime (oldest first)
        all_videos.sort(key=os.path.getmtime)
        print(f"Found {len(all_videos)} videos. Starting parallel processing with {self.workers} workers...")
        with ProcessPoolExecutor(max_workers=self.workers) as executor:
            # The bound method (and therefore self) is pickled for each task;
            # this works because self.model is still None in this process.
            futures = [
                executor.submit(self.process_video_file, v, processed_dir, input_path)
                for v in all_videos
            ]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Error in worker process: {e}")
if __name__ == "__main__":
    # Command-line entry point: parse the options, build a cleaner from them
    # and run it over the target directory.
    from argparse import ArgumentParser

    cli = ArgumentParser()
    cli.add_argument("dir", help="Target directory")
    cli.add_argument("--days", type=int, default=30)
    cli.add_argument("--model", type=str, default='yolo26n.pt', help="Path to YOLO model or model name")
    cli.add_argument("--workers", type=int, default=4, help="Number of parallel workers")
    opts = cli.parse_args()

    VideoCleaner(
        model_path=opts.model,
        age_days=opts.days,
        workers=opts.workers,
    ).scan_and_process(opts.dir)