"""Multi-threaded Selenium scraper for https://www.zxxk.com/soft/<id>.html pages.

Each worker thread walks a contiguous ID range with its own headless Chrome
instance and stores the page title and date in its own SQLite file
(``softlist_<thread_idx>.db`` next to this script).
"""

import os
import re
import sys
import time
import random
import sqlite3
import datetime
import shutil
import threading
import traceback

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

from config import TOTAL_START_ID, THREAD_COUNT, STEP, ACTIVE_THREADS

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Seconds WebDriverWait polls for either the data container or the error text.
PAGE_WAIT_SECONDS = 95
# Attempts per ID before one round is declared failed.
MAX_RETRIES = 3
# Pause before re-trying an ID whose whole round of attempts failed.
FAILED_ROUND_PAUSE_SECONDS = 5

# Matches the first YYYY-MM-DD date in the ".time" element's text.
_DATE_RE = re.compile(r'(\d{4}-\d{2}-\d{2})')


def _build_chrome_options():
    """Return the headless Chrome options shared by every driver instance."""
    chrome_options = Options()
    chrome_options.page_load_strategy = 'eager'
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    # Disable images, CSS and fonts to speed up page loads.
    prefs = {
        "profile.managed_default_content_settings.images": 2,
        "profile.managed_default_content_settings.stylesheets": 2,
        "profile.managed_default_content_settings.fonts": 2
    }
    chrome_options.add_experimental_option("prefs", prefs)
    chrome_options.add_argument("user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
    return chrome_options


def get_driver():
    """Create and return a headless Chrome WebDriver.

    A local chromedriver is preferred: the ``CHROMEDRIVER_PATH`` environment
    variable first, then ``chromedriver`` found on ``PATH``. If that driver
    cannot be started, or none is found, the binary provided by
    ``webdriver_manager`` is used instead.

    Returns:
        A started ``webdriver.Chrome`` instance; the caller must ``quit()`` it.

    Raises:
        Exception: whatever ``webdriver_manager`` / Selenium raise when the
            fallback driver cannot be installed or started either.
    """
    chrome_options = _build_chrome_options()

    chromedriver_path = os.environ.get('CHROMEDRIVER_PATH') or shutil.which('chromedriver')
    if chromedriver_path:
        try:
            print(f"Using local chromedriver: {chromedriver_path}")
            # Constructing a Service does not launch chromedriver; launch
            # failures surface from webdriver.Chrome(...). The driver must
            # therefore be created inside this try for the fallback to work.
            return webdriver.Chrome(service=Service(chromedriver_path), options=chrome_options)
        except Exception as e:
            print(f"Local chromedriver at {chromedriver_path} failed to start: {e}\nFalling back to webdriver_manager...")
    else:
        print("Local chromedriver not found; downloading via webdriver_manager...")

    service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=service, options=chrome_options)


