| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- import re
- import time
- import queue
- import threading
- from datetime import datetime
- from bson.objectid import ObjectId
- from my_logger import Logger
- from flights_mongodb import mongo_con_parse
- from flights_utils import time_format_conversion, get_now_time, put_data_by_queue, get_data_by_queue
- from settings import CRAWL_DATE_CONF_STATUS_TAB
- from yz_clean_utils import delete_mongodb_clean_datas, save_mongodb_clean_datas, \
- get_city_code, format_pc_kg_data
- import json
# Module-wide lock: CleanGKDatas worker threads hold it while updating and
# reading the shared clean counter (CleanGKDatas.count).
lock = threading.Lock()
# Display time as shown by the GK site, e.g. "2025年5月30日 (週五) 下午3:35":
# date part, optional weekday in half- or full-width parentheses, optional
# 上午 (AM) / 下午 (PM) marker, then h:mm.
_GK_DISPLAY_TIME_RE = re.compile(
    r'^\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日'
    r'\s*(?:[(（][^)）]*[)）])?'
    r'\s*(上午|下午)?\s*(\d{1,2}):(\d{1,2})\s*$'
)


def convert_time_format(input_str):
    """Convert a GK display time to a ``YYYYMMDDHHMMSS`` string (seconds = 00).

    Example: "2025年5月30日 (週五) 下午3:35" -> "20250530153500".

    The AM/PM marker is interpreted explicitly instead of via ``strptime``'s
    ``%p``, whose accepted strings depend on the process's LC_TIME locale.
    With a marker the hour is on a 12-hour clock (上午12:xx is 00:xx,
    下午12:xx is 12:xx); without one it is taken as a 24-hour clock.

    Raises:
        ValueError: if ``input_str`` does not look like a display time or
            denotes an invalid date/time.
    """
    match = _GK_DISPLAY_TIME_RE.match(input_str)
    if match is None:
        raise ValueError(f"unrecognized display time: {input_str!r}")
    year, month, day, meridiem, hour, minute = match.groups()
    hour = int(hour)
    if meridiem is not None:
        if not 1 <= hour <= 12:
            raise ValueError(f"invalid 12-hour clock hour in: {input_str!r}")
        hour %= 12  # 12 AM -> 0, 12 PM -> 0 (+12 below)
        if meridiem == "下午":
            hour += 12
    # datetime() validates month/day/hour/minute ranges and raises ValueError.
    dt = datetime(int(year), int(month), int(day), hour, int(minute))
    return dt.strftime("%Y%m%d%H%M%S")
