"""Threaded collector for Jetstar flight-search result pages.

Worker threads cooperate through queues: one thread fetches JA3/user-agent
pairs, two threads keep a small pool of session cookies topped up, six
threads download search pages and one thread parses the responses.
"""
import json
import threading
import time
from datetime import datetime, timedelta
from queue import Queue

import execjs
import pyhttpx
import requests
import retrying
from loguru import logger
from lxml import etree


class GK:
    """Collect flight-search pages for a date range, one request per day."""

    def __init__(self, excel_path=None):
        """Set up request headers, the JS signing context and the work queues.

        Args:
            excel_path: Optional path to an Excel workbook of airport pairs.
                It is only read by ``gen_city``; ``run`` does not need it.

        Raises:
            OSError: If the JavaScript file cannot be opened.
        """
        self.excel_path = excel_path
        self.search_flights_api = "https://booking.jetstar.com/hk/zh/booking/search-flights"
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
            'Accept-Encoding': 'gzip, deflate, br',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'pragma': 'no-cache',
            'priority': 'u=0, i',
            'referer': 'https://booking.jetstar.com/',
            'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1'
        }
        # The path is relative to the current working directory.
        with open('../akm/逆向1.js', encoding='utf-8') as f:
            self.ctx = execjs.compile(f.read())

        self.lock = threading.Lock()
        self.cookies_queue = Queue()    # cookie jars ready for search requests
        self.ja3_queue = Queue()        # (user_agent, ja3_string) pairs
        self.task_queue = Queue()       # departure dates still to collect
        self.resp_data_queue = Queue()  # (date, response) pairs awaiting parsing

    def _fetch_ja3(self):
        """Request one JA3 fingerprint from the fingerprint service.

        Returns:
            A ``(ua, ja3)`` tuple, or ``None`` when the service answers with a
            non-200 status, a non-zero ``code``, or a missing/malformed JA3
            string (one containing ``--`` or ``,,``).
        """
        response = requests.get(
            "http://8.218.51.130:9003/api/v1/ja3",
            headers={'cid': '750B5141EDBF7FA6F73A99C768130099'},
            timeout=10,  # without a timeout a stalled service would hang the producer
        )
        if response.status_code != 200:
            return None
        res_json = response.json()
        if res_json.get("code") != 0:
            return None
        data = res_json.get("data") or {}
        ja3 = data.get("ja3_str")
        if not ja3 or "--" in ja3 or ",," in ja3:
            return None
        return data.get("ua"), ja3

    def get_ja3_str(self):
        """Keep ``ja3_queue`` stocked with fingerprints; never returns.

        Every 3 seconds, if fewer than 5 pairs are queued, one new pair is
        fetched. Failures are logged and retried on the next cycle: this is
        the only JA3 producer, so letting an exception end the thread would
        leave every consumer blocked on ``ja3_queue.get()``.
        """
        while True:
            if self.ja3_queue.qsize() < 5:
                try:
                    pair = self._fetch_ja3()
                except Exception as exc:  # worker-loop boundary: log and keep running
                    logger.error(f"获取 JA3 指纹失败: {exc}")
                else:
                    if pair is not None:
                        self.ja3_queue.put(pair)
            time.sleep(3)

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=4000)
    def request_new_cookie(self):
        """Obtain a fresh cookie jar by posting sensor data twice.

        Blocks until a JA3 pair is available. The pair is consumed and not
        returned to the queue.

        Returns:
            The cookies of the first sensor response, which include ``bm_sz``.

        Raises:
            KeyError: If the first response carries no ``bm_sz`` cookie (after
                the retry decorator has exhausted its 3 attempts).
        """
        logger.debug('正在获取 cookie bm-sz...')
        # NOTE(review): the user agent paired with this JA3 string is ignored
        # here and the fixed one from self.headers is sent instead, whereas
        # send_get uses the paired one. Confirm this is intended.
        _ua, ja3 = self.ja3_queue.get()
        sess = pyhttpx.HttpSession(
            ja3=ja3,     # custom JA3 string
            http2=True,  # enable HTTP/2
        )

        akm_url = "https://booking.jetstar.com/MkuYlo/pcp/LD0/PPluEQ/1ik7QcJffXbmL53i/QTcvXmg7/KS5kC3N/VRQcB"
        response = sess.post(
            akm_url,
            headers=self.headers,
            verify=False,
            data={'sensor_data': self.ctx.call('encrypt1')},
        )
        bmsz = response.cookies['bm_sz']

        # The second payload is derived from the bm_sz value and sent as a JSON string.
        payload = json.dumps({"sensor_data": self.ctx.call('encrypt2', bmsz)})
        sess.post(akm_url, headers=self.headers, data=payload, verify=False)

        logger.debug(f'成功获取到 cookie :{bmsz}')
        # NOTE(review): the cookies of the FIRST response are returned; any
        # cookie set only by the second response is not included. Confirm.
        return response.cookies

    def _refresh_cookie(self):
        """Keep ``cookies_queue`` topped up to 2 entries; never returns.

        A failed refresh (``request_new_cookie`` raising after its retries) is
        logged and tried again on the next 3-second cycle, so the thread
        survives and ``get_data`` workers are not left waiting indefinitely.
        """
        while True:
            if self.cookies_queue.qsize() < 2:
                try:
                    cookie = self.request_new_cookie()
                except Exception as exc:  # worker-loop boundary: log and keep running
                    logger.error(f"刷新 cookie 失败: {exc}")
                else:
                    self.cookies_queue.put(cookie)
            time.sleep(3)

    def gen_task(self, start_date, end_date):
        """Queue one task per day between ``start_date`` and ``end_date`` inclusive.

        Args:
            start_date: First departure date, formatted ``YYYY-MM-DD``.
            end_date: Last departure date, formatted ``YYYY-MM-DD``.
        """
        for datetime_str in self.gen_datetime(start_date, end_date):
            self.task_queue.put(datetime_str)

    @retrying.retry(stop_max_attempt_number=2)
    def send_get(self, url, params, bmsz_cookie):
        """Send one GET request using a queued JA3 fingerprint.

        Args:
            url: Target URL.
            params: Query-string parameters.
            bmsz_cookie: Cookies to send with the request.

        Returns:
            The response object returned by the pyhttpx session.

        Raises:
            Exception: Whatever the session raises, after 2 attempts in total.
        """
        ua, ja3_str = self.ja3_queue.get()
        sess = pyhttpx.HttpSession(
            ja3=ja3_str,  # custom JA3 string
            http2=True,   # enable HTTP/2
        )
        # Same headers, in the same order, as self.headers; only the user
        # agent is swapped for the one paired with this fingerprint.
        headers = {**self.headers, 'user-agent': ua}
        response = sess.get(
            url,
            headers=headers,
            params=params,
            cookies=bmsz_cookie,
            timeout=15,
            verify=False,
        )
        # The fingerprint is recycled only when the request succeeded; on an
        # exception it is dropped and the retry draws another one.
        self.ja3_queue.put((ua, ja3_str))
        return response

    def get_data(self):
        """Download search pages for queued dates; never returns.

        Each successful response is handed to ``resp_data_queue`` and its
        cookie goes back to the pool. On failure the date is re-queued and the
        cookie is not returned; the refresher threads replace it.
        """
        while True:
            datetime_str = self.task_queue.get()
            bmsz_cookie = self.cookies_queue.get()
            params = {
                "s": "true",
                "adults": "1",
                "children": "0",
                "infants": "0",
                "selectedclass1": "economy",     # cabin class
                "currency": "CNY",
                "mon": "true",
                "channel": "DESKTOP",
                "origin1": "PVG",                # departure airport
                "destination1": "NRT",           # arrival airport
                "departuredate1": datetime_str   # departure date
            }
            logger.info(f'正在采集 {datetime_str} 航班数据...')
            try:
                response = self.send_get(self.search_flights_api, params, bmsz_cookie)
            except Exception as e:
                logger.error(f"错误发生: {e}")
                # Re-queue before task_done() below so the unfinished-task
                # count never reaches zero while this date is outstanding.
                self.task_queue.put(datetime_str)
            else:
                self.resp_data_queue.put((datetime_str, response))
                self.cookies_queue.put(bmsz_cookie)
            finally:
                self.task_queue.task_done()

    def parse_data(self):
        """Extract the embedded JSON bundle from queued responses; never returns.

        Parsing errors are logged instead of propagating, and ``task_done()``
        always runs, so one bad page can neither kill this thread nor leave
        ``resp_data_queue.join()`` waiting forever.
        """
        while True:
            datetime_str, response = self.resp_data_queue.get()
            try:
                html = etree.HTML(response.text)
                # Treat an unparseable document (no tree returned) as "no data".
                data = [] if html is None else html.xpath("//script[@id='bundle-data-v2']/text()")
                if data:
                    json_data = json.loads(data[0])
                    logger.info(f'获取数据成功 {datetime_str} => {json_data}')
                else:
                    logger.warning(f'{datetime_str} 当天暂无数据 / 触发验证码')
                    print(response.text)
            except Exception as exc:  # worker-loop boundary: log and keep running
                logger.error(f"解析 {datetime_str} 数据失败: {exc}")
            finally:
                self.resp_data_queue.task_done()

    def gen_city(self):
        """Return the de-duplicated set of ``"origin,destination"`` airport pairs.

        The set always contains ``'YNT,XIY'``. If ``excel_path`` was given to
        the constructor, the pairs from the ``出发机场`` / ``到达机场`` columns of
        its ``Sheet1`` are added as well. ``run`` does not call this method.

        Returns:
            A set of strings such as ``'YNT,XIY'``.
        """
        segment_info = {'YNT,XIY'}
        excel_path = getattr(self, 'excel_path', None)
        if excel_path:
            # Imported here because the file has no module-level pandas import
            # and pandas is only needed when a workbook is configured.
            import pandas as pd

            df = pd.read_excel(
                excel_path,
                sheet_name="Sheet1",
                usecols=["出发机场", "到达机场"],  # departure / arrival airport columns
            )
            for origin, destination in zip(df["出发机场"], df["到达机场"]):
                segment_info.add(f"{origin},{destination}")
        return segment_info

    @staticmethod
    def gen_datetime(start_date, end_date):
        """List every date from ``start_date`` to ``end_date`` inclusive.

        Args:
            start_date: First date, formatted ``YYYY-MM-DD``.
            end_date: Last date, formatted ``YYYY-MM-DD``.

        Returns:
            A list of ``YYYY-MM-DD`` strings; empty if ``end_date`` is earlier
            than ``start_date``.

        Raises:
            ValueError: If either argument does not match ``YYYY-MM-DD``.
        """
        first = datetime.strptime(start_date, '%Y-%m-%d')
        last = datetime.strptime(end_date, '%Y-%m-%d')
        day_count = (last - first).days + 1
        return [
            (first + timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(day_count)
        ]

    def run(self, start_date, end_date):
        """Queue the date range, start all workers and wait for completion.

        Args:
            start_date: First departure date, formatted ``YYYY-MM-DD``.
            end_date: Last departure date, formatted ``YYYY-MM-DD``.
        """
        self.gen_task(start_date, end_date)

        targets = (
            [self._refresh_cookie] * 2
            + [self.get_ja3_str]
            + [self.get_data] * 6
            + [self.parse_data]
        )
        for target in targets:
            # Daemon threads: the workers loop forever and must not keep the
            # process alive once both queues have drained.
            threading.Thread(target=target, daemon=True).start()

        # get_data enqueues a response before marking its task done, so once
        # task_queue has drained every response is already in resp_data_queue.
        for q in (self.task_queue, self.resp_data_queue):
            q.join()


if __name__ == '__main__':
    gk = GK()
    gk.run(start_date='2025-06-01', end_date='2025-06-30')