Files
xkwsoftlistnew/scrape_to_sqlite.py
liushuming 4c6599df76 aaa
2026-02-27 18:23:38 +08:00

203 lines
8.1 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import sys
import time
import random
import sqlite3
import datetime
import shutil
import threading
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from config import TOTAL_START_ID, THREAD_COUNT, STEP, ACTIVE_THREADS
# Absolute directory containing this script; the per-thread SQLite files
# are created inside it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def get_driver():
    """Create a headless Chrome WebDriver configured for fast scraping.

    A local chromedriver (``CHROMEDRIVER_PATH`` environment variable, else
    the first ``chromedriver`` found on ``PATH``) is preferred. If none is
    found, or the local one cannot start a browser session, a driver binary
    obtained through ``webdriver_manager`` is used instead.

    Returns:
        A started ``webdriver.Chrome`` instance. The caller is responsible
        for calling ``quit()`` on it.

    Raises:
        Exception: Whatever Selenium / webdriver_manager raises when the
            fallback driver cannot be installed or started either.
    """
    chrome_options = Options()
    chrome_options.page_load_strategy = 'eager'
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # Disable images, CSS and fonts to speed up page loads.
    prefs = {
        "profile.managed_default_content_settings.images": 2,
        "profile.managed_default_content_settings.stylesheets": 2,
        "profile.managed_default_content_settings.fonts": 2,
    }
    chrome_options.add_experimental_option("prefs", prefs)
    chrome_options.add_argument("user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

    # Prefer a local chromedriver: the CHROMEDRIVER_PATH environment variable,
    # or a chromedriver found on PATH.
    chromedriver_path = os.environ.get('CHROMEDRIVER_PATH') or shutil.which('chromedriver')
    if chromedriver_path:
        print(f"Using local chromedriver: {chromedriver_path}")
        try:
            # Service() only records the executable path; the driver process
            # is launched by webdriver.Chrome(). The browser must therefore be
            # created inside the try for the fallback below to be reachable.
            return webdriver.Chrome(service=Service(chromedriver_path), options=chrome_options)
        except Exception as e:
            print(f"Local chromedriver at {chromedriver_path} failed to start: {e}\nFalling back to webdriver_manager...")
    else:
        print("Local chromedriver not found; downloading via webdriver_manager...")

    service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=service, options=chrome_options)
