import threading
import queue
import time
import json
import re
import execjs
import random
import retrying
from lxml import etree
import datetime
import requests
import tls_client
from urllib.parse import urljoin
from bson.objectid import ObjectId
from flights_utils import get_now_time, put_data_by_queue, get_data_by_queue, \
    time_format_conversion
from my_logger import Logger
from flights_mongodb import mongo_con_parse
from settings import CRAWL_DATE_CONF_STATUS_TAB, FLIGHTS_WEBSITES_ROUTE_CONF_TAB, PROXY_TAIL


def generate_date_range(days):
    """Generate a date range covering `days` days, starting from today."""
    # Start date: today (inclusive)
    start_date = datetime.datetime.today()
    # End date: today + `days` days
    end_date = start_date + datetime.timedelta(days=days)
    # Format both ends as YYYY-MM-DD strings
    return (
        start_date.strftime("%Y-%m-%d"),
        end_date.strftime("%Y-%m-%d")
    )


class GKSpider():
    def __init__(self):
        self.website = 'gk'  # target website
        self.is_proxy = True
        self.is_online = True
        if self.is_proxy:
            if self.is_online:
                # proxies = {
                #     'http': f'http://B_3351_HK___5_ss-{ip}:ev2pjj@proxy.renlaer.com:7778',
                #     'https': f'http://B_3351_HK___5_ss-{ip}:ev2pjj@proxy.renlaer.com:7778'
                # }
                self.proxy_meta = f"http://B_3351_HK___5_ss-xxxxxxxxxxxx:{PROXY_TAIL}"  # AU / HK
                self.time_sleep = 0.5
            else:
                self.proxy_meta = "http://127.0.0.1:7897"
                # self.time_sleep = 5.5
                self.time_sleep = 0.5
            self.proxies = {
                "http": self.proxy_meta,
                "https": self.proxy_meta,
            }
        else:
            self.proxies = None
            self.time_sleep = 5.5
        self.search_flights_api = "https://booking.jetstar.com/hk/zh/booking/search-flights"
        with open('./js_files/akm逆向5.26.js', encoding='utf-8') as f:
            js = f.read()
        self.ctx = execjs.compile(js)
        self.task_queue = queue.Queue()
        self.cookies_queue = queue.Queue()
        # Recycle working ja3 fingerprints: switching ja3 too often raises the captcha rate
        self.ja3_queue = queue.Queue()
        self.headers = {
            "accept": "application/json, text/plain, */*",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "culture": "zh-HK",
            "origin": "https://www.jetstar.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://www.jetstar.com/",
            "sec-ch-ua": "\"Google Chrome\";v=\"135\", \"Not-A.Brand\";v=\"8\", \"Chromium\";v=\"135\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"
        }
        if self.is_online:
            self.cookie_thread_num = 3
            self.thread_numbers = 8
            self.crawl_days = 10
        else:
            self.cookie_thread_num = 3
            self.thread_numbers = 8
            self.crawl_days = 5
        self.cookie_queue_num = 4  # number of cookies kept in the queue
        self.ja3_queue_num = 4  # number of ja3 entries kept in the queue
        self.retry_number = 2  # 5
        self.db = mongo_con_parse()
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.logger = Logger('./logs/{}_flights_spiders.log'.format(self.website), 'debug').logger

    def init_crawl_date_data(self):
        # Initialize the status record for this crawl batch: create a batch timestamp
        # when crawling starts and update the status once crawling completes.
        self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).insert_one(
            {
                "website": self.website,
                "crawl_date": self.crawl_date,
                "crawl_status": 0,  # crawl status
                "clean_status": 0,  # data-cleaning status
                "to_csv_status": 0  # CSV export status
            }
        )
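    # init_crawl_conf() below pulls route documents from FLIGHTS_WEBSITES_ROUTE_CONF_TAB.
    # thread_task() only ever reads from_city_code / to_city_code from each document,
    # so a minimal route record is assumed to look like this (values are illustrative;
    # real documents may carry extra fields):
    #   {
    #       "source_website": "gk",
    #       "website_status": 1,
    #       "flight_route_status": 1,
    #       "from_city_code": "PVG",
    #       "to_city_code": "OOL"
    #   }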
    def init_crawl_conf(self):
        # Record the timestamp of this crawl batch
        self.crawl_date = get_now_time()
        self.logger.info("Crawl batch for this run: {}".format(self.crawl_date))
        # Initialize the batch status record
        self.init_crawl_date_data()
        # Pull the routes to crawl from the route configuration table
        search_routes = self.db.get_collection(
            FLIGHTS_WEBSITES_ROUTE_CONF_TAB
        ).find(
            {
                "source_website": self.website,  # data source website
                "website_status": 1,  # whether the website is enabled for crawling
                "flight_route_status": 1  # whether the route is enabled for crawling
            },
            {
                "_id": 0  # exclude _id (returned by default); it is not needed here
            }
        )
        self.logger.info("Fetched the routes to crawl; feeding them to the task queue")
        for search_route in search_routes:
            put_data_by_queue(self.task_queue, search_route)

    def general_proxies(self):
        if self.is_online:
            proxy_meta = self.proxy_meta
            # Replace the placeholder session id with 12 random digits so each call
            # gets a fresh proxy session
            random_no = ''.join(random.choices('0123456789', k=12))
            # region = ''.join(random.choices(["US"], k=1))
            # HK / SG / TW all work; SG is the fastest and most stable
            # proxy_meta_mid = re.sub(r"_(US)_", f"_{region}_", proxy_meta)
            proxy_meta_new = re.sub(r"-(x+):", f"-{random_no}:", proxy_meta)
            proxies = {
                "http": proxy_meta_new,
                "https": proxy_meta_new,
            }
        else:
            proxies = self.proxies
        return proxies

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def request_new_cookie(self):
        statusTs = int(time.time() * 1000)
        try:
            ua, ja3_string = self.ja3_queue.get()
            proxy = self.general_proxies()
            get_ck_session = tls_client.Session(
                ja3_string=ja3_string,
            )
            headers = {
                'user-agent': ua,
                'Accept-Encoding': 'gzip, deflate, br',
                'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Connection': 'keep-alive',
                'Content-Type': 'application/x-www-form-urlencoded',
                'accept-language': 'zh-CN,zh;q=0.9',
                'cache-control': 'no-cache',
                'pragma': 'no-cache',
                'priority': 'u=0, i',
                'referer': 'https://booking.jetstar.com/',
                'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"Windows"',
                'sec-fetch-dest': 'document',
                'sec-fetch-mode': 'navigate',
                'sec-fetch-site': 'same-origin',
                'sec-fetch-user': '?1',
                'upgrade-insecure-requests': '1'
            }
            # Akamai sensor endpoint (served as a js file URL)
            akm_url = "https://www.jetstar.com/xsRfGu1Zb-8uTxanFA/9kX3LGbVJLVb/FB0BajANQQk/YE94/HSh0EkU"
            # Step 1: post the first sensor payload to obtain a bm_sz cookie
            data = {
                'sensor_data': self.ctx.call('encrypt1', statusTs)
            }
            response1 = get_ck_session.post(akm_url,
                                            headers=headers,
                                            data=data,
                                            proxy=proxy
                                            )
            bmsz = response1.cookies.get_dict()['bm_sz']
            # Step 2: post the second sensor payload derived from bm_sz
            data2 = {
                "sensor_data": self.ctx.call('encrypt2', statusTs, bmsz)
            }
            data2 = json.dumps(data2)
            response2 = get_ck_session.post(akm_url,
                                            headers=headers,
                                            data=data2,
                                            proxy=proxy
                                            )
            if response2.status_code == 201:
                self.logger.debug('Got cookie bm-sz: {}'.format(bmsz[-16:]))
                # Return the cookies set by the first response
                return response1.cookies.get_dict()
            else:
                self.logger.error('Unexpected status code {}, {}'.format(response2.status_code, response2.text))
                raise Exception('cookie request rejected')
        except Exception as e:
            print('request cookie error, retrying..', e)
            raise

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def get_ja3(self):
        url = "http://8.218.51.130:9003/api/v1/ja3"
        payload = {}
        headers = {
            'cid': '750B5141EDBF7FA6F73A99C768130099'
        }
        response = requests.get(url, headers=headers, data=payload, timeout=15)
        if response.status_code == 200:
            res_json = response.json()
            if res_json.get("code") == 0:
                ja3 = res_json.get("data").get("ja3_str")
                ua = res_json.get("data").get("ua")
                if "--" not in ja3 and ",," not in ja3:
                    return ua, ja3
        # Raise on any malformed response so the retry decorator kicks in
        raise Exception('ja3 api returned no usable fingerprint')

    def _refresh_ja3(self):
        while True:
            if self.ja3_queue.qsize() < self.ja3_queue_num:
                try:
                    ua, ja3 = self.get_ja3()
                    # logger.debug('got ja3...')
                    self.ja3_queue.put((ua, ja3))
                except Exception as e:
                    self.logger.error(f'ja3 api error: {e}')
            time.sleep(3)
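    # A ja3_str follows the standard JA3 layout
    # "TLSVersion,Ciphers,Extensions,EllipticCurves,PointFormats", e.g. (illustrative):
    #   771,4865-4866-4867-49195,0-23-65281-10-11,29-23-24,0
    # The "--" / ",," checks in get_ja3() reject strings with empty fields, i.e.
    # fingerprints that would be malformed.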
    def _refresh_cookie(self):
        while True:
            if self.cookies_queue.qsize() < self.cookie_queue_num:
                try:
                    cookie = self.request_new_cookie()
                    self.cookies_queue.put(cookie)
                except Exception as e:
                    # Keep the producer thread alive even after retries are exhausted
                    self.logger.error(f'cookie refresh error: {e}')
            time.sleep(3)

    @retrying.retry(stop_max_attempt_number=5, wait_fixed=3000)
    def tls_client_get_request(self, url, params, max_redirects=3):
        ua, ja3_string = self.ja3_queue.get()
        bmsz_cookie = self.cookies_queue.get()
        proxy = self.general_proxies()
        req_session = tls_client.Session(
            ja3_string=ja3_string,  # inject the custom fingerprint directly
        )
        headers = {
            'user-agent': ua,
            'Accept-Encoding': 'gzip, deflate, br',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'pragma': 'no-cache',
            'priority': 'u=0, i',
            'referer': 'https://booking.jetstar.com/',
            'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1'
        }
        redirect_count = 0
        current_url = url
        while redirect_count < max_redirects:
            response = req_session.get(current_url,
                                       headers=headers,
                                       cookies=bmsz_cookie,
                                       params=params,
                                       timeout_seconds=15,  # timeout
                                       proxy=proxy
                                       )
            if response.status_code in (301, 302, 303, 307, 308):
                # Redirect: follow the Location header (may be a relative path)
                location = response.headers.get("Location")
                # if not location:
                #     break
                current_url = urljoin(current_url, location)
                redirect_count += 1
            elif response.status_code == 200:
                html = etree.HTML(response.text)
                data = html.xpath("//script[@id='bundle-data-v2']/text()")
                if data:
                    json_data = json.loads(data[0])
                    # Request succeeded: return the cookie and the working ja3 to their
                    # queues (switching ja3 too often raises the captcha rate)
                    self.cookies_queue.put(bmsz_cookie)
                    self.ja3_queue.put((ua, ja3_string))
                    return json_data  # return the extracted payload
                else:
                    self.logger.warning(f'Hit a captcha or access denied, retrying... => {response.text}')
                    raise Exception('captcha or access denied')
            else:
                self.logger.error(f'Unexpected status code {response.status_code}, body: {response.text}')
                raise Exception(f'unexpected status code {response.status_code}')
        raise Exception(f"Exceeded max redirects ({max_redirects})")
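    # Shape of the JSON lifted from the #bundle-data-v2 script tag, as far as the
    # consumers in this file read it (thread_task() walks Trips -> Flights; everything
    # else is stored untouched):
    #   {"Trips": [{"Flights": [{...}, {...}]}], ...}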
    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def get_cookie_with_verify_price(self):
        """Price verification: fetch a fresh cookie (standalone, no queues)."""
        statusTs = int(time.time() * 1000)
        try:
            ua, ja3_string = self.get_ja3()
            proxy = self.general_proxies()
            get_ck_session = tls_client.Session(
                ja3_string=ja3_string,
            )
            headers = {
                'user-agent': ua,
                'Accept-Encoding': 'gzip, deflate, br',
                'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Connection': 'keep-alive',
                'Content-Type': 'application/x-www-form-urlencoded',
                'accept-language': 'zh-CN,zh;q=0.9',
                'cache-control': 'no-cache',
                'pragma': 'no-cache',
                'priority': 'u=0, i',
                'referer': 'https://booking.jetstar.com/',
                'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"Windows"',
                'sec-fetch-dest': 'document',
                'sec-fetch-mode': 'navigate',
                'sec-fetch-site': 'same-origin',
                'sec-fetch-user': '?1',
                'upgrade-insecure-requests': '1'
            }
            # Akamai sensor endpoint (served as a js file URL)
            akm_url = "https://www.jetstar.com/xsRfGu1Zb-8uTxanFA/9kX3LGbVJLVb/FB0BajANQQk/YE94/HSh0EkU"
            # Step 1: post the first sensor payload to obtain a bm_sz cookie
            data = {
                'sensor_data': self.ctx.call('encrypt1', statusTs)
            }
            response1 = get_ck_session.post(akm_url,
                                            headers=headers,
                                            data=data,
                                            proxy=proxy
                                            )
            bmsz = response1.cookies.get_dict()['bm_sz']
            # Step 2: post the second sensor payload derived from bm_sz
            data2 = {
                "sensor_data": self.ctx.call('encrypt2', statusTs, bmsz)
            }
            data2 = json.dumps(data2)
            response2 = get_ck_session.post(akm_url,
                                            headers=headers,
                                            data=data2,
                                            proxy=proxy
                                            )
            if response2.status_code == 201:
                self.logger.debug('Got cookie bm-sz: {}'.format(bmsz[-16:]))
                # Return the cookies set by the first response
                return response1.cookies.get_dict()
            else:
                self.logger.error('Unexpected status code {}, {}'.format(response2.status_code, response2.text))
                raise Exception('cookie request rejected')
        except Exception as e:
            print('request cookie error, retrying..', e)
            raise

    @retrying.retry(stop_max_attempt_number=5, wait_fixed=3000)
    def get_flights_with_verify_price(self, from_city_code, to_city_code, search_date,
                                      thread_number=0, max_redirects=3):
        """Price-verification search: fetches its own ja3 and cookie, no queues."""
        sleep_r = self.time_sleep if self.time_sleep >= 1 else 1
        time.sleep(sleep_r)
        params = {
            "s": "true",
            "adults": "1",
            "children": "0",
            "infants": "0",
            "selectedclass1": "economy",
            "currency": "CNY",
            "mon": "true",
            "channel": "DESKTOP",
            "origin1": from_city_code,  # e.g. "PVG"
            "destination1": to_city_code,
            "departuredate1": search_date
        }
        ua, ja3_string = self.get_ja3()
        bmsz_cookie = self.get_cookie_with_verify_price()
        proxy = self.general_proxies()
        req_session = tls_client.Session(
            ja3_string=ja3_string,  # inject the custom fingerprint directly
        )
        headers = {
            'user-agent': ua,
            'Accept-Encoding': 'gzip, deflate, br',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'pragma': 'no-cache',
            'priority': 'u=0, i',
            'referer': 'https://booking.jetstar.com/',
            'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1'
        }
        redirect_count = 0
        current_url = self.search_flights_api
        while redirect_count < max_redirects:
            response = req_session.get(current_url,
                                       headers=headers,
                                       cookies=bmsz_cookie,
                                       params=params,
                                       timeout_seconds=15,  # timeout
                                       proxy=proxy
                                       )
            if response.status_code in (301, 302, 303, 307, 308):
                # Redirect: follow the Location header (may be a relative path)
                location = response.headers.get("Location")
                current_url = urljoin(current_url, location)
                redirect_count += 1
            elif response.status_code == 200:
                html = etree.HTML(response.text)
                data = html.xpath("//script[@id='bundle-data-v2']/text()")
                if data:
                    json_data = json.loads(data[0])
                    return {"inner_data": json_data}  # return the extracted payload
                else:
                    self.logger.warning(f'Hit a captcha or access denied, retrying... => {response.text}')
                    raise Exception('captcha or access denied')
            else:
                self.logger.error(f'Unexpected status code {response.status_code}, body: {response.text}')
                raise Exception(f'unexpected status code {response.status_code}')
        raise Exception(f"Exceeded max redirects ({max_redirects})")
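    # NOTE: get_flights() below draws its cookie and ja3 from the shared queues kept
    # warm by the _refresh_cookie/_refresh_ja3 threads, while the *_with_verify_price
    # helpers above fetch a fresh ja3 and cookie on every call so they can run
    # standalone, outside run()'s thread pool.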
    def get_flights(self, from_city_code, to_city_code, search_date, thread_number=0):
        # Throttle: sleep at least 1 second between searches
        sleep_r = self.time_sleep if self.time_sleep >= 1 else 1
        time.sleep(sleep_r)
        params = {
            "s": "true",
            "adults": "1",
            "children": "0",
            "infants": "0",
            "selectedclass1": "economy",
            "currency": "CNY",
            "mon": "true",
            "channel": "DESKTOP",
            "origin1": from_city_code,  # e.g. "PVG"
            "destination1": to_city_code,
            "departuredate1": search_date
        }
        try:
            resp_json = self.tls_client_get_request(self.search_flights_api, params)
            self.logger.info(f'Fetched data for {from_city_code}-{to_city_code} {search_date} => {str(resp_json)[:200]}')
            return {"inner_data": resp_json}
        except Exception as e:
            self.logger.error(
                f"thread_number:{thread_number} {from_city_code}-{to_city_code}-{search_date} - all 5 retries failed: {str(e)}")

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def search_flight_date(self, from_city_code, to_city_code, start_date, end_date):
        """Query which dates in the range actually have flights."""
        url = "https://digitalapi.jetstar.com/v1/farecache/flights/batch/availability-with-fareclasses"
        params = {
            "flightCount": "1",
            "includeSoldOut": "true",
            "requestType": "StarterOnly",
            "from": start_date,  # crawl start date
            "end": end_date,  # crawl end date; any value is accepted
            "departures": from_city_code,
            "arrivals": to_city_code,
            "direction": "outbound",
            "paxCount": "1",
            "includeFees": "false"
        }
        response = requests.get(url, headers=self.headers, params=params, verify=False)
        response.raise_for_status()
        json_data = response.json()[0]['routes']
        # 'routes' holds a single key-value pair, so return from the first iteration
        for key, val in json_data.items():
            flight_dates = list(val.get('flights', {}).keys())
            # print(f'segment: {key}, dates with flights: {flight_dates}')
            return flight_dates
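    # Availability payload assumed by search_flight_date() above, inferred from its
    # parsing code (route key and dates are illustrative):
    #   [{"routes": {"PVG-OOL": {"flights": {"20250531": {...}, "20250601": {...}}}}}]
    # Dates come back as %Y%m%d and are converted to %Y-%m-%d in thread_task() below.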
    def thread_task(self, thread_number):
        while True:
            try:
                # Take a route off the task queue
                queue_data = get_data_by_queue(self.task_queue)
                search_route = queue_data
                from_city_code = search_route['from_city_code']
                to_city_code = search_route['to_city_code']
                # Generate the date range for the crawl window (one extra day at the end)
                start_date, end_date = generate_date_range(self.crawl_days)
                # Ask the site which dates have flights; dates need format conversion
                search_date_list = self.search_flight_date(from_city_code, to_city_code, start_date, end_date)
                if not search_date_list:
                    self.logger.info(f'No flights on segment {from_city_code}-{to_city_code} within {start_date}~{end_date}')
                    continue
                for search_date in search_date_list:
                    self.logger.info(f"Thread {thread_number} crawling => {search_date} {from_city_code}-{to_city_code}")
                    # Date format conversion: 20250531 => 2025-05-31
                    search_date = time_format_conversion(
                        search_date,
                        in_strftime_str="%Y%m%d",
                        out_strftime_str="%Y-%m-%d"
                    )
                    flights_json = self.get_flights(from_city_code, to_city_code, search_date, thread_number)
                    if flights_json:
                        Trips = flights_json.get('inner_data', {}).get("Trips", [])
                        if not Trips:
                            self.logger.info(
                                f'Trips is empty, {search_date}:{from_city_code}-{to_city_code}----{flights_json}')
                            continue
                        Flights = Trips[0].get('Flights', [])
                        if len(Flights) > 0:
                            self.logger.info("Fetched flight info {1}:{2}-{3}, thread_number:{0}".format(
                                thread_number, search_date, from_city_code, to_city_code))
                            self.save_mongodb_datas(flights_json, from_city_code, to_city_code, search_date)
                        else:
                            self.logger.info(
                                f"Flight info is empty {search_date}:{from_city_code}-{to_city_code}, thread_number:{thread_number}")
                    else:
                        self.logger.info("thread_number:{0}, flight request failed / route may need re-crawling {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
            except Exception as e:
                self.logger.error(f"thread_number:{thread_number} - unknown error: {str(e)}")
            finally:
                self.task_queue.task_done()

    def run(self):
        # Feed tasks to the queue and initialize the batch status record
        self.init_crawl_conf()
        thread_list = []
        # Cookie refresher threads
        for _ in range(1, self.cookie_thread_num + 1):
            t_get_cookie = threading.Thread(target=self._refresh_cookie)
            thread_list.append(t_get_cookie)
        # ja3 refresher thread
        t_get_ja3 = threading.Thread(target=self._refresh_ja3)
        thread_list.append(t_get_ja3)
        # Data collector threads
        for thread_number in range(1, self.thread_numbers + 1):
            t_get_data = threading.Thread(target=self.thread_task, args=(thread_number,))
            thread_list.append(t_get_data)
        for t_obj in thread_list:
            t_obj.daemon = True
            t_obj.start()
        self.task_queue.join()
        # Mark the batch as crawled
        self.update_crawl_date_status()
        # Close the database connection
        self.db.client.close()

    def save_mongodb_datas(self, info_data, from_city_code, to_city_code, search_date):
        # Persist the crawled payload, tagged with batch metadata
        info_data["website"] = self.website
        info_data["crawl_date"] = self.crawl_date
        info_data["search_from_city_code"] = from_city_code
        # info_data["search_from_airport_code"] = from_airport_code
        info_data["search_to_city_code"] = to_city_code
        # info_data["search_to_airport_code"] = to_airport_code
        info_data["search_date"] = search_date  # the searched departure date
        info_data["clean_status"] = 0  # per-record cleaning status flag
        self.db.get_collection(self.info_tab).insert_one(info_data)

    def update_crawl_date_status(self):
        # Flip the crawl status of this batch to done
        find_data = self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).find_one(
            {
                "website": self.website,
                "crawl_date": self.crawl_date,
                "crawl_status": 0
            }
        )
        if find_data:
            self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).update_one(
                {
                    "_id": ObjectId(find_data.get("_id"))  # match the document precisely by _id
                },
                {
                    "$set": {
                        "crawl_status": 1  # 1 = crawled
                    }
                }
            )


if __name__ == "__main__":
    spider = GKSpider()
    spider.run()
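# The *_with_verify_price helpers are not wired into run(); a one-off spot check
# might look like this (route and date are illustrative):
#   spider = GKSpider()
#   result = spider.get_flights_with_verify_price("PVG", "OOL", "2025-06-01")
#   print(result["inner_data"].get("Trips", []))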