def init_thread_db(db_path):
    """Open (creating if needed) the SQLite database at *db_path*.

    Ensures the ``softinfo`` table exists and returns the open connection;
    the caller is responsible for closing it.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS softinfo (
            softid INTEGER PRIMARY KEY,
            softname TEXT,
            softdate TEXT,
            createtime TEXT
        )
    ''')
    conn.commit()
    return conn


def get_last_processed_id(conn, start_id, end_id):
    """Return the largest stored ``softid`` within ``[start_id, end_id]``.

    Returns ``start_id - 1`` when no row in that range exists, so that the
    caller can always resume from ``result + 1``.
    """
    cursor = conn.cursor()
    # Highest ID already stored within this thread's range.
    cursor.execute('SELECT MAX(softid) FROM softinfo WHERE softid >= ? AND softid <= ?', (start_id, end_id))
    result = cursor.fetchone()
    if result and result[0] is not None:
        return result[0]
    return start_id - 1


def _scrape_once(driver, wait, url):
    """Load *url* once and return ``(softname, softdate)``.

    Both values are ``None`` when the page shows the error text instead of
    the data container; ``softdate`` is also ``None`` when no YYYY-MM-DD date
    is found. Any Selenium error (including the wait timing out) propagates.
    """
    driver.get(url)
    # Wait for either the data container or the site's error-page text.
    wait.until(lambda d:
        d.find_elements(By.CLASS_NAME, "document-basic-data") or
        "页面出错了" in d.page_source
    )

    softname = None
    softdate = None
    if driver.find_elements(By.CLASS_NAME, "document-basic-data"):
        title_element = driver.find_element(By.CSS_SELECTOR, ".document-basic-data .title")
        softname = title_element.text.strip()

        time_element = driver.find_element(By.CSS_SELECTOR, ".document-basic-data .time")
        date_match = _DATE_RE.search(time_element.text)
        if date_match:
            softdate = date_match.group(1)
    return softname, softdate


def _fetch_with_retries(driver, wait, url, thread_idx, soft_id, max_retries=MAX_RETRIES):
    """Try to scrape *url* up to *max_retries* times with a growing back-off.

    Returns ``(softname, softdate, started_at)`` for the first successful
    attempt, where ``started_at`` is the ``time.time()`` at which that attempt
    began, or ``None`` when every attempt raised.
    """
    for attempt in range(max_retries):
        started_at = time.time()
        try:
            # Values come back fresh per attempt, so a half-parsed failed
            # attempt cannot leak a stale title into a later one.
            softname, softdate = _scrape_once(driver, wait, url)
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = (attempt + 1) * 2
                print(f"T{thread_idx} | ID {soft_id}: Attempt {attempt+1} failed ({e}), retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"T{thread_idx} | ID {soft_id}: All {max_retries} attempts failed. {e}")
        else:
            return softname, softdate, started_at
    return None


def _save_record(conn, soft_id, softname, softdate, createtime):
    """Insert or replace the row for *soft_id* and commit immediately."""
    cursor = conn.cursor()
    cursor.execute('''
        INSERT OR REPLACE INTO softinfo (softid, softname, softdate, createtime)
        VALUES (?, ?, ?, ?)
    ''', (soft_id, softname, softdate, createtime))
    conn.commit()


def worker(thread_idx, start_id, end_id):
    """Scrape every ID in ``[start_id, end_id]`` into this thread's database.

    The thread resumes after the highest ID already stored for its range.
    A row is written for every page that could be loaded, including pages
    that show the site's error text (stored with NULL name and date). IDs
    whose attempts all fail are not stored and are retried.

    Args:
        thread_idx: Index used for the DB file name and log prefixes.
        start_id: First ID of the range (inclusive).
        end_id: Last ID of the range (inclusive).
    """
    db_name = f'softlist_{thread_idx}.db'
    db_path = os.path.join(BASE_DIR, db_name)

    print(f"Thread-{thread_idx} started: range [{start_id}, {end_id}], DB: {db_name}")

    # Each thread owns its own database file and connection.
    conn = init_thread_db(db_path)

    current_id = get_last_processed_id(conn, start_id, end_id) + 1
    if current_id > end_id:
        print(f"Thread-{thread_idx} has already finished its range.")
        conn.close()
        return

    print(f"Thread-{thread_idx} resuming from: {current_id}")

    driver = None
    try:
        print(f"Thread-{thread_idx} initializing webdriver...")
        driver = get_driver()
        print(f"Thread-{thread_idx} webdriver initialized")
        wait = WebDriverWait(driver, PAGE_WAIT_SECONDS)

        while current_id <= end_id:
            url = f"https://www.zxxk.com/soft/{current_id}.html"
            outcome = _fetch_with_retries(driver, wait, url, thread_idx, current_id)

            if outcome is None:
                # Nothing is stored for a fully failed round, so the ID is
                # picked up again (now, or on the next run via the resume
                # lookup). Advancing here would lose the ID for good, because
                # resuming is based on the maximum stored ID.
                # NOTE(review): an ID that can never load stalls this thread
                # indefinitely; consider a cap on consecutive failed rounds.
                print(f"T{thread_idx} | ID {current_id}: Skipping save due to persistent errors.")
                time.sleep(FAILED_ROUND_PAUSE_SECONDS)
                continue

            softname, softdate, started_at = outcome
            createtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            _save_record(conn, current_id, softname, softdate, createtime)

            # Covers the successful attempt plus the database write.
            elapsed = time.time() - started_at
            if softname or current_id % 100 == 0:
                print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] T{thread_idx} | Saved: {current_id} | {elapsed:.2f}s | {softname}")

            current_id += 1

    except Exception as e:
        print(f"Thread-{thread_idx} fatal error: {e}")
        traceback.print_exc()
    finally:
        # quit() can raise when the browser has already died; that must not
        # prevent the database connection from being closed.
        try:
            if driver is not None:
                driver.quit()
        except Exception as e:
            print(f"Thread-{thread_idx} failed to quit webdriver cleanly: {e}")
        finally:
            conn.close()
        print(f"Thread-{thread_idx} finished.")


def main():
    """Start one worker thread per configured range and wait for all of them.

    Thread ``i`` handles ``STEP`` consecutive IDs starting at
    ``TOTAL_START_ID + i * STEP``. When ``ACTIVE_THREADS`` is non-empty, only
    the thread indices it contains are started.
    """
    threads = []
    active = set(ACTIVE_THREADS) if ACTIVE_THREADS else None

    for i in range(THREAD_COUNT):
        if active is not None and i not in active:
            continue

        # Inclusive ID range handled by this thread.
        t_start = TOTAL_START_ID + i * STEP
        t_end = t_start + STEP - 1

        t = threading.Thread(target=worker, args=(i, t_start, t_end))
        threads.append(t)
        t.start()
        # Stagger start-up so the browser instances are not all created at once.
        time.sleep(1.5)

    for t in threads:
        t.join()

    print("All threads completed. Goodbye!")


if __name__ == "__main__":
    main()