def init_thread_db(db_path):
    """Open a SQLite database and make sure the ``softinfo`` table exists.

    Args:
        db_path: Path of the SQLite file to open; it is created if missing.

    Returns:
        An open ``sqlite3.Connection``. The caller owns it and must close it.

    Raises:
        sqlite3.Error: If the schema cannot be created. The connection is
            closed before the error propagates so the handle is not leaked.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''
CREATE TABLE IF NOT EXISTS softinfo (
softid INTEGER PRIMARY KEY,
softname TEXT,
softdate TEXT,
createtime TEXT
)
''')
        conn.commit()
    except Exception:
        # Do not leak the handle when the file is unusable as a database.
        conn.close()
        raise
    return conn
def get_last_processed_id(conn, start_id, end_id):
    """Return the highest ``softid`` stored within ``[start_id, end_id]``.

    Args:
        conn: Open SQLite connection whose database has a ``softinfo`` table.
        start_id: Lower bound of the range (inclusive).
        end_id: Upper bound of the range (inclusive).

    Returns:
        The largest stored ID inside the range, or ``start_id - 1`` when the
        range has no rows yet.
    """
    # Largest ID already stored within this thread's range.
    query = 'SELECT MAX(softid) FROM softinfo WHERE softid >= ? AND softid <= ?'
    row = conn.cursor().execute(query, (start_id, end_id)).fetchone()
    highest = row[0] if row else None
    if highest is None:
        return start_id - 1
    return highest
def worker(thread_idx, start_id, end_id):
    """Scrape soft pages ``start_id..end_id`` into this thread's own database.

    The worker owns ``softlist_<thread_idx>.db`` in ``BASE_DIR`` and one
    Chrome instance. It resumes after the highest ID already stored for its
    range and walks upward one ID at a time. A row is written when the page
    showed its data container (name/date filled in) or the site's error text
    (name, date and createtime all NULL). An ID whose every attempt raised is
    never written, so a later run picks it up again.

    If the same ID keeps failing for several consecutive rounds, the worker
    stops instead of retrying forever; progress so far is kept and a rerun
    resumes from the failing ID.

    Args:
        thread_idx: Worker index; used in the DB file name and log prefixes.
        start_id: First ID of this worker's range (inclusive).
        end_id: Last ID of this worker's range (inclusive).
    """
    # Attempts per round for a single page.
    max_retries = 3
    # Consecutive failed rounds on one ID before the worker gives up. Without
    # this bound a permanently failing page (or a dead browser session) keeps
    # the thread spinning forever, since the ID is deliberately not skipped.
    max_failed_rounds = 10

    db_name = f'softlist_{thread_idx}.db'
    db_path = os.path.join(BASE_DIR, db_name)
    print(f"Thread-{thread_idx} started: range [{start_id}, {end_id}], DB: {db_name}")
    # Each thread initialises and writes to its own database file.
    conn = init_thread_db(db_path)
    current_id = get_last_processed_id(conn, start_id, end_id) + 1
    if current_id > end_id:
        print(f"Thread-{thread_idx} has already finished its range.")
        conn.close()
        return
    print(f"Thread-{thread_idx} resuming from: {current_id}")
    driver = None
    try:
        print(f"Thread-{thread_idx} initializing webdriver...")
        driver = get_driver()
        print(f"Thread-{thread_idx} webdriver initialized")
        wait = WebDriverWait(driver, 95)  # generous timeout for slow pages
        failed_rounds = 0
        while current_id <= end_id:
            url = f"https://www.zxxk.com/soft/{current_id}.html"
            softname = None
            softdate = None
            createtime = None
            success = False
            for attempt in range(max_retries):
                start_time = time.time()
                try:
                    driver.get(url)
                    # Wait until either the data container is present or the
                    # site's error text appears in the page source.
                    wait.until(lambda d:
                        d.find_elements(By.CLASS_NAME, "document-basic-data") or
                        "页面出错了" in d.page_source
                    )
                    elements = driver.find_elements(By.CLASS_NAME, "document-basic-data")
                    if elements:
                        title_element = driver.find_element(By.CSS_SELECTOR, ".document-basic-data .title")
                        softname = title_element.text.strip()
                        time_element = driver.find_element(By.CSS_SELECTOR, ".document-basic-data .time")
                        date_text = time_element.text
                        date_match = re.search(r'(\d{4}-\d{2}-\d{2})', date_text)
                        if date_match:
                            softdate = date_match.group(1)
                        createtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    success = True
                    break  # data scraped, or page confirmed missing
                except Exception as e:
                    # Drop anything scraped before the failure so a later
                    # attempt cannot save stale, partial values.
                    softname = None
                    softdate = None
                    createtime = None
                    if attempt < max_retries - 1:
                        wait_time = (attempt + 1) * 2
                        print(f"T{thread_idx} | ID {current_id}: Attempt {attempt+1} failed ({e}), retrying in {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        print(f"T{thread_idx} | ID {current_id}: All {max_retries} attempts failed. {e}")
            # Record the ID only after the page was actually reached (data
            # scraped or error page confirmed). Total failures such as network
            # timeouts are not stored, so the ID is retried on the next run.
            if success:
                failed_rounds = 0
                cursor = conn.cursor()
                cursor.execute('''
INSERT OR REPLACE INTO softinfo (softid, softname, softdate, createtime)
VALUES (?, ?, ?, ?)
''', (current_id, softname, softdate, createtime))
                conn.commit()
                elapsed = time.time() - start_time
                if softname or current_id % 100 == 0:
                    print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] T{thread_idx} | Saved: {current_id} | {elapsed:.2f}s | {softname}")
                current_id += 1
            else:
                failed_rounds += 1
                print(f"T{thread_idx} | ID {current_id}: Skipping save due to persistent errors.")
                if failed_rounds >= max_failed_rounds:
                    # Skipping the ID would lose it for good, because resume
                    # is based on MAX(softid). Stop and let a rerun resume.
                    print(f"T{thread_idx} | ID {current_id}: giving up after {failed_rounds} consecutive failed rounds; rerun to resume from this ID.")
                    break
                time.sleep(5)
    except Exception as e:
        import traceback
        print(f"Thread-{thread_idx} fatal error: {e}")
        traceback.print_exc()
    finally:
        try:
            if driver:
                driver.quit()
        finally:
            # Close the database even when shutting the browser down fails.
            conn.close()
            print(f"Thread-{thread_idx} finished.")
def main():
    """Start one worker thread per configured ID range and wait for them all.

    Thread ``i`` handles the IDs from ``TOTAL_START_ID + i * STEP`` through
    ``TOTAL_START_ID + (i + 1) * STEP - 1``. When ``ACTIVE_THREADS`` is
    non-empty, only the thread indices it lists are started.
    """
    selected = set(ACTIVE_THREADS) if ACTIVE_THREADS else None
    launched = []
    for idx in range(THREAD_COUNT):
        if selected is None or idx in selected:
            # ID range owned by this thread.
            first_id = TOTAL_START_ID + idx * STEP
            last_id = first_id + STEP - 1
            runner = threading.Thread(target=worker, args=(idx, first_id, last_id))
            launched.append(runner)
            runner.start()
            # Stagger start-up so the browser instances are not all created
            # at the same moment.
            time.sleep(1.5)
    for runner in launched:
        runner.join()
    print("All threads completed. Goodbye!")
# Start the scraper only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()