import os
import re
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import json
# Configuration
# Text file that main() scans line by line for /soft/<id>.html URLs.
URL_LIST_FILE = '/Users/liushuming/projects/xkwsoftlist/url_list.txt'
# Directory that receives the generated <id>.html pages.
OUTPUT_DIR = '/Users/liushuming/projects/xkwsoftlist/output_html'
# JSON file used to persist the ID -> date mapping between runs.
CACHE_FILE = '/Users/liushuming/projects/xkwsoftlist/data_cache.json'
def get_driver():
    """Create and return a Chrome WebDriver configured for scraping.

    The browser is launched with the command-line flags listed below, and
    the chromedriver executable path comes from
    ``ChromeDriverManager().install()``.

    Returns:
        The ``webdriver.Chrome`` instance; the caller is responsible for
        calling ``quit()`` on it.
    """
    launch_flags = (
        "--headless",  # headless mode
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--blink-settings=imagesEnabled=false",  # do not load images
        "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    )

    opts = Options()
    for flag in launch_flags:
        opts.add_argument(flag)

    return webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=opts,
    )
def load_cache():
    """Load the persisted ID -> date mapping from ``CACHE_FILE``.

    Returns:
        dict: The decoded JSON object. An empty dict is returned when the
        file does not exist, cannot be read, does not contain valid JSON,
        or its top-level JSON value is not an object.
    """
    try:
        with open(CACHE_FILE, 'r', encoding='utf-8') as f:
            cached = json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: covers both
        # json.JSONDecodeError and UnicodeDecodeError. KeyboardInterrupt and
        # SystemExit are deliberately NOT caught here.
        return {}
    # Callers index the result by string key, so anything but a dict is
    # treated the same as a corrupt cache.
    return cached if isinstance(cached, dict) else {}
