import json
import queue
import re
import threading
import time
from datetime import datetime

from bson.objectid import ObjectId

from my_logger import Logger
from flights_mongodb import mongo_con_parse
from flights_utils import time_format_conversion, get_now_time, put_data_by_queue, get_data_by_queue
from settings import CRAWL_DATE_CONF_STATUS_TAB
from yz_clean_utils import delete_mongodb_clean_datas, save_mongodb_clean_datas, \
    get_city_code, format_pc_kg_data

# Guards self.count, which is shared across cleaner threads.
lock = threading.Lock()


def convert_time_format(input_str):
    """Convert a GK display time like "2025年5月30日 (週五) 下午3:35" to "20250530153500".

    Seconds are always emitted as "00" because the site only provides minutes.
    """
    # Drop the parenthesised weekday, e.g. " (週五)".
    cleaned_str = re.sub(r'\s*\(.*?\)', '', input_str)
    # Map Chinese meridiem markers to AM/PM so strptime's %p can parse them.
    # NOTE(review): %p is locale-dependent; this assumes a C/English locale — confirm.
    cleaned_str = cleaned_str.replace("上午", "AM").replace("下午", "PM")
    dt = datetime.strptime(cleaned_str, "%Y年%m月%d日 %p%I:%M")
    return dt.strftime("%Y%m%d%H%M%S")


