""" fill_missing.py 分段检查每个 softlist_*.db 中是否存在缺失 ID,若有则重新抓取并写入。 支持断点续传:进度保存在各 db 文件的 repair_progress 表中。 使用与 scrape_to_sqlite.py 相同的配置文件 config.py。 """ import os import re import sqlite3 import datetime import threading import time from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from scrape_to_sqlite import get_driver from config import TOTAL_START_ID, THREAD_COUNT, STEP, ACTIVE_THREADS BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 每次扫描的 ID 区间大小(不是一次读取条数,而是按 softid 区间切片) CHUNK_SIZE = 10 # ────────────────────────────────────────────── # 进度表操作 # ────────────────────────────────────────────── def init_progress_table(conn): """在 db 中创建断点续传进度表(若不存在)。""" conn.execute(''' CREATE TABLE IF NOT EXISTS repair_progress ( key TEXT PRIMARY KEY, value INTEGER ) ''') conn.commit() def get_progress(conn, start_id): """读取上次扫描到的位置,默认从 start_id 开始。""" row = conn.execute( "SELECT value FROM repair_progress WHERE key='scan_pos'" ).fetchone() return row[0] if row else start_id def save_progress(conn, pos): """保存当前扫描位置。""" conn.execute( "INSERT OR REPLACE INTO repair_progress (key, value) VALUES ('scan_pos', ?)", (pos,) ) conn.commit() def clear_progress(conn): """扫描全部完成后清除进度,方便下次全量重新检查。""" conn.execute("DELETE FROM repair_progress WHERE key='scan_pos'") conn.commit() # ────────────────────────────────────────────── # 分段缺失检测(只查一小段) # ────────────────────────────────────────────── def find_missing_in_chunk(conn, chunk_start, chunk_end): """ 在 [chunk_start, chunk_end] 范围内,找出数据库中缺失的 softid。 仅查询这一小段,不读取整张表。 """ rows = conn.execute( 'SELECT softid FROM softinfo WHERE softid >= ? AND softid <= ? 
ORDER BY softid', (chunk_start, chunk_end) ).fetchall() existing = {row[0] for row in rows} return [i for i in range(chunk_start, chunk_end + 1) if i not in existing] # ────────────────────────────────────────────── # 页面抓取 # ────────────────────────────────────────────── def fetch_one(driver, wait, softid): """ 抓取单条数据,返回 (softname, softdate, createtime)。 页面不存在时 softname/softdate 为 None,createtime 仍有值。 失败时抛出异常。 """ url = f"https://www.zxxk.com/soft/{softid}.html" driver.get(url) wait.until(lambda d: d.find_elements(By.CLASS_NAME, "document-basic-data") or "页面出错了" in d.page_source ) softname = softdate = None elements = driver.find_elements(By.CLASS_NAME, "document-basic-data") if elements: softname = driver.find_element( By.CSS_SELECTOR, ".document-basic-data .title" ).text.strip() date_text = driver.find_element( By.CSS_SELECTOR, ".document-basic-data .time" ).text m = re.search(r'(\d{4}-\d{2}-\d{2})', date_text) if m: softdate = m.group(1) createtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') return softname, softdate, createtime def repair_id(conn, driver, wait, thread_idx, softid): """ 补抓单个 softid,重试 3 次。 成功后写入数据库并返回。 3 次均失败则抛出异常,由调用方决定是否退出线程(进度不会推进)。 """ max_retries = 3 for attempt in range(max_retries): try: start_time = time.time() softname, softdate, createtime = fetch_one(driver, wait, softid) conn.execute( 'INSERT OR REPLACE INTO softinfo (softid, softname, softdate, createtime) VALUES (?, ?, ?, ?)', (softid, softname, softdate, createtime) ) conn.commit() elapsed = time.time() - start_time print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] T{thread_idx} | Repaired: {softid} | {elapsed:.2f}s | {softname}") return except Exception as e: wait_sec = (attempt + 1) * 2 if attempt < max_retries - 1: print(f"[T{thread_idx}] ID {softid} 第{attempt+1}次失败({e}),{wait_sec}s 后重试...") time.sleep(wait_sec) else: print(f"[T{thread_idx}] ID {softid} 重试 {max_retries} 次均失败,退出线程(进度未推进,重启后可续传)。") raise # 向上抛出,触发 repair_worker 的 except 分支 # 
# ──────────────────────────────────────────────
# Per-thread worker
# ──────────────────────────────────────────────