def save_cache(cache):
    """Persist *cache* to ``CACHE_FILE`` as indented UTF-8 JSON.

    The data is first written to a sibling ``<CACHE_FILE>.tmp`` file and then
    moved over ``CACHE_FILE`` with ``os.replace``. If serialisation or the
    write fails, the previous cache file is left untouched instead of being
    truncated.

    Args:
        cache: JSON-serialisable mapping of ID -> date string.

    Raises:
        TypeError, ValueError, OSError: Propagated from ``json.dump`` or the
            file operations after the temporary file has been removed.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(cache, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, CACHE_FILE)
    except BaseException:
        # Clean up the partial temporary file, then re-raise unchanged.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def generate_html_files(original_ids, id_to_date):
    """Write one ``<id>.html`` page per original ID into ``OUTPUT_DIR``.

    Each page contains a table with one row for every ID in the inclusive
    range ``oid - 20 .. oid + 20``: a link to that ID's page on zxxk.com and
    the date stored for it in *id_to_date* (``"nodata"`` when absent). The
    row for *oid* itself is rendered in bold.

    Args:
        original_ids: Sequence of integer IDs; one file is written per entry.
        id_to_date: Mapping of ID -> date string. Keys may be ``str`` (as
            they are after a JSON round-trip) or ``int``; the string key is
            looked up first.
    """
    print(f"Generating HTML files for {len(original_ids)} IDs...")
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # NOTE(review): the exact page markup (doctype, table attributes) is an
    # assumption - confirm it matches the intended layout.
    for oid in original_ids:
        parts = [
            '<!DOCTYPE html>\n<html>\n<head>\n<meta charset="utf-8">\n',
            f'<title>ID {oid} 记录</title>\n',
            '</head>\n',
            '<body>\n<table border="1">\n',
        ]
        for i in range(oid - 20, oid + 21):
            # JSON turns integer keys into strings, so try the string key
            # first and fall back to the integer key.
            date = id_to_date.get(str(i), id_to_date.get(i, "nodata"))
            url = f"https://www.zxxk.com/soft/{i}.html"
            link = f'<a href="{url}">{i}</a>'
            if i == oid:
                # Highlight the row of the ID this page is centred on.
                link = f'<b>{link}</b>'
            parts.append(
                '  <tr>\n'
                f'    <td>{link}</td>\n'
                f'    <td>{date}</td>\n'
                '  </tr>\n'
            )
        parts.append('</table>\n</body>\n</html>\n')

        filepath = os.path.join(OUTPUT_DIR, f"{oid}.html")
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(''.join(parts))
def _read_original_ids(path):
    """Return the numeric IDs of all ``/soft/<id>.html`` URLs in *path*, in file order."""
    pattern = re.compile(r'/soft/(\d+)\.html')
    ids = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            match = pattern.search(line.strip())
            if match:
                ids.append(int(match.group(1)))
    return ids


def _fetch_date(driver, wait, url):
    """Load *url* and return the first ``YYYY-MM-DD`` date shown on the page.

    The date is read from the first ``time``-class element inside the
    ``other-info``-class element.

    Returns:
        str: The date, ``"nodata"`` when the page loaded but no date could
        be extracted, or ``"error"`` when loading the page itself failed.
    """
    try:
        driver.get(url)
    except Exception:
        return "error"

    try:
        other_info = wait.until(
            EC.presence_of_element_located((By.CLASS_NAME, "other-info"))
        )
        time_nodes = other_info.find_elements(By.CLASS_NAME, "time")
        if time_nodes:
            date_match = re.search(r'(\d{4}-\d{2}-\d{2})', time_nodes[0].text)
            if date_match:
                return date_match.group(1)
    except Exception:
        # Best effort: a wait timeout or a missing element means "no data".
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so the run can actually be stopped.
        return "nodata"
    return "nodata"


def main():
    """Scrape dates for every ID near the listed URLs and render HTML pages.

    Steps:
      1. Parse the IDs out of ``URL_LIST_FILE``.
      2. Build the set of IDs within +/-20 of each original ID.
      3. Fetch, one page at a time with a random 1-2 s pause in between, the
         date of every ID that is not cached yet or is cached as
         ``"nodata"``/``"error"``.
      4. Save the cache and regenerate the HTML pages every 10 fetches and
         once more at the end.

    The cache is also saved when the fetch loop is interrupted or fails, so
    results gathered since the last periodic save are not lost.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 1. Read and parse the original IDs.
    original_ids = _read_original_ids(URL_LIST_FILE)
    print(f"Total original URLs in file: {len(original_ids)}")

    # 2. Collect every ID that needs to be looked up.
    all_needed_ids = {
        i for oid in original_ids for i in range(oid - 20, oid + 21)
    }
    sorted_ids = sorted(all_needed_ids)

    # Load the existing cache.
    id_to_date = load_cache()

    # Skip IDs that already have a real date; "nodata" and "error" entries
    # are retried on every run.
    ids_to_fetch = [
        sid for sid in sorted_ids
        if id_to_date.get(str(sid), "nodata") in ("nodata", "error")
    ]
    print(f"Total unique IDs: {len(sorted_ids)}")
    print(f"Already cached: {len(id_to_date)}")
    print(f"Remaining to fetch: {len(ids_to_fetch)}")

    # 3. Sequential, single-process fetching.
    driver = None
    try:
        if ids_to_fetch:
            print("Starting single-process fetching with random delays...")
            driver = get_driver()
            wait = WebDriverWait(driver, 10)
            total = len(ids_to_fetch)
            try:
                for index, sid in enumerate(ids_to_fetch, start=1):
                    url = f"https://www.zxxk.com/soft/{sid}.html"
                    extracted_date = _fetch_date(driver, wait, url)
                    id_to_date[str(sid)] = extracted_date
                    print(f"[{index}/{total}] ID: {sid} -> {extracted_date}")

                    # Save the cache and refresh the HTML files every 10 fetches.
                    if index % 10 == 0:
                        save_cache(id_to_date)
                        generate_html_files(original_ids, id_to_date)
                        print(f"--- Progress saved & HTML files updated at {time.strftime('%H:%M:%S')} ---")

                    # Pause between requests; no need to wait after the last one.
                    if index < total:
                        time.sleep(random.uniform(1.0, 2.0))
            finally:
                # Runs on success, Ctrl-C and unexpected errors alike, so at
                # most the in-flight request is lost.
                save_cache(id_to_date)
            generate_html_files(original_ids, id_to_date)
        else:
            print("No new IDs to fetch. Updating HTML files from cache...")
            generate_html_files(original_ids, id_to_date)
    finally:
        print("\nProcess finished.")
        if driver:
            driver.quit()
# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()