# File-viewer residue (not program text): 238 lines, 8.6 KiB, Python, executable file.
"""
|
||
fill_missing.py
|
||
分段检查每个 softlist_*.db 中是否存在缺失 ID,若有则重新抓取并写入。
|
||
支持断点续传:进度保存在各 db 文件的 repair_progress 表中。
|
||
使用与 scrape_to_sqlite.py 相同的配置文件 config.py。
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import datetime
|
||
import threading
|
||
import time
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
|
||
from scrape_to_sqlite import get_driver
|
||
from config import TOTAL_START_ID, THREAD_COUNT, STEP, ACTIVE_THREADS
|
||
|
||
# Absolute directory that contains this script.
_SCRIPT_PATH = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(_SCRIPT_PATH)

# Width of each scanned softid interval. This is a range of IDs, not a
# row count: every scan step covers CHUNK_SIZE consecutive softid values.
CHUNK_SIZE = 10
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 进度表操作
|
||
# ──────────────────────────────────────────────
|
||
|
||
def init_progress_table(conn):
    """Create the ``repair_progress`` table in the database if it is missing.

    The table stores integer values under text keys and is used to persist
    the resume position of the scan.

    Args:
        conn: An open sqlite3 connection to the target database.
    """
    create_stmt = '''
        CREATE TABLE IF NOT EXISTS repair_progress (
            key TEXT PRIMARY KEY,
            value INTEGER
        )
    '''
    conn.execute(create_stmt)
    conn.commit()
|
||
|
||
|
||
def get_progress(conn, start_id):
    """Return the saved scan position, or ``start_id`` if none is stored.

    Args:
        conn: An open sqlite3 connection whose database has a
            ``repair_progress`` table.
        start_id: Fallback position used when no ``scan_pos`` row exists.

    Returns:
        The value stored under the ``scan_pos`` key, otherwise ``start_id``.
    """
    cursor = conn.execute(
        "SELECT value FROM repair_progress WHERE key='scan_pos'"
    )
    saved = cursor.fetchone()
    if not saved:
        return start_id
    return saved[0]
|
||
|
||
|
||
def save_progress(conn, pos):
    """Persist ``pos`` as the current scan position and commit.

    Any previously stored ``scan_pos`` row is overwritten.

    Args:
        conn: An open sqlite3 connection whose database has a
            ``repair_progress`` table.
        pos: The position to store under the ``scan_pos`` key.
    """
    upsert_stmt = "INSERT OR REPLACE INTO repair_progress (key, value) VALUES ('scan_pos', ?)"
    params = (pos,)
    conn.execute(upsert_stmt, params)
    conn.commit()
|
||
|
||
|
||
def clear_progress(conn):
    """Delete the stored scan position and commit.

    With the ``scan_pos`` row removed, a later ``get_progress`` call falls
    back to its ``start_id`` argument, so the next run rescans from the start.

    Args:
        conn: An open sqlite3 connection whose database has a
            ``repair_progress`` table.
    """
    delete_stmt = "DELETE FROM repair_progress WHERE key='scan_pos'"
    conn.execute(delete_stmt)
    conn.commit()
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 分段缺失检测(只查一小段)
|
||
# ──────────────────────────────────────────────
|
||
|
||
def find_missing_in_chunk(conn, chunk_start, chunk_end):
    """List the softid values in ``[chunk_start, chunk_end]`` absent from ``softinfo``.

    Only rows inside the requested interval are queried; the whole table is
    never loaded.

    Args:
        conn: An open sqlite3 connection whose database has a ``softinfo``
            table with a ``softid`` column.
        chunk_start: First softid of the interval (inclusive).
        chunk_end: Last softid of the interval (inclusive).

    Returns:
        The missing IDs in ascending order; empty when nothing is missing or
        when the interval itself is empty.
    """
    cursor = conn.execute(
        'SELECT softid FROM softinfo WHERE softid >= ? AND softid <= ? ORDER BY softid',
        (chunk_start, chunk_end)
    )
    present = {record[0] for record in cursor.fetchall()}
    wanted = set(range(chunk_start, chunk_end + 1))
    # The set difference is unordered; sort to restore ascending ID order.
    return sorted(wanted - present)
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 页面抓取
|
||
# ──────────────────────────────────────────────
|
||
|
||
def fetch_one(driver, wait, softid):
    """Load the page for one softid and extract its name and date.

    Args:
        driver: Selenium WebDriver used to load and query the page.
        wait: WebDriverWait used to block until the page shows either the
            ``document-basic-data`` element or the site's error text.
        softid: ID interpolated into the page URL.

    Returns:
        A ``(softname, softdate, createtime)`` tuple. ``softname`` and
        ``softdate`` are ``None`` when the page has no ``document-basic-data``
        element; ``softdate`` is also ``None`` when the time text contains no
        ``YYYY-MM-DD`` date. ``createtime`` is always the local time at which
        extraction finished.

    Raises:
        Any exception from the driver or the wait (e.g. a timeout, or a
        missing title/time element) is propagated to the caller.
    """
    driver.get(f"https://www.zxxk.com/soft/{softid}.html")

    def _page_settled(d):
        # Settled means: the detail container is present, or the site served
        # its error page.
        return (
            d.find_elements(By.CLASS_NAME, "document-basic-data")
            or "页面出错了" in d.page_source
        )

    wait.until(_page_settled)

    softname = None
    softdate = None
    if driver.find_elements(By.CLASS_NAME, "document-basic-data"):
        title_node = driver.find_element(
            By.CSS_SELECTOR, ".document-basic-data .title"
        )
        softname = title_node.text.strip()

        time_node = driver.find_element(
            By.CSS_SELECTOR, ".document-basic-data .time"
        )
        date_match = re.search(r'(\d{4}-\d{2}-\d{2})', time_node.text)
        softdate = date_match.group(1) if date_match else None

    createtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return softname, softdate, createtime
|
||
|
||
|
||
def repair_id(conn, driver, wait, thread_idx, softid):
    """Fetch one missing softid and store it, trying up to three times.

    On success the row is written with ``INSERT OR REPLACE``, committed, and
    the function returns. After a failed attempt the function sleeps
    ``2 * attempt_number`` seconds before retrying.

    Args:
        conn: An open sqlite3 connection whose database has a ``softinfo`` table.
        driver: Selenium WebDriver passed through to ``fetch_one``.
        wait: WebDriverWait passed through to ``fetch_one``.
        thread_idx: Worker index, used only in log output.
        softid: The ID to fetch and insert.

    Raises:
        Exception: The exception from the third failed attempt is re-raised
            so the caller can stop without advancing its progress marker.
    """
    max_retries = 3
    insert_stmt = 'INSERT OR REPLACE INTO softinfo (softid, softname, softdate, createtime) VALUES (?, ?, ?, ?)'

    for attempt in range(max_retries):
        try:
            start_time = time.time()
            softname, softdate, createtime = fetch_one(driver, wait, softid)
            record = (softid, softname, softdate, createtime)
            conn.execute(insert_stmt, record)
            conn.commit()
            elapsed = time.time() - start_time
            print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] T{thread_idx} | Repaired: {softid} | {elapsed:.2f}s | {softname}")
            return
        except Exception as e:
            # Last attempt: report and propagate to the caller.
            if attempt >= max_retries - 1:
                print(f"[T{thread_idx}] ID {softid} 重试 {max_retries} 次均失败,退出线程(进度未推进,重启后可续传)。")
                raise

            # Linear back-off: 2s, then 4s.
            wait_sec = (attempt + 1) * 2
            print(f"[T{thread_idx}] ID {softid} 第{attempt+1}次失败({e}),{wait_sec}s 后重试...")
            time.sleep(wait_sec)
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 线程主逻辑
|
||
# ──────────────────────────────────────────────
|
||
|
||
def repair_worker(thread_idx, start_id, end_id):
    """Scan one worker's database for missing softids and re-fetch them.

    The worker opens ``softlist_{thread_idx}.db`` in ``BASE_DIR`` and walks
    the ID range in ``CHUNK_SIZE``-wide intervals. The range runs from the
    saved scan position (or ``start_id``) up to the largest softid already
    stored within ``[start_id, end_id]``. Every ID missing from an interval
    is repaired through ``repair_id``. The scan position is saved only after
    a whole interval has been repaired, so an interrupted run resumes at the
    start of the unfinished interval. When the scan finishes, the saved
    position is cleared.

    Args:
        thread_idx: Worker index; selects the database file and tags log output.
        start_id: First softid owned by this worker (inclusive).
        end_id: Last softid owned by this worker (inclusive).

    Returns:
        None. The worker returns early when the database file does not exist,
        when it holds no rows in the range, or when the saved position is
        already past the largest stored ID.

    Raises:
        sqlite3.Error: If the setup queries fail (before the scan loop starts).
            Errors raised during the scan itself are caught and logged.
    """
    db_name = f'softlist_{thread_idx}.db'
    db_path = os.path.join(BASE_DIR, db_name)

    if not os.path.exists(db_path):
        print(f"[T{thread_idx}] DB 文件不存在,跳过: {db_name}")
        return

    conn = sqlite3.connect(db_path)
    # A single outer try/finally owns the connection, so it is closed on every
    # exit path: early returns, failing setup queries, and a failing
    # driver.quit() included.
    try:
        init_progress_table(conn)

        # Upper bound of this scan: the largest ID already stored in range.
        row = conn.execute(
            'SELECT MAX(softid) FROM softinfo WHERE softid >= ? AND softid <= ?',
            (start_id, end_id)
        ).fetchone()
        if not row or row[0] is None:
            print(f"[T{thread_idx}] 数据库为空,跳过。")
            return
        actual_end = row[0]

        # Resume from the position saved by a previous run, if any.
        scan_pos = get_progress(conn, start_id)
        if scan_pos > actual_end:
            print(f"[T{thread_idx}] 已全部扫描完毕(上次进度 {scan_pos} > 当前最大 {actual_end}),若需重新检查请清除进度。")
            return

        print(f"[T{thread_idx}] 开始分段扫描,从 {scan_pos} 到 {actual_end},分块大小 {CHUNK_SIZE}...")

        driver = None
        total_repaired = 0

        try:
            driver = get_driver()
            wait = WebDriverWait(driver, 95)

            chunk_start = scan_pos
            while chunk_start <= actual_end:
                chunk_end = min(chunk_start + CHUNK_SIZE - 1, actual_end)

                missing = find_missing_in_chunk(conn, chunk_start, chunk_end)
                if missing:
                    print(f"[T{thread_idx}] [{chunk_start}, {chunk_end}] 缺失 {len(missing)} 个: {missing}")
                    for mid in missing:
                        # repair_id returns only on success; on failure it
                        # raises and the progress marker stays put.
                        repair_id(conn, driver, wait, thread_idx, mid)
                        total_repaired += 1

                # Advance the marker only after the whole interval is repaired.
                chunk_start = chunk_end + 1
                save_progress(conn, chunk_start)

            # Full scan finished: drop the marker so the next run starts over.
            clear_progress(conn)
            print(f"[T{thread_idx}] 扫描完成。共补充 {total_repaired} 条。")

        except Exception as e:
            # Thread-level boundary: log and let the thread end; the saved
            # marker allows a later run to resume.
            print(f"[T{thread_idx}] 致命错误: {e}(进度已保存,下次可续传)")
        finally:
            if driver:
                driver.quit()
    finally:
        conn.close()
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 入口
|
||
# ──────────────────────────────────────────────
|
||
|
||
def main():
    """Start one repair thread per active worker index and wait for all of them.

    The active indices come from ``ACTIVE_THREADS`` when it is non-empty,
    otherwise every index in ``range(THREAD_COUNT)`` is used. Worker ``i``
    is given the ``STEP``-wide ID range beginning at
    ``TOTAL_START_ID + i * STEP``. Thread starts are spaced one second apart.
    """
    if ACTIVE_THREADS:
        active = set(ACTIVE_THREADS)
    else:
        active = set(range(THREAD_COUNT))

    threads = []
    for worker_idx in sorted(active):
        first_id = TOTAL_START_ID + worker_idx * STEP
        last_id = first_id + STEP - 1
        worker = threading.Thread(
            target=repair_worker,
            args=(worker_idx, first_id, last_id),
        )
        threads.append(worker)
        worker.start()
        # Stagger the thread launches by one second.
        time.sleep(1.0)

    for worker in threads:
        worker.join()

    print("所有线程检查/修补完毕。")


if __name__ == "__main__":
    main()
|