# Script that crawls a range of PIDs and records their redirect targets.
import argparse
import requests
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
import sys
# Shared HTTP session; browser-like headers reduce the chance of being
# served bot-specific responses by the target site.
session = requests.Session()
_BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
}
session.headers.update(_BROWSER_HEADERS)
def is_valid_url(url):
    """Return True when *url* parses as an http(s) URL with a non-empty host."""
    try:
        parsed = urlparse(url)
    except Exception:
        return False
    return parsed.scheme in ('http', 'https') and bool(parsed.netloc)
def extract_last_segment(location):
    """Return the last path segment of *location*.

    When the URL carries an ``rp`` query parameter (WHMCS-style routing),
    its value is used as the path instead of the URL's own path.  A single
    trailing slash is stripped before taking the final segment.
    """
    parsed = urlparse(location)
    path = parsed.path
    if parsed.query:
        rp_values = parse_qs(parsed.query).get('rp')
        if rp_values:
            path = rp_values[0]
    if path.endswith('/'):
        path = path[:-1]
    return path.split('/')[-1]
def get_http_version(resp):
    """Translate the numeric protocol version on ``resp.raw`` to a string.

    urllib3 exposes the version as an int: 10 (HTTP/1.0), 11 (HTTP/1.1),
    20 (HTTP/2).  Unknown numbers are returned as their string form, and
    "unknown" is returned when the raw object is missing or misbehaves.
    """
    try:
        raw_version = resp.raw.version
    except Exception:
        return "unknown"
    names = {10: "1.0", 11: "1.1", 20: "2"}
    return names.get(raw_version, str(raw_version))
def main():
    """CLI entry point.

    Walks the pid range from --end down to --start, requests the base URL
    with each pid injected into the query string, follows up to two
    redirects by hand (``allow_redirects=False`` so the Location headers
    stay observable), and writes a Markdown link per accepted redirect
    target to ``result.md``.

    Exits with status 1 on invalid arguments, on any network error, and
    raises SystemExit on a Cloudflare 403.
    """
    parser = argparse.ArgumentParser(description="根据pid范围请求URL并记录跳转结果")
    parser.add_argument('-s', '--start', type=int, required=True, help='起始 pid,必须为整数')
    parser.add_argument('-e', '--end', type=int, required=True, help='结束 pid,必须为整数且不小于起始 pid')
    parser.add_argument('-a', '--aff', type=str, required=True, help='aff 参数,字符串')
    parser.add_argument('-u', '--url', type=str, required=True, help='基础 URL,必须为合法 URL,例如 https://cloud.colocrossing.com/aff.php')
    parser.add_argument('--filter', type=str, default='', help='过滤关键字,逗号分隔,跳过包含关键字的跳转路径')
    args = parser.parse_args()
    if args.start > args.end:
        print("错误:起始 pid 不应大于结束 pid。")
        sys.exit(1)
    if not is_valid_url(args.url):
        print("错误:无效的 URL,请传入合法的 http 或 https URL。")
        sys.exit(1)
    # Comma-separated filter keywords; blank entries are dropped.
    filters = [x.strip() for x in args.filter.split(',') if x.strip()]
    output_file = "result.md"
    with open(output_file, "w", encoding="utf-8") as f:
        # Iterate pids high-to-low.
        for pid in range(args.end, args.start - 1, -1):
            # Rebuild the base URL with this pid merged into the query string.
            base_parsed = urlparse(args.url)
            query_dict = parse_qs(base_parsed.query)
            query_dict['pid'] = [str(pid)]
            # query_dict['aff'] = [args.aff]  # NOTE: aff is appended to the saved link below instead
            new_query = urlencode(query_dict, doseq=True)
            request_url = urlunparse((
                base_parsed.scheme,
                base_parsed.netloc,
                base_parsed.path,
                base_parsed.params,
                new_query,
                base_parsed.fragment
            ))
            print(f"\n===== PID={pid} =====")
            print(f"-- 请求 URL: {request_url}")
            try:
                # First request: redirects disabled so the Location header is visible.
                resp1 = session.get(request_url, allow_redirects=False, timeout=10)
                if resp1.status_code == 403:
                    # A 403 here is treated as a hard stop for the whole run.
                    raise SystemExit(f"PID={pid} 被 Cloudflare 拦截(403 Forbidden)")
                ver1 = get_http_version(resp1)
                print(f"HTTP/{ver1} {resp1.status_code} {resp1.reason}")
                for header, value in resp1.headers.items():
                    print(f"{header}: {value}")
                if 'Location' not in resp1.headers:
                    print(f"PID={pid} 无跳转")
                    continue
                first_location = resp1.headers['Location']
                # Resolve relative Location values against the original request URL.
                second_url = first_location if first_location.startswith("http") else urljoin(request_url, first_location)
                resp2 = session.get(second_url, allow_redirects=False, timeout=10)
                ver2 = get_http_version(resp2)
                print(f"\n-- 请求 URL: {second_url}")
                print(f"HTTP/{ver2} {resp2.status_code} {resp2.reason}")
                for header, value in resp2.headers.items():
                    print(f"{header}: {value}")
                if 'Location' in resp2.headers:
                    second_location = resp2.headers['Location']
                    # Skip targets whose final path segment matches any filter keyword.
                    last_segment = extract_last_segment(second_location)
                    if any(filt in last_segment for filt in filters):
                        print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}")
                        continue
                    # Persist as a Markdown link; aff is appended here.
                    md_link = f"[{last_segment}]({request_url}&aff={args.aff})"
                    f.write(md_link + "\n")
                else:
                    print(f"PID={pid} 无第二次跳转")
            except requests.RequestException as e:
                # Any network failure aborts the whole run with status 1.
                print(f"请求失败,PID={pid} 错误信息: {e}")
                sys.exit(1)
    print(f"\n结果已保存到 {output_file}")
# Script entry point.
if __name__ == "__main__":
    main()