class CleanGKDatas():
    """Clean raw Jetstar Japan (GK) search results into the normalized flight schema.

    Raw search documents are read from ``flights_gk_info_tab`` and cleaned
    records are written to ``clean_flights_gk_info_tab``, both in the database
    returned by ``mongo_con_parse()``.
    """

    # Fare bundles that are collected (the cheapest fare without checked
    # baggage and one with baggage), mapped to the (pieces, kg) checked-baggage
    # allowance passed to format_pc_kg_data:
    #   S000 - basic fare, no checked baggage
    #   P200 - value bundle (baggage + seat + meal), stored as 1 piece / 20 kg
    WANTED_BUNDLES = {
        "S000": ('0', '0'),
        "P200": ('1', '20'),
    }

    def __init__(self):
        self.website = "gk"
        self.logger = Logger('./logs/{}_flights_clear.log'.format(self.website), 'debug').logger
        self.db = mongo_con_parse()
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.clean_info_tab = "clean_{}".format(self.info_tab)
        self.thread_numbers = 1
        self.task_queue = queue.Queue()
        # Raw documents cleaned in the current batch; updated under the module-level lock.
        self.count = 0
        # thread_number -> worker Thread; workers are reused across batches.
        self._workers = {}

    def processing_data_one_search(self, find_info_data, thread_number=0, verify_flag=False):
        """Clean one raw search document into a list of flight records.

        One record is produced per flight that still has economy seats and per
        collected fare bundle (see WANTED_BUNDLES).

        Args:
            find_info_data: raw document from ``self.info_tab``. Reads
                ``search_from_city_code``, ``search_to_city_code``,
                ``crawl_date``, ``search_date`` (``%Y-%m-%d``) and the site
                response stored under ``inner_data``.
            thread_number: worker number, only used in log messages.
            verify_flag: price-verification mode. Records get ``verify_date``
                and ``verify_time``, and the cleaned records for this route and
                date are replaced in ``self.clean_info_tab`` immediately.

        Returns:
            The cleaned records. If an ``Exception`` occurs it is logged and the
            records of the flights completed before the failure are returned.
        """
        from_airport_code = ''
        to_airport_code = ''
        search_date = ''
        end_flight_datas = []
        try:
            from_city_code = find_info_data.get("search_from_city_code")
            to_city_code = find_info_data.get("search_to_city_code")
            crawl_date = find_info_data.get("crawl_date")
            search_date = find_info_data.get("search_date")
            # Fallback search_dep_time for records without segments.
            search_dep_time_base = time_format_conversion(search_date, in_strftime_str="%Y-%m-%d")
            inner_data = find_info_data.get('inner_data')
            flights = inner_data.get('Trips', [])[0].get("Flights", [])

            for each_flight in flights:
                display_info = each_flight.get("DisplayFlightInfo", {})
                from_airport_code = display_info.get('OriginAirport')
                to_airport_code = display_info.get('DestinationAirport')

                # Individual fares carry no seat count, so every fare of a flight
                # uses the economy cabin's remaining seats. Searches are issued for
                # economy (business seats appear to always be 0), so flights with
                # no economy seats left are skipped.
                economy_info = display_info.get('EconomyFlightInfo')
                seat_count = economy_info['RemainingSeats']
                if seat_count == 0:
                    self.logger.debug('跳过(经济舱)余票数为 0 的航班')
                    continue

                bundles = each_flight['Bundles']
                # EconomyPriceBreakdown is a JSON string; base fare and taxes/fees
                # are the same for every fare bundle of the flight.
                price_breakdown = json.loads(each_flight['EconomyPriceBreakdown'])
                price = price_breakdown['TotalFare']
                adult_tax = price_breakdown['TotalCharges']

                # Leg data does not depend on the bundle: parse it (including the
                # city-code DB lookups) once, on the first wanted bundle.
                leg_templates = None
                mid_flight_datas = []
                for each_bundle in bundles:
                    fare_basis = each_bundle['ServiceBundleCode']
                    if fare_basis not in self.WANTED_BUNDLES:
                        continue
                    amount = each_bundle['Amount']  # bundle surcharge, added to the base fare
                    has_baggage, baggage = format_pc_kg_data(*self.WANTED_BUNDLES[fare_basis])
                    if leg_templates is None:
                        leg_templates = self._parse_legs(display_info.get('Legs'))
                    segment_elements = [
                        dict(leg, has_baggage=has_baggage, baggage=baggage, fareBasis=fare_basis)
                        for leg in leg_templates
                    ]
                    search_dep_time = (
                        segment_elements[0].get("dep_time") if segment_elements else search_dep_time_base
                    )
                    flight_datas = {
                        "from_city_code": from_city_code,
                        "search_dep_time": search_dep_time,
                        "to_city_code": to_city_code,
                        # Only Japanese flights are crawled and the site answers in the
                        # search region's currency, so the currency is fixed.
                        "currency": 'JPY',
                        "adult_price": int(amount + price),
                        "adult_tax": int(adult_tax),
                        "adult_total_price": int(amount + price + adult_tax),
                        "route": "",
                        "seats_remaining": seat_count,
                        "segments": segment_elements,
                        "source_website": self.website,
                        "crawl_date": crawl_date
                    }
                    if verify_flag:
                        flight_datas["verify_date"] = crawl_date
                        flight_datas["verify_time"] = get_now_time()
                    mid_flight_datas.append(flight_datas)
                # A flight's records are added only once all its bundles are processed.
                end_flight_datas.extend(mid_flight_datas)

            if verify_flag:
                # Replace this route/date's cleaned records with the fresh ones.
                end_search_date = time_format_conversion(
                    search_date,
                    in_strftime_str="%Y-%m-%d",
                    out_strftime_str="%Y%m%d"
                )
                delete_many_filter = {
                    "from_city_code": from_city_code,
                    "to_city_code": to_city_code,
                    "search_dep_time": {"$regex": r"^{}".format(end_search_date)}
                }
                delete_mongodb_clean_datas(self.db, self.clean_info_tab, self.logger, delete_many_filter)
                if end_flight_datas:
                    save_mongodb_clean_datas(self.db, self.clean_info_tab, self.website, self.logger, end_flight_datas, thread_number, True)
        except Exception as e:
            self.logger.error(f"thread_number:{thread_number} clean error: {from_airport_code}---{to_airport_code}---{search_date}---{str(e)}")
        # Returned outside `finally`: a `return` in `finally` would also swallow
        # BaseException (KeyboardInterrupt, SystemExit).
        return end_flight_datas

    def _parse_legs(self, legs):
        """Return the bundle-independent segment fields of each leg (direct or connecting)."""
        segments = []
        for each_leg in legs:
            designator = each_leg['FlightDesignator']
            carrier_code = designator['CarrierCode']
            flight_number = f"{carrier_code}{designator['FlightNumber']}"
            depart_airport = each_leg['DepartureStation']
            arrival_airport = each_leg['ArrivalStation']
            segments.append({
                "flight_number": flight_number,
                # GK responses have no operating-carrier data; reuse the marketing flight number.
                "operating_flight_number": flight_number,
                "dep_air_port": depart_airport,
                "dep_city_code": get_city_code(self.db, self.website, depart_airport),
                "dep_time": convert_time_format(each_leg["DisplayStd"]),  # YYYYMMDDHHMMSS
                "arr_air_port": arrival_airport,
                "arr_city_code": get_city_code(self.db, self.website, arrival_airport),
                "arr_time": convert_time_format(each_leg["DisplaySta"]),  # YYYYMMDDHHMMSS
                "cabin": each_leg['CabinType'],
                "carrier": carrier_code,
                "aircraft_code": each_leg['Equipment']['Type'],
                # Cabin class is fixed at 1: GK flights are all-economy.
                "cabin_class": 1,
                "stop_cities": "",
            })
        return segments

    def processing_data(self, thread_number):
        """Worker loop: clean queued batches of raw-document ids forever.

        Each queue item is a list of ``_id`` strings from ``self.info_tab``.
        The cleaned records are saved first, and only then are the raw documents
        flagged ``clean_status: 1``. A failed save therefore leaves them at
        ``clean_status: 0``, where ``run`` can pick them up again while the
        batch is still open.
        """
        while 1:
            log_ob_ids = []
            try:
                ids = get_data_by_queue(self.task_queue)
                ob_ids = [ObjectId(i) for i in ids]
                log_ob_ids = ob_ids
                info_collection = self.db.get_collection(self.info_tab)
                final_flight_datas = []
                for find_info_data in info_collection.find({"_id": {"$in": ob_ids}}):
                    with lock:
                        self.count += 1
                        if self.count % 50 == 0:
                            self.logger.info("thread_number:{0}, clean count: {1}".format(
                                thread_number, self.count))
                    final_flight_datas.extend(
                        self.processing_data_one_search(find_info_data, thread_number)
                    )
                if len(final_flight_datas) > 0:
                    save_mongodb_clean_datas(self.db, self.clean_info_tab, self.website, self.logger, final_flight_datas, thread_number, True)
                update_result = info_collection.update_many(
                    {"_id": {"$in": ob_ids}},
                    {"$set": {"clean_status": 1}}
                )
                self.logger.info(f"Updated documents: {update_result.modified_count}")
            except Exception as e:
                self.logger.error(f"thread_number:{thread_number}, log_ob_ids:{log_ob_ids}, clean unknown err:{str(e)}")
            finally:
                # NOTE(review): assumes get_data_by_queue performed exactly one
                # task_queue.get() per iteration — confirm in flights_utils.
                self.task_queue.task_done()

    def run_threading(self):
        """Ensure ``self.thread_numbers`` worker threads run, then wait for the queue to drain.

        Workers loop forever, so they are started once and reused by later
        calls. A worker is (re)started only when it is missing or has died, so
        threads no longer pile up with every batch.
        """
        if not hasattr(self, "_workers"):
            self._workers = {}
        for thread_number in range(1, self.thread_numbers + 1):
            worker = self._workers.get(thread_number)
            if worker is not None and worker.is_alive():
                continue
            worker = threading.Thread(
                target=self.processing_data,
                args=(thread_number,),
                daemon=True,
            )
            worker.start()
            self._workers[thread_number] = worker
        self.task_queue.join()

    def split_datas(self, datas, num):
        """Split the sequence ``datas`` into consecutive chunks of at most ``num`` items."""
        return [datas[i:i + num] for i in range(0, len(datas), num)]

    def run(self):
        """Poll forever for the newest uncleaned crawl batch of this website and clean it.

        About every 10 seconds, the newest ``CRAWL_DATE_CONF_STATUS_TAB`` entry
        for this website with ``clean_status: 0`` is fetched. Its raw documents
        that are not yet cleaned are queued in chunks of 5 ids and processed by
        the worker threads. Once the crawl itself is finished
        (``crawl_status == 1``), the batch is marked cleaned, the number of
        cleaned records for its crawl date is logged, and a DingTalk notice is
        sent.
        """
        while 1:
            # Newest batch only (sorted by _id descending, limit 1).
            find_crawl_conf_datas = self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).find(
                {
                    "website": self.website,
                    "clean_status": 0
                },
                sort=[('_id', -1)]
            ).limit(1)
            for find_crawl_conf_data in find_crawl_conf_datas:
                crawl_date = find_crawl_conf_data.get("crawl_date")
                self.count = 0
                self.logger.info("start clean crawl_date: {}".format(crawl_date))
                # Only the ids are needed here; the projection avoids transferring
                # every raw search payload just to read its _id.
                find_info_datas = self.db.get_collection(self.info_tab).find(
                    {
                        "crawl_date": crawl_date,
                        "website": self.website,
                        "clean_status": 0
                    },
                    {"_id": 1}
                )
                find_id_datas = [str(i.get("_id")) for i in find_info_datas]
                # Each queue item holds at most 5 raw-document ids.
                for find_id_data_split in self.split_datas(find_id_datas, 5):
                    put_data_by_queue(self.task_queue, find_id_data_split)
                self.run_threading()
                self.logger.info("batch clean all counts: {}".format(self.count))
                # The batch is finished only once the crawl itself is finished.
                if find_crawl_conf_data.get("crawl_status") == 1:
                    self.update_clean_date_status(find_crawl_conf_data)
                    self.logger.info("end clean crawl_date: {}".format(crawl_date))
                    # Count this crawl date's cleaned records and report them.
                    clean_crawl_date_counts = self.db.get_collection(
                        self.clean_info_tab).count_documents({"crawl_date": crawl_date})
                    self.logger.info("crawl_date: {0}, insert counts: {1}".format(
                        crawl_date, clean_crawl_date_counts
                    ))
                    # DingTalk notice (best effort; failures are only logged).
                    try:
                        from clean_datas_send_notice import CheckCrawlDatas
                        C = CheckCrawlDatas()
                        C.send_website_spiders_crawl_date_counts(
                            self.website, crawl_date, clean_crawl_date_counts
                        )
                    except Exception as e:
                        self.logger.error("send dingding error: {0}".format(str(e)))
            time.sleep(10 * 1)

    def update_clean_date_status(self, find_crawl_conf_data):
        """Mark the crawl batch document ``find_crawl_conf_data`` as cleaned (``clean_status: 1``)."""
        self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).update_one(
            {"_id": ObjectId(find_crawl_conf_data.get("_id"))},
            {"$set": {"clean_status": 1}}
        )
if __name__ == "__main__":
    # Script entry point: run the GK cleaner's polling loop (never returns).
    cleaner = CleanGKDatas()
    cleaner.run()
|