import os
import sys
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

import cv2
import numpy as np
import torch

# Try to import YOLO
try:
    from ultralytics import YOLO
except ImportError:
    print("Error: 'ultralytics' library not found. Please install it using: pip install ultralytics")
    sys.exit(1)


# Per-process cache of loaded models, keyed by model path.
# ProcessPoolExecutor pickles the VideoCleaner instance for every submitted
# task, so an attribute set on ``self`` inside a worker does not survive to
# the next task. A module-level cache does, because it lives in the worker
# process itself.
_MODEL_CACHE = {}


class VideoCleaner:
    """Sort old videos into "keep" or "delete" based on human detection.

    A video is kept (moved to a sibling ``processed`` directory) when YOLO
    detects a class-0 object in at least one sampled frame that is both
    bright and colourful; otherwise it is deleted. Videos younger than
    ``age_days`` are left untouched.
    """

    def __init__(self, model_path='yolo26n.pt', brightness_threshold=25, age_days=30, workers=4):
        """Store configuration; the model itself is loaded lazily.

        Args:
            model_path: Path or name of the YOLO weights passed to ``YOLO()``.
            brightness_threshold: Minimum mean grey level for a frame to
                count as "lights on".
            age_days: Videos modified more recently than this are skipped.
            workers: Number of worker processes used by ``scan_and_process``.
        """
        self.model_path = model_path
        self.brightness_threshold = brightness_threshold
        self.age_limit = timedelta(days=age_days)
        self.workers = workers
        self.supported_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv')
        # The model is initialised inside each worker process, never in the parent.
        self.model = None

    def __getstate__(self):
        """Return picklable state without the loaded model.

        Keeps task submission cheap and guarantees each worker resolves the
        model through its own per-process cache.
        """
        state = self.__dict__.copy()
        state['model'] = None
        return state

    def _get_model(self):
        """Return the YOLO model, loading it at most once per process.

        On load failure an error is printed and ``sys.exit(1)`` is called.
        """
        if self.model is None:
            model = _MODEL_CACHE.get(self.model_path)
            if model is None:
                try:
                    model = YOLO(self.model_path)
                    # Ensure it's on GPU if available
                    if torch.cuda.is_available():
                        model.to('cuda')
                except Exception as e:
                    print(f"Failed to load YOLO model in process {os.getpid()}: {e}")
                    sys.exit(1)
                _MODEL_CACHE[self.model_path] = model
            self.model = model
        return self.model

    def is_frame_color_and_bright(self, frame):
        """Return True if a BGR frame looks like 'Day/Lights-on' mode.

        A frame fails when its mean grey level is below
        ``self.brightness_threshold`` or when the 95th percentile of its HSV
        saturation channel is below 20 (treated as infrared/greyscale).
        """
        # 1. Brightness check
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if np.mean(gray) < self.brightness_threshold:
            return False

        # 2. Color (saturation) check
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        sat_95th = np.percentile(hsv[:, :, 1], 95)
        if sat_95th < 20:  # Infrared/Grayscale mode
            return False

        return True

    def should_keep_video(self, video_path, save_preview_path=None):
        """Scan a video with YOLO's streaming predictor and decide its fate.

        Args:
            video_path: Path of the video to analyse.
            save_preview_path: Optional ``Path``; when a detection occurs in
                a colour frame, the annotated frame is written there.

        Returns:
            A ``(keep, reason)`` tuple. ``keep`` is True when a detection
            coincides with a bright, colourful frame, and also when no frame
            could be analysed at all (so an unreadable file is never deleted
            on the basis of zero evidence).
        """
        model = self._get_model()

        # stream=True makes predict() return a generator that decodes the
        # video incrementally; vid_stride skips frames between inferences.
        results_gen = model.predict(
            source=str(video_path),
            classes=[0],  # presumably the "person" class of the loaded model
            conf=0.7,
            half=True,
            imgsz=640,
            stream=True,
            verbose=False,
            vid_stride=30,  # Roughly 1-2 seconds depending on fps
        )

        frames_seen = 0
        has_shown_color = False
        try:
            for result in results_gen:
                frames_seen += 1
                frame = result.orig_img  # Original frame for our own checks

                # 1. Check if this frame is 'Normal Mode' (lights on)
                is_color = self.is_frame_color_and_bright(frame)
                if is_color:
                    has_shown_color = True

                # 2. A detection only counts when the frame is in colour mode
                if is_color and len(result.boxes) > 0:
                    if save_preview_path:
                        save_preview_path.parent.mkdir(parents=True, exist_ok=True)
                        # Save annotated frame
                        annotated_frame = result.plot()
                        cv2.imwrite(str(save_preview_path), annotated_frame)
                    return True, "Human detected in color mode"
        finally:
            # Close the stream explicitly so the decoder (and any state the
            # predictor holds while streaming) is released before the caller
            # moves or deletes the file, and before the cached model is reused.
            close = getattr(results_gen, "close", None)
            if callable(close):
                close()

        if frames_seen == 0:
            # Nothing was analysed; deleting here would be a blind delete.
            return True, "No frames could be decoded; keeping unanalyzed video"

        # Final decision
        if has_shown_color:
            return False, "Lights were on, but no humans detected"
        return False, "Entirely night/IR mode or no humans"

    def process_video_file(self, video_path, processed_base_dir, input_base_dir):
        """Analyse one video and either move it to ``processed`` or delete it.

        Args:
            video_path: Video to process.
            processed_base_dir: Root directory that receives kept videos and
                preview images, mirroring the input tree.
            input_base_dir: Root of the scanned tree; used to compute the
                relative path of ``video_path``.

        Returns:
            False when the video is newer than the age limit and was
            skipped; True when it was processed (kept or deleted).
        """
        video_path = Path(video_path).resolve()
        mtime = datetime.fromtimestamp(os.path.getmtime(video_path))

        if datetime.now() - mtime < self.age_limit:
            return False  # Skipped: the file is too recent

        # Calculate target path for preview image
        rel_path = video_path.relative_to(input_base_dir)
        output_path = processed_base_dir / rel_path
        preview_path = output_path.with_suffix('.jpg')

        keep, reason = self.should_keep_video(video_path, save_preview_path=preview_path)

        if keep:
            print(f"Action: KEEP {video_path.name} - Reason: {reason}")
            self.move_to_processed(video_path, processed_base_dir, input_base_dir)
        else:
            print(f"Action: DELETE {video_path.name} - Reason: {reason}")
            os.remove(video_path)
        return True  # Processed

    def move_to_processed(self, video_path, processed_base_dir, input_base_dir):
        """Move ``video_path`` under ``processed_base_dir``, mirroring its relative path.

        An existing file at the destination is overwritten.
        """
        rel_path = video_path.relative_to(input_base_dir)
        output_path = processed_base_dir / rel_path
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # os.replace overwrites an existing destination in one step, instead
        # of a separate exists/remove/rename sequence.
        os.replace(video_path, output_path)

    def scan_and_process(self, input_dir):
        """Find every supported video under ``input_dir`` and process it in parallel.

        Kept videos go to a ``processed`` directory next to ``input_dir``.
        Returns silently when ``input_dir`` does not exist.
        """
        input_path = Path(input_dir).resolve()
        if not input_path.exists():
            return

        processed_dir = input_path.parent / "processed"

        print(f"Scanning {input_path}...")
        all_videos = []
        for root, dirs, files in os.walk(input_path):
            # Pruning in place stops os.walk from descending into "processed"
            if "processed" in dirs:
                dirs.remove("processed")
            for file in files:
                if file.lower().endswith(self.supported_extensions):
                    all_videos.append(Path(root) / file)

        if not all_videos:
            print("No videos found.")
            return

        # Sort by mtime
        all_videos.sort(key=os.path.getmtime)

        print(f"Found {len(all_videos)} videos. Starting parallel processing with {self.workers} workers...")

        with ProcessPoolExecutor(max_workers=self.workers) as executor:
            # ``self`` is pickled for each task; the model is excluded from
            # the pickled state and resolved from the worker's own cache.
            futures = [
                executor.submit(self.process_video_file, v, processed_dir, input_path)
                for v in all_videos
            ]

            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    # One failing video must not abort the whole batch.
                    print(f"Error in worker process: {e}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("dir", help="Target directory")
    parser.add_argument("--days", type=int, default=30)
    parser.add_argument("--model", type=str, default='yolo26n.pt', help="Path to YOLO model or model name")
    parser.add_argument("--workers", type=int, default=4, help="Number of parallel workers")
    args = parser.parse_args()

    cleaner = VideoCleaner(model_path=args.model, age_days=args.days, workers=args.workers)
    cleaner.scan_and_process(args.dir)