| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668 |
- import threading
- import queue
- import time
- import json
- import re
- import execjs
- import random
- import retrying
- from lxml import etree
- import datetime
- import requests
- import tls_client
- from urllib.parse import urljoin
- from bson.objectid import ObjectId
- from flights_utils import get_now_time, put_data_by_queue, get_data_by_queue, \
- time_format_conversion
- from my_logger import Logger
- from flights_mongodb import mongo_con_parse
- from settings import CRAWL_DATE_CONF_STATUS_TAB, FLIGHTS_WEBSITES_ROUTE_CONF_TAB, PROXY_TAIL
def generate_date_range(days, start=None):
    """Return a crawl window as ``(start_date, end_date)`` strings in ``%Y-%m-%d``.

    The window begins at ``start`` (today when omitted) and ends ``days`` days
    later, so the inclusive range covers ``days + 1`` calendar days.

    Args:
        days: number of days added to the start date to obtain the end date.
        start: optional ``date``/``datetime`` used instead of today; lets callers
            and tests get a deterministic result.

    Returns:
        tuple[str, str]: the formatted start and end dates.
    """
    if start is None:
        # Today is the START of the window (not the end).
        start = datetime.datetime.today()
    end = start + datetime.timedelta(days=days)
    return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
class GKSpider:
    """Flight-data spider for website 'gk' (Jetstar booking pages).

    Flow, as wired up in ``run``:

    * ``init_crawl_conf`` opens a crawl batch in MongoDB and enqueues every
      enabled route into ``task_queue``.
    * One ``_refresh_ja3`` thread keeps ``ja3_queue`` stocked with
      ``(user_agent, ja3_string)`` pairs, and ``cookie_thread_num``
      ``_refresh_cookie`` threads keep ``cookies_queue`` stocked with cookie
      dicts minted through the anti-bot sensor endpoint.
    * ``thread_numbers`` ``thread_task`` workers look up which dates have
      flights, fetch each date's search page and store the embedded
      ``bundle-data-v2`` JSON.
    """

    # Anti-bot ("akm") sensor endpoint; two posts to it mint the cookie dict
    # (including bm_sz) that is sent with search-page requests.
    _AKM_URL = "https://www.jetstar.com/xsRfGu1Zb-8uTxanFA/9kX3LGbVJLVb/FB0BajANQQk/YE94/HSh0EkU"
    # Internal service that hands out (user-agent, JA3 fingerprint) pairs.
    _JA3_API = "http://8.218.51.130:9003/api/v1/ja3"
    # Status codes whose Location header is followed by hand.
    _REDIRECT_CODES = (301, 302, 303, 307, 308)

    def __init__(self):
        self.website = 'gk'  # website id; also used in table and log file names
        self.is_proxy = True
        self.is_online = True
        if self.is_proxy:
            if self.is_online:
                # The run of 'x' is a placeholder that general_proxies() swaps for a
                # random id on every call (AU / HK exit).
                self.proxy_meta = f"http://B_3351_HK___5_ss-xxxxxxxxxxxx:{PROXY_TAIL}"
                self.time_sleep = 0.5
            else:
                # Local proxy used when is_online is False.
                self.proxy_meta = "http://127.0.0.1:7897"
                self.time_sleep = 0.5
            self.proxies = {
                "http": self.proxy_meta,
                "https": self.proxy_meta,
            }
        else:
            self.proxies = None
            self.time_sleep = 5.5
        self.search_flights_api = "https://booking.jetstar.com/hk/zh/booking/search-flights"
        # Compile the JS that produces the sensor_data payloads (encrypt1/encrypt2).
        with open('./js_files/akm逆向5.26.js', encoding='utf-8') as f:
            js = f.read()
        self.ctx = execjs.compile(js)
        self.task_queue = queue.Queue()
        self.cookies_queue = queue.Queue()
        # Working JA3 fingerprints are recycled: switching fingerprints too often
        # raises the chance of hitting a captcha.
        self.ja3_queue = queue.Queue()
        # Headers for the digitalapi fare-cache request in search_flight_date().
        self.headers = {
            "accept": "application/json, text/plain, */*",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "culture": "zh-HK",
            "origin": "https://www.jetstar.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://www.jetstar.com/",
            "sec-ch-ua": "\"Google Chrome\";v=\"135\", \"Not-A.Brand\";v=\"8\", \"Chromium\";v=\"135\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"
        }
        if self.is_online:
            self.cookie_thread_num = 3
            self.thread_numbers = 8
            self.crawl_days = 10
        else:
            self.cookie_thread_num = 3
            self.thread_numbers = 8
            self.crawl_days = 5
        self.cookie_queue_num = 4  # target number of cookies kept in cookies_queue
        self.ja3_queue_num = 4  # target number of fingerprints kept in ja3_queue
        self.retry_number = 2
        self.db = mongo_con_parse()
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.logger = Logger('./logs/{}_flights_spiders.log'.format(self.website), 'debug').logger

    def init_crawl_date_data(self):
        """Insert the status record for this crawl batch with every stage pending (0)."""
        self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).insert_one(
            {
                "website": self.website,
                "crawl_date": self.crawl_date,
                "crawl_status": 0,  # crawl stage
                "clean_status": 0,  # data-cleaning stage
                "to_csv_status": 0  # export stage
            }
        )

    def init_crawl_conf(self):
        """Open a crawl batch and enqueue every enabled route for this website."""
        self.crawl_date = get_now_time()
        self.logger.info("本次数据采集批次为: {}".format(self.crawl_date))
        self.init_crawl_date_data()
        search_routes = self.db.get_collection(FLIGHTS_WEBSITES_ROUTE_CONF_TAB).find(
            {
                "source_website": self.website,  # source website
                "website_status": 1,  # website enabled for crawling
                "flight_route_status": 1  # route enabled for crawling
            },
            {
                "_id": 0  # MongoDB returns _id by default; it is not needed here
            }
        )
        self.logger.info("获取需要采集的航线.并喂给队列")
        for search_route in search_routes:
            put_data_by_queue(self.task_queue, search_route)

    def general_proxies(self):
        """Return the proxies mapping for a single request.

        When proxying online, the placeholder run of 'x' in ``proxy_meta`` is
        replaced by a fresh random 12-digit id. Otherwise, including when
        proxying is disabled, the static ``self.proxies`` (possibly None) is
        returned.
        """
        if not (self.is_proxy and self.is_online):
            return self.proxies
        session_id = ''.join(random.choices('0123456789', k=12))
        proxy_url = re.sub(r"-(x+):", f"-{session_id}:", self.proxy_meta)
        return {
            "http": proxy_url,
            "https": proxy_url,
        }

    @staticmethod
    def _page_headers(ua):
        """Build the browser-navigation headers shared by every tls_client request."""
        return {
            'user-agent': ua,
            'Accept-Encoding': 'gzip, deflate, br',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'pragma': 'no-cache',
            'priority': 'u=0, i',
            'referer': 'https://booking.jetstar.com/',
            'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1'
        }

    @staticmethod
    def _build_search_params(from_city_code, to_city_code, search_date):
        """Query string for a 1-adult economy search on a single leg, priced in CNY."""
        return {
            "s": "true",
            "adults": "1",
            "children": "0",
            "infants": "0",
            "selectedclass1": "economy",
            "currency": "CNY",
            "mon": "true",
            "channel": "DESKTOP",
            "origin1": from_city_code,  # e.g. "PVG"
            "destination1": to_city_code,
            "departuredate1": search_date
        }

    def _fetch_akm_cookie(self, ua, ja3_string):
        """Mint a cookie dict by posting two sensor_data payloads to the akm endpoint.

        Returns:
            dict: the cookies set by the FIRST response (contains ``bm_sz``).

        Raises:
            Exception: on any network/JS/parsing error, or if the second post is
                not answered with status 201. Callers are wrapped in
                ``retrying.retry``, so raising is what triggers another attempt.
        """
        status_ts = int(time.time() * 1000)
        try:
            proxy = self.general_proxies()
            session = tls_client.Session(ja3_string=ja3_string)
            headers = self._page_headers(ua)
            first_payload = {'sensor_data': self.ctx.call('encrypt1', status_ts)}
            response1 = session.post(self._AKM_URL, headers=headers, data=first_payload, proxy=proxy)
            bmsz = response1.cookies.get_dict()['bm_sz']
            # The second payload is derived from the bm_sz cookie and sent as a JSON string.
            second_payload = json.dumps({"sensor_data": self.ctx.call('encrypt2', status_ts, bmsz)})
            response2 = session.post(self._AKM_URL, headers=headers, data=second_payload, proxy=proxy)
        except Exception as e:
            self.logger.warning(f'request cookie error, 重试中.. {e}')
            raise
        if response2.status_code != 201:
            self.logger.error('状态码错误{}, {}'.format(response2.status_code, response2.text))
            # Previously this path returned None, which was queued as a "cookie".
            raise Exception(f'cookie 校验失败, 状态码: {response2.status_code}')
        self.logger.debug('成功获取 cookie bm-sz: {}'.format(bmsz[-16:]))
        return response1.cookies.get_dict()

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def request_new_cookie(self):
        """Mint a cookie using a fingerprint taken (and consumed) from ``ja3_queue``."""
        ua, ja3_string = self.ja3_queue.get()
        return self._fetch_akm_cookie(ua, ja3_string)

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def get_ja3(self):
        """Fetch one ``(ua, ja3_string)`` pair from the JA3 service.

        Raises:
            Exception: on a non-200 response, a non-zero ``code``, or a missing or
                malformed fingerprint (containing ``--`` or ``,,``). Raising instead
                of returning None lets the retry decorator try again.
        """
        headers = {
            'cid': '750B5141EDBF7FA6F73A99C768130099'
        }
        response = requests.get(self._JA3_API, headers=headers, timeout=15)
        if response.status_code != 200:
            raise Exception(f'ja3接口状态码错误: {response.status_code}')
        res_json = response.json()
        if res_json.get("code") != 0:
            raise Exception(f'ja3接口返回错误: {res_json}')
        data = res_json.get("data") or {}
        ja3 = data.get("ja3_str")
        ua = data.get("ua")
        if not ja3 or not ua or "--" in ja3 or ",," in ja3:
            raise Exception(f'ja3 指纹不合法: {ja3}')
        return ua, ja3

    def _refresh_ja3(self):
        """Background loop: top ``ja3_queue`` up to ``ja3_queue_num`` entries every 3 s."""
        while True:
            if self.ja3_queue.qsize() < self.ja3_queue_num:
                try:
                    ua, ja3 = self.get_ja3()
                    self.ja3_queue.put((ua, ja3))
                except Exception as e:
                    self.logger.error(f'ja3接口错误: {e}')
            time.sleep(3)

    def _refresh_cookie(self):
        """Background loop: top ``cookies_queue`` up to ``cookie_queue_num`` entries every 3 s.

        Failures are logged instead of propagating; otherwise one bad streak
        would kill this thread and leave workers blocked on an empty queue.
        """
        while True:
            if self.cookies_queue.qsize() < self.cookie_queue_num:
                try:
                    cookie = self.request_new_cookie()
                    self.cookies_queue.put(cookie)
                except Exception as e:
                    self.logger.error(f'获取cookie失败: {e}')
            time.sleep(3)

    def _fetch_bundle_data(self, session, url, headers, cookies, params, proxy, max_redirects):
        """GET ``url`` (following redirects by hand) and parse the ``bundle-data-v2`` JSON.

        At most ``max_redirects`` requests are made.

        Raises:
            Exception: when the page lacks the data script (captcha/blocked), the
                status is neither 200 nor a redirect, a redirect has no Location,
                or the redirect budget is exhausted.
        """
        redirect_count = 0
        current_url = url
        while redirect_count < max_redirects:
            response = session.get(current_url, headers=headers, cookies=cookies, params=params,
                                   timeout_seconds=15,
                                   proxy=proxy
                                   )
            if response.status_code in self._REDIRECT_CODES:
                location = response.headers.get("Location")
                if not location:
                    raise Exception(f'重定向缺少 Location 头, 状态码: {response.status_code}')
                # Location may be relative, so resolve it against the current URL.
                current_url = urljoin(current_url, location)
                redirect_count += 1
            elif response.status_code == 200:
                html = etree.HTML(response.text) if response.text else None
                data = html.xpath("//script[@id='bundle-data-v2']/text()") if html is not None else []
                if not data:
                    self.logger.warning(f'触发验证码或拒绝访问错误, 重试中... => {response.text}')
                    raise Exception('页面缺少 bundle-data-v2 数据, 可能触发验证码')
                return json.loads(data[0])
            else:
                self.logger.error(f'状态码错误, {response.status_code}, 响应内容:{response.text}')
                # Raise so the caller's retry gets a fresh fingerprint/cookie;
                # looping here would resend the identical request forever.
                raise Exception(f'状态码错误: {response.status_code}')
        raise Exception(f"超过最大重定向次数, 检查({max_redirects})")

    @retrying.retry(stop_max_attempt_number=5, wait_fixed=3000)
    def tls_client_get_request(self, url, params, max_redirects=3):
        """Fetch a search page using a queued fingerprint and cookie; return its bundle JSON.

        On success, the fingerprint and cookie are put back into their queues for
        reuse. On failure they are dropped, and the retry decorator tries again
        with new ones.
        """
        ua, ja3_string = self.ja3_queue.get()
        bmsz_cookie = self.cookies_queue.get()
        proxy = self.general_proxies()
        req_session = tls_client.Session(ja3_string=ja3_string)
        json_data = self._fetch_bundle_data(req_session, url, self._page_headers(ua), bmsz_cookie,
                                            params, proxy, max_redirects)
        self.cookies_queue.put(bmsz_cookie)
        self.ja3_queue.put((ua, ja3_string))
        return json_data

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def get_cookie_with_verify_price(self):
        """For price verification: mint a cookie with a freshly fetched fingerprint (no queues)."""
        ua, ja3_string = self.get_ja3()
        return self._fetch_akm_cookie(ua, ja3_string)

    @retrying.retry(stop_max_attempt_number=5, wait_fixed=3000)
    def get_flights_with_verify_price(self, from_city_code, to_city_code, search_date, thread_number=0, max_redirects=3):
        """Price-verification fetch that bypasses the shared queues.

        Returns:
            dict: ``{"inner_data": <bundle JSON>}``.
        """
        time.sleep(max(self.time_sleep, 1))
        params = self._build_search_params(from_city_code, to_city_code, search_date)
        ua, ja3_string = self.get_ja3()
        bmsz_cookie = self.get_cookie_with_verify_price()
        proxy = self.general_proxies()
        req_session = tls_client.Session(ja3_string=ja3_string)
        json_data = self._fetch_bundle_data(req_session, self.search_flights_api, self._page_headers(ua),
                                            bmsz_cookie, params, proxy, max_redirects)
        return {"inner_data": json_data}

    def get_flights(self, from_city_code, to_city_code, search_date, thread_number=0):
        """Fetch one route/date through the queued fingerprints and cookies.

        Returns:
            dict | None: ``{"inner_data": <bundle JSON>}``, or None once all
            retries have failed (the failure is logged).
        """
        time.sleep(max(self.time_sleep, 1))
        params = self._build_search_params(from_city_code, to_city_code, search_date)
        try:
            resp_json = self.tls_client_get_request(self.search_flights_api, params)
        except Exception as e:
            self.logger.error(
                f"thread_number:{thread_number} {from_city_code}-{to_city_code}-{search_date}- 5次重试全部失败 {str(e)}")
            return None
        self.logger.info(f'获取数据成功 {from_city_code}-{to_city_code} {search_date} => {str(resp_json)[:200]}')
        return {"inner_data": resp_json}

    @retrying.retry(stop_max_attempt_number=3, wait_fixed=3000)
    def search_flight_date(self, from_city_code, to_city_code, start_date, end_date):
        """Return the dates in ``[start_date, end_date]`` that have flights on this route.

        The returned values are the keys of the route's ``flights`` mapping. The
        caller parses them with ``%Y%m%d``. An empty list means no flights.
        """
        url = "https://digitalapi.jetstar.com/v1/farecache/flights/batch/availability-with-fareclasses"
        params = {
            "flightCount": "1",
            "includeSoldOut": "true",
            "requestType": "StarterOnly",
            "from": start_date,  # window start
            "end": end_date,  # window end
            "departures": from_city_code,
            "arrivals": to_city_code,
            "direction": "outbound",
            "paxCount": "1",
            "includeFees": "false"
        }
        # NOTE(review): TLS verification is disabled here (verify=False); confirm this is intended.
        response = requests.get(url, headers=self.headers, params=params, verify=False, timeout=15)
        response.raise_for_status()
        routes = response.json()[0]['routes']
        # A single-route query yields (at most) one entry in ``routes``.
        route = next(iter(routes.values()), None)
        if route is None:
            return []
        return list((route.get('flights') or {}).keys())

    def _crawl_one_date(self, thread_number, from_city_code, to_city_code, search_date):
        """Fetch one route/date and store it when it contains at least one flight."""
        flights_json = self.get_flights(from_city_code, to_city_code, search_date, thread_number)
        if not flights_json:
            self.logger.info("thread_number:{0},请求航班信息失败/ 该航线数据可能需要重新抓取{1}:{2}-{3}".format(
                thread_number, search_date, from_city_code, to_city_code))
            return
        trips = (flights_json.get('inner_data') or {}).get("Trips") or []
        if not trips:
            self.logger.info(
                f'Trip 为空, {search_date}:{from_city_code}-{to_city_code}----{flights_json}')
            return
        flights = trips[0].get('Flights') or []
        if flights:
            self.logger.info("获取航班信息{1}:{2}-{3}成功, thread_number:{0}".format(thread_number, search_date,
                                                                             from_city_code, to_city_code))
            self.save_mongodb_datas(flights_json, from_city_code, to_city_code, search_date)
        else:
            self.logger.info(
                f"获取航班信息为空 {search_date}:{from_city_code}-{to_city_code},thread_number:{thread_number}")

    def thread_task(self, thread_number):
        """Worker loop: take a route, find its flight dates, crawl each date.

        ``task_done`` is always called in ``finally`` so ``task_queue.join()`` in
        ``run`` can complete.
        """
        while True:
            try:
                search_route = get_data_by_queue(self.task_queue)
                from_city_code = search_route['from_city_code']
                to_city_code = search_route['to_city_code']
                # The window spans crawl_days + 1 days (today .. today + crawl_days).
                start_date, end_date = generate_date_range(self.crawl_days)
                search_date_list = self.search_flight_date(from_city_code, to_city_code, start_date, end_date)
                if not search_date_list:
                    self.logger.info(f'{start_date}~{end_date} 内的航段 {from_city_code}-{to_city_code} 无航班')
                    continue
                for raw_date in search_date_list:
                    self.logger.info(f"线程:{thread_number}, 正在采集 => {raw_date} {from_city_code}-{to_city_code}")
                    # e.g. 20250531 => 2025-05-31
                    search_date = time_format_conversion(
                        raw_date,
                        in_strftime_str="%Y%m%d",
                        out_strftime_str="%Y-%m-%d"
                    )
                    self._crawl_one_date(thread_number, from_city_code, to_city_code, search_date)
            except Exception as e:
                self.logger.error(f"thread_number:{thread_number}- 未知错误--unknown err:{str(e)}")
            finally:
                self.task_queue.task_done()

    def run(self):
        """Run one crawl batch: start the daemon threads, wait for all routes, close the batch."""
        self.init_crawl_conf()
        threads = [threading.Thread(target=self._refresh_cookie, daemon=True)
                   for _ in range(self.cookie_thread_num)]
        threads.append(threading.Thread(target=self._refresh_ja3, daemon=True))
        threads.extend(threading.Thread(target=self.thread_task, args=(thread_number,), daemon=True)
                       for thread_number in range(1, self.thread_numbers + 1))
        for t_obj in threads:
            t_obj.start()
        # The threads loop forever; completion is signalled by task_done() on every route.
        self.task_queue.join()
        self.update_crawl_date_status()
        self.db.client.close()

    def save_mongodb_datas(self, info_data, from_city_code, to_city_code, search_date):
        """Tag ``info_data`` with batch/search metadata (in place) and insert it."""
        info_data["website"] = self.website
        info_data["crawl_date"] = self.crawl_date
        info_data["search_from_city_code"] = from_city_code
        info_data["search_to_city_code"] = to_city_code
        info_data["search_date"] = search_date  # the flight date that was searched
        info_data["clean_status"] = 0  # not yet cleaned
        self.db.get_collection(self.info_tab).insert_one(info_data)

    def update_crawl_date_status(self):
        """Mark this batch's crawl stage as finished (crawl_status 0 -> 1)."""
        # A single update_one replaces the earlier find-then-update-by-_id pair.
        self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).update_one(
            {
                "website": self.website,
                "crawl_date": self.crawl_date,
                "crawl_status": 0
            },
            {
                "$set": {"crawl_status": 1}
            }
        )
if __name__ == "__main__":
    # Script entry point: build the spider and run one crawl batch.
    GKSpider().run()
|