import os import sys import subprocess import time import cv2 import numpy as np from pathlib import Path from datetime import datetime, timedelta # Try to import YOLO try: from ultralytics import YOLO except ImportError: print("Error: 'ultralytics' library not found. Please install it using: pip install ultralytics") sys.exit(1) class VideoCleaner: def __init__(self, model_path='yolo26n.pt', brightness_threshold=25, age_days=30): print(f"Initializing YOLO model: {model_path}...") try: self.model = YOLO(model_path) except Exception as e: print(f"Failed to load YOLO model: {e}") sys.exit(1) self.brightness_threshold = brightness_threshold self.age_limit = timedelta(days=age_days) self.supported_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv') def is_frame_color_and_bright(self, frame): """Checks if a single frame is in 'Day/Lights-on' mode.""" # 1. Brightness check gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if np.mean(gray) < self.brightness_threshold: return False # 2. Color (Saturation) check hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) sat_95th = np.percentile(hsv[:, :, 1], 95) if sat_95th < 20: # Infrared/Grayscale mode return False return True def should_keep_video(self, video_path): """ Scans the video to see if it should be kept. Kept if: (It's not a night-only video) AND (Human is detected). """ cap = cv2.VideoCapture(str(video_path)) fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if fps <= 0 or frame_count <= 0: cap.release() return False, "Invalid video" step = max(1, int(fps * 2)) print(f"Analyzing {video_path.name}...") has_shown_color = False prev_gray = None for i in range(0, frame_count, step): cap.set(cv2.CAP_PROP_POS_FRAMES, i) ret, frame = cap.read() if not ret: break # Check if this frame is 'Normal Mode' (Lights on) is_color = self.is_frame_color_and_bright(frame) if is_color: has_shown_color = True # --- MOTION DETECTION --- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = cv2.GaussianBlur(gray, (21, 21), 0) if prev_gray is not None: frame_delta = cv2.absdiff(prev_gray, gray) thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] if (np.sum(thresh) / thresh.size) < 0.005: continue prev_gray = gray # --- AI HUMAN DETECTION --- # We only care about humans if the lights are on (as per your rules) # OR we can scan anyway if you want to find people in IR mode too. # Following your rules: "If it's night and no lights, delete". # So we only need to detect humans to justify keeping the video. small_frame = cv2.resize(frame, (640, 360)) results = self.model(small_frame, classes=[0], verbose=False, conf=0.7) debugmode=True if len(results[0].boxes) > 0: if is_color: # --- DEBUG: Save the frame that triggered detection --- if debugmode : debug_dir = Path("debug_detections") debug_dir.mkdir(exist_ok=True) debug_filename = debug_dir / f"detected_{video_path.stem}_frame_{i}.jpg" # Draw boxes on the frame for visualization annotated_frame = results[0].plot() cv2.imwrite(str(debug_filename), annotated_frame) print(f"DEBUG: Saved detection image to {debug_filename}") cap.release() return True, "Human detected in color mode" cap.release() # Final decision after scanning the whole video: if has_shown_color: # If lights were turned on but we found no people during the whole video # Based on your rule #2: "If no humans in the whole video, delete." return False, "Lights were on, but no humans detected" return False, "Entirely night/IR mode or no humans" def process_video_file(self, video_path, processed_base_dir, input_base_dir): video_path = Path(video_path).resolve() mtime = datetime.fromtimestamp(os.path.getmtime(video_path)) if datetime.now() - mtime < self.age_limit: return False # 表示因为时间太新没处理 keep, reason = self.should_keep_video(video_path) if keep: print(f"Action: KEEP {video_path.name} - Reason: {reason}") self.move_to_processed(video_path, processed_base_dir, input_base_dir) else: print(f"Action: DELETE {video_path.name} - Reason: {reason}") os.remove(video_path) return True # 表示处理了 def move_to_processed(self, video_path, processed_base_dir, input_base_dir): rel_path = video_path.relative_to(input_base_dir) output_path = processed_base_dir / rel_path output_path.parent.mkdir(parents=True, exist_ok=True) if os.path.exists(output_path): os.remove(output_path) os.rename(video_path, output_path) def scan_and_process(self, input_dir): input_path = Path(input_dir).resolve() if not input_path.exists(): return processed_dir = input_path.parent / "processed" print(f"Scanning {input_path}...") while True: all_videos = [] # 每次循环都获取最新列表 for root, dirs, files in os.walk(input_path): if "processed" in dirs: dirs.remove("processed") for file in files: if file.lower().endswith(self.supported_extensions): full_path = Path(root) / file all_videos.append(full_path) if not all_videos: print("No more files to process. Exiting.") break # 按修改时间排序,取最旧的一个 all_videos.sort(key=lambda x: os.path.getmtime(x)) processed_in_this_loop = False for target_file in all_videos: if self.process_video_file(target_file, processed_dir, input_path): processed_in_this_loop = True break # 处理了一个,重新获取列表 if not processed_in_this_loop: # 如果列表里剩下的所有文件都还没到 30 天,那就退出 print("All remaining files are newer than the age limit. Exiting.") break if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("dir", help="Target directory") parser.add_argument("--days", type=int, default=30) parser.add_argument("--model", type=str, default='yolo26n.pt', help="Path to YOLO model or model name") args = parser.parse_args() cleaner = VideoCleaner(model_path=args.model, age_days=args.days) cleaner.scan_and_process(args.dir)