import json
import os
import random
import re
import time

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Configuration
URL_LIST_FILE = '/Users/liushuming/projects/xkwsoftlist/url_list.txt'
OUTPUT_DIR = '/Users/liushuming/projects/xkwsoftlist/output_html'
CACHE_FILE = '/Users/liushuming/projects/xkwsoftlist/data_cache.json'


def get_driver():
    """Build a headless Chrome driver tuned for scraping.

    GPU, sandbox, /dev/shm usage and image loading are disabled for speed and
    container compatibility; a desktop user-agent is set so the site serves
    the normal page layout.

    Returns:
        A ready-to-use ``selenium.webdriver.Chrome`` instance.
    """
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # headless mode
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--blink-settings=imagesEnabled=false")  # do not load images
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=service, options=chrome_options)


def load_cache():
    """Load the ID->date cache from CACHE_FILE.

    Returns:
        The cached dict, or an empty dict when the file is missing,
        unreadable, or not valid JSON (a fresh run starts from scratch).
    """
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            # Corrupt or unreadable cache: treat as empty rather than crash.
            return {}
    return {}


def save_cache(cache):
    """Persist the ID->date cache to CACHE_FILE as pretty-printed UTF-8 JSON."""
    with open(CACHE_FILE, 'w', encoding='utf-8') as f:
        json.dump(cache, f, ensure_ascii=False, indent=2)


def generate_html_files(original_ids, id_to_date):
    """Write one standalone HTML page per original ID into OUTPUT_DIR.

    Each page ``<oid>.html`` shows a table of the 41 IDs in the window
    ``[oid - 20, oid + 20]`` with a link to each soft page and its cached
    date; the row for ``oid`` itself is highlighted.

    NOTE(review): the HTML markup in the original source was garbled
    (tags stripped during extraction); the document/table structure below
    is a reconstruction — confirm against the intended layout.

    Args:
        original_ids: list of int IDs parsed from URL_LIST_FILE.
        id_to_date: mapping of ID -> "YYYY-MM-DD" / "nodata" / "error".
    """
    print(f"Generating HTML files for {len(original_ids)} IDs...")
    for oid in original_ids:
        filepath = os.path.join(OUTPUT_DIR, f"{oid}.html")
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write('<!DOCTYPE html>\n<html>\n<head>\n<meta charset="utf-8">\n')
            f.write(f'<title>ID {oid} 记录</title>\n')
            f.write('<style>table{border-collapse:collapse}'
                    'td{border:1px solid #ccc;padding:4px 8px}'
                    '.highlight{background:#ffec99}</style>\n')
            f.write('</head>\n<body>\n')
            f.write('<table>\n')
            for i in range(oid - 20, oid + 21):
                # JSON round-trips turn int keys into strings, so try the
                # string key first and fall back to the int key.
                date = id_to_date.get(str(i), id_to_date.get(i, "nodata"))
                url = f"https://www.zxxk.com/soft/{i}.html"
                if i == oid:
                    f.write('<tr class="highlight">\n')
                else:
                    f.write('<tr>\n')
                f.write(f'<td>{i}</td>'
                        f'<td><a href="{url}">{i}</a></td>'
                        f'<td>{date}</td>\n')
                f.write('</tr>\n')
            f.write('</table>\n</body>\n</html>\n')


def main():
    """Fetch publish dates for a ±20 ID window around each listed URL.

    Reads URL_LIST_FILE, scrapes https://www.zxxk.com/soft/<id>.html for a
    YYYY-MM-DD date in the ``.other-info .time`` element, caches results in
    CACHE_FILE (saving every 10 fetches), and regenerates the per-ID HTML
    report pages.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 1. Read and parse the original IDs from the URL list file.
    original_ids = []
    with open(URL_LIST_FILE, 'r', encoding='utf-8') as f:
        for line in f:
            match = re.search(r'/soft/(\d+)\.html', line.strip())
            if match:
                original_ids.append(int(match.group(1)))
    print(f"Total original URLs in file: {len(original_ids)}")

    # 2. Collect every ID in the ±20 window around each original ID.
    all_needed_ids = set()
    for oid in original_ids:
        all_needed_ids.update(range(oid - 20, oid + 21))
    sorted_ids = sorted(all_needed_ids)
    total_to_fetch = len(sorted_ids)

    # Load existing cache; retry entries previously marked nodata/error.
    id_to_date = load_cache()
    ids_to_fetch = [sid for sid in sorted_ids
                    if str(sid) not in id_to_date
                    or id_to_date[str(sid)] in ("nodata", "error")]
    print(f"Total unique IDs: {total_to_fetch}")
    print(f"Already cached: {len(id_to_date)}")
    print(f"Remaining to fetch: {len(ids_to_fetch)}")

    # 3. Sequential single-process fetch with polite random delays.
    driver = None
    try:
        if ids_to_fetch:
            print("Starting single-process fetching with random delays...")
            driver = get_driver()
            wait = WebDriverWait(driver, 10)
            for index, sid in enumerate(ids_to_fetch):
                url = f"https://www.zxxk.com/soft/{sid}.html"
                extracted_date = "nodata"
                try:
                    driver.get(url)
                    try:
                        other_info = wait.until(EC.presence_of_element_located(
                            (By.CLASS_NAME, "other-info")))
                        time_nodes = other_info.find_elements(By.CLASS_NAME, "time")
                        if time_nodes:
                            date_match = re.search(r'(\d{4}-\d{2}-\d{2})',
                                                   time_nodes[0].text)
                            if date_match:
                                extracted_date = date_match.group(1)
                    except Exception:
                        # Element missing / wait timed out: keep "nodata".
                        pass
                except Exception:
                    # Navigation or driver failure: mark for retry next run.
                    extracted_date = "error"
                id_to_date[str(sid)] = extracted_date
                print(f"[{index+1}/{len(ids_to_fetch)}] ID: {sid} -> {extracted_date}")

                # Every 10 fetches: persist the cache and refresh the HTML.
                if (index + 1) % 10 == 0:
                    save_cache(id_to_date)
                    generate_html_files(original_ids, id_to_date)
                    print(f"--- Progress saved & HTML files updated at "
                          f"{time.strftime('%H:%M:%S')} ---")
                # Random delay to avoid hammering the server.
                time.sleep(random.uniform(1.0, 2.0))

            # Final save after the last fetch.
            save_cache(id_to_date)
            generate_html_files(original_ids, id_to_date)
        else:
            print("No new IDs to fetch. Updating HTML files from cache...")
            generate_html_files(original_ids, id_to_date)
    finally:
        print("\nProcess finished.")
        if driver:
            driver.quit()


if __name__ == "__main__":
    main()