def repair_worker(thread_idx, start_id, end_id):
    """
    Scan one thread's database for missing softids and re-scrape them.

    Opens ``softlist_{thread_idx}.db`` under BASE_DIR and scans the inclusive
    softid range [start_id, end_id] in slices of CHUNK_SIZE, but only up to
    the largest softid already stored in that range. The scan position is
    saved after each slice has been fully repaired, and cleared once the whole
    range has been scanned.

    Returns None. A missing DB file, an empty range, or an already-finished
    scan is reported and skipped. Any exception raised after the connection is
    opened (including setup errors such as a missing table) is caught and
    printed, and the connection and WebDriver are always released.
    """
    db_name = f'softlist_{thread_idx}.db'
    db_path = os.path.join(BASE_DIR, db_name)

    if not os.path.exists(db_path):
        print(f"[T{thread_idx}] DB 文件不存在,跳过: {db_name}")
        return

    conn = sqlite3.connect(db_path)
    driver = None
    total_repaired = 0
    try:
        # Setup runs inside the try so a failure here (e.g. no softinfo table)
        # is reported and still closes the connection in `finally`.
        init_progress_table(conn)

        # Effective upper bound: the largest softid already scraped in range.
        row = conn.execute(
            'SELECT MAX(softid) FROM softinfo WHERE softid >= ? AND softid <= ?',
            (start_id, end_id)
        ).fetchone()
        if not row or row[0] is None:
            print(f"[T{thread_idx}] 数据库为空,跳过。")
            return
        actual_end = row[0]

        # Resume from the position saved by a previous run, if any.
        scan_pos = get_progress(conn, start_id)
        if scan_pos > actual_end:
            print(f"[T{thread_idx}] 已全部扫描完毕(上次进度 {scan_pos} > 当前最大 {actual_end}),若需重新检查请清除进度。")
            return

        print(f"[T{thread_idx}] 开始分段扫描,从 {scan_pos} 到 {actual_end},分块大小 {CHUNK_SIZE}...")

        driver = get_driver()
        wait = WebDriverWait(driver, 95)

        chunk_start = scan_pos
        while chunk_start <= actual_end:
            chunk_end = min(chunk_start + CHUNK_SIZE - 1, actual_end)
            missing = find_missing_in_chunk(conn, chunk_start, chunk_end)

            if missing:
                print(f"[T{thread_idx}] [{chunk_start}, {chunk_end}] 缺失 {len(missing)} 个: {missing}")
                for mid in missing:
                    # repair_id only returns on success; on failure it raises
                    # and the progress below is not advanced.
                    repair_id(conn, driver, wait, thread_idx, mid)
                    total_repaired += 1

            # Advance the saved position only after the whole slice is repaired.
            chunk_start = chunk_end + 1
            save_progress(conn, chunk_start)

        # Whole range scanned: drop the saved position.
        clear_progress(conn)
        print(f"[T{thread_idx}] 扫描完成。共补充 {total_repaired} 条。")

    except Exception as e:
        # Thread-level boundary: report and end this worker.
        print(f"[T{thread_idx}] 致命错误: {e}(进度已保存,下次可续传)")
    finally:
        # Nested so that a failing driver.quit() cannot skip conn.close().
        try:
            if driver is not None:
                driver.quit()
        finally:
            conn.close()


# ──────────────────────────────────────────────
# Entry point
# ──────────────────────────────────────────────

def main():
    """
    Start one repair_worker thread per active thread index and wait for all.

    The indices come from ACTIVE_THREADS when it is non-empty, otherwise from
    range(THREAD_COUNT). Thread ``i`` handles the inclusive softid range
    starting at TOTAL_START_ID + i * STEP and spanning STEP ids. Thread starts
    are spaced one second apart.
    """
    active = set(ACTIVE_THREADS) if ACTIVE_THREADS else set(range(THREAD_COUNT))

    threads = []
    for i in sorted(active):
        t_start = TOTAL_START_ID + i * STEP
        t_end = t_start + STEP - 1
        t = threading.Thread(target=repair_worker, args=(i, t_start, t_end))
        threads.append(t)
        t.start()
        time.sleep(1.0)

    for t in threads:
        t.join()

    print("所有线程检查/修补完毕。")


if __name__ == "__main__":
    main()