Files
videocut/video_cleaner.py
2026-01-31 16:37:31 +08:00

185 lines
7.3 KiB
Python

import os
import sys
import subprocess
import time
import cv2
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor, as_completed
import torch
# Try to import YOLO
try:
from ultralytics import YOLO
except ImportError:
print("Error: 'ultralytics' library not found. Please install it using: pip install ultralytics")
sys.exit(1)
class VideoCleaner:
def __init__(self, model_path='yolo26n.pt', brightness_threshold=25, age_days=30, workers=4):
self.model_path = model_path
self.brightness_threshold = brightness_threshold
self.age_limit = timedelta(days=age_days)
self.workers = workers
self.supported_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv')
# We'll initialize the model inside each process for thread/process safety
self.model = None
def _get_model(self):
"""Lazy initialization of the model for each process."""
if self.model is None:
# print(f"[{os.getpid()}] Initializing YOLO model: {self.model_path}...")
try:
self.model = YOLO(self.model_path)
# Ensure it's on GPU if available
if torch.cuda.is_available():
self.model.to('cuda')
except Exception as e:
print(f"Failed to load YOLO model in process {os.getpid()}: {e}")
sys.exit(1)
return self.model
def is_frame_color_and_bright(self, frame):
"""Checks if a single frame is in 'Day/Lights-on' mode."""
# 1. Brightness check
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if np.mean(gray) < self.brightness_threshold:
return False
# 2. Color (Saturation) check
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
sat_95th = np.percentile(hsv[:, :, 1], 95)
if sat_95th < 20: # Infrared/Grayscale mode
return False
return True
def should_keep_video(self, video_path, save_preview_path=None):
"""
Scans the video using YOLO's native stream processing.
"""
print(f"[{os.getpid()}] Analyzing: {video_path.name}", flush=True)
model = self._get_model()
# Use YOLO's generator to handle video decoding efficiently
# vid_stride=int(fps*2) could be used but we need consistent logic
# We'll use a larger stride to skip frames like before
results_gen = model.predict(
source=str(video_path),
classes=[0],
conf=0.7,
half=True,
imgsz=640,
stream=True, # This makes it a generator
verbose=False,
vid_stride=30 # Roughly 1-2 seconds depending on fps
)
has_shown_color = False
for i, result in enumerate(results_gen):
frame = result.orig_img # Get the original frame for our checks
# 1. Check if this frame is 'Normal Mode' (Lights on)
is_color = self.is_frame_color_and_bright(frame)
if is_color:
has_shown_color = True
# 2. If YOLO detected someone
if len(result.boxes) > 0:
if is_color:
if save_preview_path:
save_preview_path.parent.mkdir(parents=True, exist_ok=True)
# Save annotated frame
annotated_frame = result.plot()
cv2.imwrite(str(save_preview_path), annotated_frame)
return True, "Human detected in color mode"
# Final decision
if has_shown_color:
return False, "Lights were on, but no humans detected"
return False, "Entirely night/IR mode or no humans"
def process_video_file(self, video_path, processed_base_dir, input_base_dir):
video_path = Path(video_path).resolve()
mtime = datetime.fromtimestamp(os.path.getmtime(video_path))
if datetime.now() - mtime < self.age_limit:
return False # 表示因为时间太新没处理
# Calculate target path for preview image
rel_path = video_path.relative_to(input_base_dir)
output_path = processed_base_dir / rel_path
preview_path = output_path.with_suffix('.jpg')
keep, reason = self.should_keep_video(video_path, save_preview_path=preview_path)
if keep:
print(f"[{os.getpid()}] Action: KEEP {video_path.name} - Reason: {reason}", flush=True)
self.move_to_processed(video_path, processed_base_dir, input_base_dir)
else:
print(f"[{os.getpid()}] Action: DELETE {video_path.name} - Reason: {reason}", flush=True)
os.remove(video_path)
return True # 表示处理了
def move_to_processed(self, video_path, processed_base_dir, input_base_dir):
rel_path = video_path.relative_to(input_base_dir)
output_path = processed_base_dir / rel_path
output_path.parent.mkdir(parents=True, exist_ok=True)
if os.path.exists(output_path): os.remove(output_path)
os.rename(video_path, output_path)
def scan_and_process(self, input_dir):
input_path = Path(input_dir).resolve()
if not input_path.exists(): return
processed_dir = input_path.parent / "processed"
print(f"Scanning {input_path}...")
all_videos = []
for root, dirs, files in os.walk(input_path):
if "processed" in dirs:
dirs.remove("processed")
for file in files:
if file.lower().endswith(self.supported_extensions):
all_videos.append(Path(root) / file)
if not all_videos:
print("No videos found.")
return
# Sort by mtime
all_videos.sort(key=lambda x: os.path.getmtime(x))
print(f"Found {len(all_videos)} videos. Starting parallel processing with {self.workers} workers...")
with ProcessPoolExecutor(max_workers=self.workers) as executor:
# We pass the method and its arguments.
# Note: self will be pickled. Since self.model is None, it should work.
futures = [executor.submit(self.process_video_file, v, processed_dir, input_path) for v in all_videos]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"Error in worker process: {e}")
if __name__ == "__main__":
import argparse
# 设置多进程启动方法为 'spawn',这在 CUDA 环境下更安全,能避免显存冲突
import multiprocessing
try:
multiprocessing.set_start_method('spawn', force=True)
except RuntimeError:
pass
parser = argparse.ArgumentParser()
parser.add_argument("dir", help="Target directory")
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--model", type=str, default='yolo26n.pt', help="Path to YOLO model or model name")
parser.add_argument("--workers", type=int, default=4, help="Number of parallel workers")
args = parser.parse_args()
cleaner = VideoCleaner(model_path=args.model, age_days=args.days, workers=args.workers)
cleaner.scan_and_process(args.dir)