A script for crawling pids
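The script requests each pid in a given range against a base URL, follows up to two redirects by hand, and writes the final redirect's last path segment as a Markdown link to result.md.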
import argparse
import requests
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
import sys
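# Shared session with browser-like headers to reduce the chance of trivial bot blocking.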
session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
})
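# Accept only http/https URLs that carry a hostname.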
def is_valid_url(url):
    try:
        parts = urlparse(url)
        return parts.scheme in ('http', 'https') and parts.netloc != ''
    except Exception:
        return False
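# WHMCS-style friendly URLs often carry the real target path in an `rp` query
# parameter (e.g. index.php?rp=/store/plan); prefer it over the literal path.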
def extract_last_segment(location):
    parsed = urlparse(location)
    if parsed.query:
        qs = parse_qs(parsed.query)
        if 'rp' in qs:
            path = qs['rp'][0]
        else:
            path = parsed.path
    else:
        path = parsed.path
    if path.endswith('/'):
        path = path[:-1]
    return path.split('/')[-1]
def get_http_version(resp):
    # resp.raw.version is an integer, e.g. 10 (HTTP/1.0), 11 (HTTP/1.1), 20 (HTTP/2);
    # requests (via urllib3) only speaks HTTP/1.x, so the 20 entry is defensive
    version_map = {10: "1.0", 11: "1.1", 20: "2"}
    try:
        return version_map.get(resp.raw.version, str(resp.raw.version))
    except Exception:
        return "unknown"
def main():
    parser = argparse.ArgumentParser(description="Request URLs over a pid range and record the redirect results")
    parser.add_argument('-s', '--start', type=int, required=True, help='start pid, must be an integer')
    parser.add_argument('-e', '--end', type=int, required=True, help='end pid, must be an integer no smaller than the start pid')
    parser.add_argument('-a', '--aff', type=str, required=True, help='aff parameter, a string')
    parser.add_argument('-u', '--url', type=str, required=True, help='base URL, must be a valid URL, e.g. https://cloud.colocrossing.com/aff.php')
    parser.add_argument('--filter', type=str, default='', help='comma-separated filter keywords; redirect paths containing any of them are skipped')
    args = parser.parse_args()
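    # Example invocation (script name and the -s/-e/-a/--filter values are hypothetical):
    #   python crawl_pid.py -s 1 -e 50 -a 123 -u https://cloud.colocrossing.com/aff.php --filter cart,login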
    if args.start > args.end:
        print("错误:起始 pid 不应大于结束 pid。")
        sys.exit(1)
    if not is_valid_url(args.url):
        print("错误:无效的 URL,请传入合法的 http 或 https URL。")
        sys.exit(1)
    filters = [x.strip() for x in args.filter.split(',') if x.strip()]
    output_file = "result.md"
    with open(output_file, "w", encoding="utf-8") as f:
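        # Walk the pid range from the high end down to the low end.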
        for pid in range(args.end, args.start - 1, -1):
            base_parsed = urlparse(args.url)
            query_dict = parse_qs(base_parsed.query)
            query_dict['pid'] = [str(pid)]
            # query_dict['aff'] = [args.aff]
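            # aff is deliberately left out of the probe request and only appended to the
            # saved link below, presumably so scanning does not fire the affiliate tracker.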
            new_query = urlencode(query_dict, doseq=True)
            request_url = urlunparse((
                base_parsed.scheme,
                base_parsed.netloc,
                base_parsed.path,
                base_parsed.params,
                new_query,
                base_parsed.fragment
            ))
            print(f"\n===== PID={pid} =====")
            print(f"-- 请求 URL: {request_url}")
            try:
                resp1 = session.get(request_url, allow_redirects=False, timeout=10)
                if resp1.status_code == 403:
                    raise SystemExit(f"PID={pid} blocked by Cloudflare (403 Forbidden)")
                ver1 = get_http_version(resp1)
                print(f"HTTP/{ver1} {resp1.status_code} {resp1.reason}")
                for header, value in resp1.headers.items():
                    print(f"{header}: {value}")
                if 'Location' not in resp1.headers:
                    print(f"PID={pid} 无跳转")
                    continue
                first_location = resp1.headers['Location']
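                # The Location header may be relative; resolve it against the request URL.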
                second_url = urljoin(request_url, first_location)
                resp2 = session.get(second_url, allow_redirects=False, timeout=10)
                ver2 = get_http_version(resp2)
                print(f"\n-- 请求 URL: {second_url}")
                print(f"HTTP/{ver2} {resp2.status_code} {resp2.reason}")
                for header, value in resp2.headers.items():
                    print(f"{header}: {value}")
                if 'Location' in resp2.headers:
                    second_location = resp2.headers['Location']
                    last_segment = extract_last_segment(second_location)
                    if any(filt in last_segment for filt in filters):
                        print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}")
                        continue
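                    # request_url already carries a query string (pid=...), so '&' is a safe separator for aff.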
                    md_link = f"[{last_segment}]({request_url}&aff={args.aff})"
                    f.write(md_link + "\n")
                else:
                    print(f"PID={pid} 无第二次跳转")
            except requests.RequestException as e:
                print(f"请求失败,PID={pid} 错误信息: {e}")
                sys.exit(1)
    print(f"\n结果已保存到 {output_file}")
if __name__ == "__main__":
    main()
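Each line written to result.md is a Markdown link: the link text is the last segment of the second redirect's path, and the target is the request URL with the aff parameter appended.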