class CleanGKDatas:
    """Clean raw Jetstar Japan (GK) crawl documents into the normalized clean collection."""

    def __init__(self):
        self.website = "gk"
        self.logger = Logger('./logs/{}_flights_clear.log'.format(self.website), 'debug').logger
        self.db = mongo_con_parse()
        # Raw crawl collection and its cleaned counterpart.
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.clean_info_tab = "clean_{}".format(self.info_tab)
        self.thread_numbers = 1
        self.task_queue = queue.Queue()
        # Number of raw documents processed in the current batch (lock-protected).
        self.count = 0

    def processing_data_one_search(self, find_info_data, thread_number=0, verify_flag=False):
        """Clean one raw search document into a list of flight-price records.

        Args:
            find_info_data: raw document from the info collection (one search).
            thread_number: worker id, used only for logging.
            verify_flag: price-verification mode — stamps verify fields and
                replaces (delete + save) the matching clean records itself.

        Returns:
            List of cleaned flight dicts (empty on any error — errors are logged,
            never raised).
        """
        from_airport_code = ''
        to_airport_code = ''
        search_date = ''
        end_flight_datas = []
        try:
            # Per-search context.
            from_city_code = find_info_data.get("search_from_city_code")
            to_city_code = find_info_data.get("search_to_city_code")
            crawl_date = find_info_data.get("crawl_date")
            search_date = find_info_data.get("search_date")
            # Fallback departure date derived from the searched date
            # (in_strftime_str is the input format of search_date).
            search_dep_time_base = time_format_conversion(search_date, in_strftime_str="%Y-%m-%d")
            # Raw website payload.
            inner_data = find_info_data.get('inner_data')
            # GK flights live under Trips[0].Flights; guard against an empty Trips list.
            trips = inner_data.get('Trips') or []
            flights = trips[0].get("Flights", []) if trips else []
            for each_flight in flights:
                mid_flight_datas = []
                DisplayFlightInfo = each_flight.get("DisplayFlightInfo", {})  # flight info
                from_airport_code = DisplayFlightInfo.get('OriginAirport')
                to_airport_code = DisplayFlightInfo.get('DestinationAirport')
                # --- fares ---
                # Per-fare seat counts are not provided, so the economy cabin's
                # remaining-seat count is used for every fare of the flight.
                # Business cabin is ignored: the search requests economy and the
                # site appears to always report 0 business seats.
                EconomyFlightInfo = DisplayFlightInfo.get('EconomyFlightInfo')
                if EconomyFlightInfo['RemainingSeats'] == 0:
                    # Was a bare print(); keep the original message but route it
                    # through the logger (this runs inside worker threads).
                    self.logger.info('跳过(经济舱)余票数为 0 的航班')
                    continue
                seatCount = EconomyFlightInfo['RemainingSeats']
                Bundles = each_flight['Bundles']  # fare bundles
                # Base fare and taxes/charges are shared by all bundles of a flight;
                # EconomyPriceBreakdown arrives as a JSON string.
                EconomyPriceBreakdown = json.loads(each_flight['EconomyPriceBreakdown'])
                price = EconomyPriceBreakdown['TotalFare']
                adult_tax = EconomyPriceBreakdown['TotalCharges']
                for each_bundle in Bundles:
                    fareBasis = each_bundle['ServiceBundleCode']
                    # Only keep the cheapest fare without baggage (S000) and the
                    # cheapest fare with baggage (P200: baggage + seat + meal).
                    if fareBasis not in ['S000', 'P200']:
                        continue
                    Amount = each_bundle['Amount']  # bundle surcharge, added to totals below
                    # Checked baggage allowance per bundle.
                    if fareBasis == 'S000':
                        pc_amount, kg_amount = '0', '0'  # no checked baggage
                        has_baggage, baggage = format_pc_kg_data(pc_amount, kg_amount)
                    elif fareBasis == 'P200':
                        pc_amount, kg_amount = '1', '20'  # one 20 kg bag
                        has_baggage, baggage = format_pc_kg_data(pc_amount, kg_amount)
                    # --- segments (direct or with transfers; at most 3 legs) ---
                    segment_elements = []
                    Legs = DisplayFlightInfo.get('Legs')
                    for each_leg in Legs:
                        # Scheduled flight number.
                        CarrierCode = each_leg['FlightDesignator']['CarrierCode']
                        FlightNumber = each_leg['FlightDesignator']['FlightNumber']
                        flight_number = f"{CarrierCode}{FlightNumber}"
                        # GK provides no operating-carrier data, so the scheduled
                        # number is reused as the operating number below.
                        # Aircraft type, e.g. Airbus A320-200 (code 32J, all-economy).
                        aircraftCode = each_leg['Equipment']['Type']
                        # Scheduled departure/arrival, e.g. "2025年5月31日 (週六) 上午2:15".
                        departDate = each_leg["DisplayStd"]
                        arrivalDate = each_leg["DisplaySta"]
                        dep_time = convert_time_format(departDate)
                        arr_time = convert_time_format(arrivalDate)
                        # Airports and their city codes (city looked up in Mongo).
                        depart_airport = each_leg['DepartureStation']
                        depart_city = get_city_code(self.db, self.website, depart_airport)
                        arrival_airport = each_leg['ArrivalStation']
                        arrival_city = get_city_code(self.db, self.website, arrival_airport)
                        CabinType = each_leg['CabinType']
                        segment_element_demo = {
                            "flight_number": flight_number,
                            "operating_flight_number": flight_number,  # GK: same as scheduled
                            "dep_air_port": depart_airport,
                            "dep_city_code": depart_city,
                            "dep_time": dep_time,        # YYYYMMDDHHMMSS
                            "arr_air_port": arrival_airport,
                            "arr_city_code": arrival_city,
                            "arr_time": arr_time,        # YYYYMMDDHHMMSS
                            "cabin": CabinType,          # single-letter cabin
                            "carrier": CarrierCode,
                            "aircraft_code": aircraftCode,
                            # GK currently flies all-economy aircraft, so class 1 is fixed.
                            "cabin_class": 1,
                            "stop_cities": "",
                            "has_baggage": has_baggage,
                            "baggage": baggage,          # e.g. 1-23 = one 23 kg bag
                            # Unique per cabin within a flight; distinguishes fares.
                            "fareBasis": fareBasis,
                        }
                        segment_elements.append(segment_element_demo)
                    # Departure time: prefer the first segment, else the search date.
                    search_dep_time = segment_elements[0].get("dep_time") if segment_elements else search_dep_time_base
                    flight_datas = {
                        "from_city_code": from_city_code,
                        "search_dep_time": search_dep_time,
                        "to_city_code": to_city_code,
                        # Site returns prices in the search region's currency;
                        # all crawled routes are Japanese, so JPY is fixed.
                        "currency": 'JPY',
                        "adult_price": int(Amount + price),
                        "adult_tax": int(adult_tax),
                        "adult_total_price": int(Amount + price + adult_tax),
                        "route": "",
                        "seats_remaining": seatCount,
                        "segments": segment_elements,
                        "source_website": self.website,
                        "crawl_date": crawl_date
                    }
                    if verify_flag:
                        # Price-verification branch: stamp verification metadata.
                        flight_datas["verify_date"] = crawl_date
                        verify_time = get_now_time()
                        flight_datas["verify_time"] = verify_time
                    mid_flight_datas.append(flight_datas)
                # Bundle loop done: fold this flight's fare records into the result.
                end_flight_datas.extend(mid_flight_datas)
            if verify_flag:
                # Verification mode replaces existing clean records for this
                # route/date: delete the old ones, then save the fresh ones.
                search_from_city_code = find_info_data.get("search_from_city_code")
                search_to_city_code = find_info_data.get("search_to_city_code")
                search_date = find_info_data.get("search_date")
                end_search_date = time_format_conversion(
                    search_date,
                    in_strftime_str="%Y-%m-%d",
                    out_strftime_str="%Y%m%d"
                )
                delete_many_filter = {
                    "from_city_code": search_from_city_code,
                    "to_city_code": search_to_city_code,
                    "search_dep_time": {"$regex": r"^{}".format(end_search_date)}
                }
                delete_mongodb_clean_datas(self.db, self.clean_info_tab, self.logger, delete_many_filter)
                if len(end_flight_datas) > 0:
                    save_mongodb_clean_datas(self.db, self.clean_info_tab, self.website,
                                             self.logger, end_flight_datas, thread_number, True)
        except Exception as e:
            self.logger.error(f"thread_number:{thread_number} clean error: {from_airport_code}---{to_airport_code}---{search_date}---{str(e)}")
        # Moved out of a `finally` block: `return` in finally silently swallows
        # any in-flight exception (incl. KeyboardInterrupt) — flake8-bugbear B012.
        return end_flight_datas

    def processing_data(self, thread_number):
        """Worker loop: pull id batches off the queue, clean them, mark them done.

        Runs forever as a daemon thread; one iteration per queue item.
        """
        while True:
            log_ob_ids = []
            try:
                ids = get_data_by_queue(self.task_queue)
                ob_ids = [ObjectId(i) for i in ids]
                log_ob_ids = ob_ids  # kept for the error log below
                # Fetch all raw documents whose _id is in this batch.
                find_info_datas = self.db.get_collection(self.info_tab).find(
                    {"_id": {"$in": ob_ids}}
                )
                final_flight_datas = []
                for find_info_data in find_info_datas:
                    with lock:
                        self.count += 1
                        if self.count % 50 == 0:
                            self.logger.info("thread_number:{0}, clean count: {1}".format(
                                thread_number, self.count))
                    end_flight_datas = self.processing_data_one_search(find_info_data, thread_number)
                    final_flight_datas.extend(end_flight_datas)
                # Mark the whole batch as cleaned in the raw collection.
                update_result = self.db.get_collection(self.info_tab).update_many(
                    {"_id": {"$in": ob_ids}},
                    {"$set": {"clean_status": 1}}
                )
                self.logger.info(f"Updated documents: {update_result.modified_count}")
                if len(final_flight_datas) > 0:
                    save_mongodb_clean_datas(self.db, self.clean_info_tab, self.website,
                                             self.logger, final_flight_datas, thread_number, True)
            except Exception as e:
                self.logger.error(f"thread_number:{thread_number}, log_ob_ids:{log_ob_ids}, clean unknown err:{str(e)}")
            finally:
                self.task_queue.task_done()

    def run_threading(self):
        """Start the worker threads and block until the queue is drained."""
        for thread_number in range(1, self.thread_numbers + 1):
            t = threading.Thread(
                target=self.processing_data,
                args=(thread_number,)
            )
            t.daemon = True
            t.start()
        self.task_queue.join()

    def split_datas(self, datas, num):
        """Split `datas` into consecutive chunks of at most `num` items."""
        return [datas[i:i + num] for i in range(0, len(datas), num)]

    def run(self):
        """Main loop: find the newest uncleaned crawl batch and clean it."""
        while True:
            # Latest batch for this site that still needs cleaning.
            find_crawl_conf_datas = self.db.get_collection(
                CRAWL_DATE_CONF_STATUS_TAB).find(
                {
                    "website": self.website,
                    # "crawl_status": 1,  # disabled while re-testing the clean logic
                    "clean_status": 0
                },
                sort=[('_id', -1)]  # newest first
            ).limit(1)
            for find_crawl_conf_data in find_crawl_conf_datas:
                self.count = 0
                self.logger.info("start clean crawl_date: {}".format(
                    find_crawl_conf_data.get("crawl_date")
                ))
                find_info_datas = self.db.get_collection(self.info_tab).find(
                    {
                        "crawl_date": find_crawl_conf_data.get("crawl_date"),
                        "website": self.website,
                        # "clean_status": 1,  # disabled while re-testing the clean logic
                        "clean_status": 0
                    }
                )
                find_id_datas = [str(i.get("_id")) for i in find_info_datas]
                # At most 5 raw document ids per queue item.
                find_id_datas_splits = self.split_datas(find_id_datas, 5)
                for find_id_data_split in find_id_datas_splits:
                    put_data_by_queue(self.task_queue, find_id_data_split)
                self.run_threading()  # multi-threaded cleaning
                self.logger.info("batch clean all counts: {}".format(self.count))
                # The batch is only finalized once its crawl has fully finished.
                if find_crawl_conf_data.get("crawl_status") == 1:
                    self.update_clean_date_status(find_crawl_conf_data)
                    self.logger.info("end clean crawl_date: {}".format(
                        find_crawl_conf_data.get("crawl_date")
                    ))
                    # Count the cleaned records for this batch (zero triggers an alert
                    # downstream via the notification below).
                    clean_crawl_date_counts = self.db.get_collection(
                        self.clean_info_tab).count_documents(
                        {
                            "crawl_date": find_crawl_conf_data.get("crawl_date")
                        }
                    )
                    self.logger.info("crawl_date: {0}, insert counts: {1}".format(
                        find_crawl_conf_data.get("crawl_date"),
                        clean_crawl_date_counts
                    ))
                    # Best-effort DingTalk notification; failures are logged only.
                    try:
                        from clean_datas_send_notice import CheckCrawlDatas
                        C = CheckCrawlDatas()
                        C.send_website_spiders_crawl_date_counts(
                            self.website,
                            find_crawl_conf_data.get("crawl_date"),
                            clean_crawl_date_counts
                        )
                    except Exception as e:
                        self.logger.info("send dingding error: {0}".format(str(e)))
            time.sleep(10 * 1)

    def update_clean_date_status(self, find_crawl_conf_data):
        """Mark one crawl-batch config document as cleaned (clean_status = 1)."""
        self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).update_one(
            {
                "_id": ObjectId(find_crawl_conf_data.get("_id"))
            },
            {
                "$set": {
                    "clean_status": 1
                }
            }
        )


if __name__ == "__main__":
    C = CleanGKDatas()
    C.run()