clean_gk_flights_datas.py 18 KB


  1. import re
  2. import time
  3. import queue
  4. import threading
  5. from datetime import datetime
  6. from bson.objectid import ObjectId
  7. from my_logger import Logger
  8. from flights_mongodb import mongo_con_parse
  9. from flights_utils import time_format_conversion, get_now_time, put_data_by_queue, get_data_by_queue
  10. from settings import CRAWL_DATE_CONF_STATUS_TAB
  11. from yz_clean_utils import delete_mongodb_clean_datas, save_mongodb_clean_datas, \
  12. get_city_code, format_pc_kg_data
  13. import json
  14. lock = threading.Lock()
  15. def convert_time_format(input_str):
  16. """"2025年5月30日 (週五) 下午3:35 => 20250530153500 """
  17. # 移除括號內的星期信息(如 "週五")
  18. cleaned_str = re.sub(r'\s*\(.*?\)', '', input_str)
  19. # 將中文上午/下午轉換為 AM/PM 標記
  20. cleaned_str = cleaned_str.replace("上午", "AM").replace("下午", "PM")
  21. # 解析日期時間
  22. dt = datetime.strptime(cleaned_str, "%Y年%m月%d日 %p%I:%M")
  23. # 格式化為目標格式,並補齊秒數 (00)
  24. return dt.strftime("%Y%m%d%H%M%S")
  25. class CleanGKDatas():
  26. def __init__(self):
  27. self.website = "gk"
  28. self.logger = Logger('./logs/{}_flights_clear.log'.format(self.website), 'debug').logger
  29. self.db = mongo_con_parse()
  30. self.info_tab = "flights_{}_info_tab".format(self.website)
  31. self.clean_info_tab = "clean_{}".format(self.info_tab)
  32. self.thread_numbers = 1
  33. self.task_queue = queue.Queue()
  34. self.count = 0
  35. def processing_data_one_search(self, find_info_data, thread_number=0, verify_flag=False):
  36. # global has_baggage, baggage
  37. from_airport_code = ''
  38. to_airport_code = ''
  39. search_date = ''
  40. end_flight_datas = []
  41. try:
  42. # print(find_info_data)
  43. # 每一次搜索
  44. from_city_code = find_info_data.get("search_from_city_code")
  45. to_city_code = find_info_data.get("search_to_city_code")
  46. crawl_date = find_info_data.get("crawl_date")
  47. search_date = find_info_data.get("search_date")
  48. # 转为保存数据库的出发日期(in_strftime_str:传入时间的格式) 注:为时间
  49. search_dep_time_base = time_format_conversion(search_date, in_strftime_str="%Y-%m-%d")
  50. # 提取网站源数据
  51. inner_data = find_info_data.get('inner_data')
  52. # 取gk航班
  53. flights = inner_data.get('Trips', [])[0].get("Flights", [])
  54. # 处理不同的航班
  55. for each_flight in flights:
  56. mid_flight_datas = []
  57. DisplayFlightInfo = each_flight.get("DisplayFlightInfo", {}) # 航班信息
  58. from_airport_code = DisplayFlightInfo.get('OriginAirport') # 出发机场
  59. to_airport_code = DisplayFlightInfo.get('DestinationAirport') # 到达机场代码
  60. """提取票价"""
  61. # 不同票价的余票 先设置为舱位的余票,因为每个票价没有对应的余票,
  62. # 所以先检查经济舱是否有余票,无余票就跳过 (商务舱不用管, 因为查询参数传入的是经济舱/ 而且网站响应商务舱的余票好像一直是 0)
  63. EconomyFlightInfo = DisplayFlightInfo.get('EconomyFlightInfo') # 经济舱信息 里面是 余票数
  64. if EconomyFlightInfo['RemainingSeats'] == 0:
  65. print('跳过(经济舱)余票数为 0 的航班')
  66. continue
  67. # 余票
  68. seatCount = EconomyFlightInfo['RemainingSeats']
  69. Bundles = each_flight['Bundles'] # 票价列表
  70. # 税金(手续费)和基本价提取
  71. EconomyPriceBreakdown = json.loads(each_flight['EconomyPriceBreakdown']) # 里面有税金, 是json字符串要转为对象
  72. price = EconomyPriceBreakdown['TotalFare'] # 基础票价
  73. adult_tax = EconomyPriceBreakdown['TotalCharges'] # 同一个航班是不变的
  74. # 提取每个票价
  75. for each_bundle in Bundles:
  76. # 服务包代码
  77. fareBasis = each_bundle['ServiceBundleCode']
  78. # 只采集 最低不带行李的价格 和 一个最低带行李 的票价
  79. # S000: 基本票價 P200: 基本加值套票 行李 + 座位 + 餐膳
  80. if fareBasis not in ['S000', 'P200']:
  81. continue
  82. Amount = each_bundle['Amount'] # 增值服务费, 后面算总价要加上这个
  83. # 托运行李
  84. if fareBasis == 'S000':
  85. pc_amount, kg_amount = '0', '0' # 无托运行李
  86. has_baggage, baggage = format_pc_kg_data(pc_amount, kg_amount)
  87. if fareBasis == 'P200':
  88. pc_amount, kg_amount = '1', '20' # 1件行李20公斤
  89. has_baggage, baggage = format_pc_kg_data(pc_amount, kg_amount)
  90. """处理中转"""
  91. segment_elements = []
  92. Legs = DisplayFlightInfo.get('Legs') # 中转航段信息, 最多有3个
  93. # 可能是直达或转乘
  94. for each_leg in Legs:
  95. '''理论航班号信息'''
  96. CarrierCode = each_leg['FlightDesignator']['CarrierCode'] # 航空公司
  97. FlightNumber = each_leg['FlightDesignator']['FlightNumber'] # 航班号
  98. flight_number = f"{CarrierCode}{FlightNumber}" # 拼接航班号
  99. '''实际航班号信息, gk没有这些, ??? 该如何??? => 略 '''
  100. # operatingAirlineCode = each_leg['operatingAirlineCode'] # 营运航空公司代码
  101. # operatingFlightno = each_leg['operatingFlightno']
  102. # operating_flight_number = f"{operatingAirlineCode}{operatingFlightno}"
  103. '''机型'''
  104. # 空中巴士 A320-200 (代碼 32J,全經濟艙配置)
  105. aircraftCode = each_leg['Equipment']['Type']
  106. '''出发/到达时间'''
  107. departDate = each_leg["DisplayStd"] # 计划起飞时间 2025年5月31日 (週六) 上午2:15
  108. arrivalDate = each_leg["DisplaySta"] # 计划到达时间
  109. dep_time = convert_time_format(departDate) # 时间格式转换
  110. arr_time = convert_time_format(arrivalDate)
  111. '''出发/到达机场和城市'''
  112. depart_airport = each_leg['DepartureStation'] # 出发机场
  113. depart_city = get_city_code(self.db, self.website, depart_airport) # 数据库查询对应城市代码
  114. arrival_airport = each_leg['ArrivalStation'] # 到达机场
  115. arrival_city = get_city_code(self.db, self.website, arrival_airport)
  116. '''舱位类型'''
  117. CabinType = each_leg['CabinType']
  118. # 航站楼
  119. # DisplayDepartureAirportTerminal = each_leg['DisplayDepartureAirportTerminal'] # 出发
  120. # DisplayArrivalAirportTerminal = each_leg['DisplayArrivalAirportTerminal'] # 到达
  121. segment_element_demo = {
  122. "flight_number": flight_number, # 航班号
  123. "operating_flight_number": flight_number, # 实际营运航班号 (gk没有这个和上面写为同一个)
  124. "dep_air_port": depart_airport, # 出发机场code str
  125. "dep_city_code": depart_city, # 出发城市code
  126. "dep_time": dep_time, # 出发时间 格式YYYYMMDDHHMM str
  127. "arr_air_port": arrival_airport, # 抵达机场code str
  128. "arr_city_code": arrival_city, # 抵达城市code
  129. "arr_time": arr_time, # 抵达时间 格式YYYYMMDDHHMM str
  130. "cabin": CabinType, # 舱位单字母
  131. "carrier": CarrierCode, # 航空公司代码
  132. "aircraft_code": aircraftCode, # 机型 str
  133. "cabin_class": 1, # 舱位等级, 因为捷星日本航空(GK)的航班目前均为全经济舱​​,故这里也可写死
  134. "stop_cities": "",
  135. "has_baggage": has_baggage, # 是否有托运行李
  136. "baggage": baggage, # 行李配重 1-23 表示一件行李 23kg
  137. "fareBasis": fareBasis, # 在同一航班的不同舱位中是唯一值,主要用来区分同一航班的不同舱位,
  138. }
  139. segment_elements.append(segment_element_demo)
  140. # 查询出发时间(如果响应没有,可能是入参传递的)
  141. search_dep_time = segment_elements[0].get("dep_time") if segment_elements else search_dep_time_base
  142. flight_datas = {
  143. "from_city_code": from_city_code,
  144. "search_dep_time": search_dep_time,
  145. "to_city_code": to_city_code,
  146. "currency": 'JPY', # 网站默认返回搜索地区的国家货币价格, 采集的全都是 日本的航班 故可写死
  147. "adult_price": int(Amount+price), # 基本价格
  148. "adult_tax": int(adult_tax), # 税金
  149. "adult_total_price": int(Amount+price+adult_tax), # 再加 基本价和税金
  150. "route": "",
  151. "seats_remaining": seatCount,
  152. "segments": segment_elements,
  153. "source_website": self.website,
  154. "crawl_date": crawl_date
  155. }
  156. # print(flight_datas)
  157. if verify_flag:
  158. flight_datas["verify_date"] = crawl_date # 验价分支
  159. verify_time = get_now_time()
  160. flight_datas["verify_time"] = verify_time
  161. # print(flight_datas)
  162. mid_flight_datas.append(flight_datas)
  163. # 注意:这里已经跳出bundle循环,将当前航班的所有票价记录(mid_flight_datas)扩展到总结果
  164. end_flight_datas.extend(mid_flight_datas)
  165. if verify_flag:
  166. # 验价分支,
  167. search_from_city_code = find_info_data.get("search_from_city_code")
  168. search_to_city_code = find_info_data.get("search_to_city_code")
  169. search_date = find_info_data.get("search_date")
  170. end_search_date = time_format_conversion(
  171. search_date,
  172. in_strftime_str="%Y-%m-%d",
  173. out_strftime_str="%Y%m%d"
  174. )
  175. delete_many_filter = {
  176. "from_city_code": search_from_city_code,
  177. "to_city_code": search_to_city_code,
  178. "search_dep_time": {"$regex": r"^{}".format(end_search_date)}
  179. }
  180. delete_mongodb_clean_datas(self.db, self.clean_info_tab, self.logger, delete_many_filter)
  181. # 重新保存最新的采集数据
  182. if len(end_flight_datas) > 0:
  183. save_mongodb_clean_datas(self.db, self.clean_info_tab, self.website, self.logger, end_flight_datas, thread_number, True)
  184. except Exception as e:
  185. self.logger.error(f"thread_number:{thread_number} clean error: {from_airport_code}---{to_airport_code}---{search_date}---{str(e)}")
  186. finally:
  187. return end_flight_datas
  188. def processing_data(self, thread_number):
  189. while 1:
  190. log_ob_ids = []
  191. try:
  192. ids = get_data_by_queue(self.task_queue)
  193. ob_ids = [ObjectId(i) for i in ids]
  194. log_ob_ids = ob_ids
  195. # 批量查询 _id 包含在给定列表 ob_ids 中的文档
  196. find_info_datas = self.db.get_collection(self.info_tab).find(
  197. {"_id": {"$in": ob_ids}}
  198. )
  199. final_flight_datas = []
  200. # 每一次搜索航线
  201. for find_info_data in find_info_datas:
  202. with lock:
  203. self.count += 1
  204. if self.count % 50 == 0:
  205. self.logger.info("thread_number:{0}, clean count: {1}".format(
  206. thread_number, self.count))
  207. # print(find_info_data)
  208. # exit()
  209. end_flight_datas = self.processing_data_one_search(find_info_data, thread_number)
  210. final_flight_datas.extend(end_flight_datas)
  211. # 更改这些info表里的数据清理状态为1
  212. update_result = self.db.get_collection(self.info_tab).update_many(
  213. {"_id": {"$in": ob_ids}},
  214. {"$set": {"clean_status": 1}}
  215. )
  216. self.logger.info(f"Updated documents: {update_result.modified_count}")
  217. if len(final_flight_datas) > 0:
  218. save_mongodb_clean_datas(self.db, self.clean_info_tab, self.website, self.logger, final_flight_datas, thread_number, True)
  219. except Exception as e:
  220. self.logger.error(f"thread_number:{thread_number}, log_ob_ids:{log_ob_ids}, clean unknown err:{str(e)}")
  221. finally:
  222. self.task_queue.task_done()
  223. def run_threading(self):
  224. for thread_number in range(1, self.thread_numbers+1):
  225. t = threading.Thread(
  226. target=self.processing_data,
  227. args=(thread_number,)
  228. )
  229. t.daemon = True
  230. t.start()
  231. self.task_queue.join()
  232. def split_datas(self, datas, num):
  233. return [datas[i:i + num] for i in range(0, len(datas), num)]
  234. def run(self):
  235. # 每次清除之前清洗的数据
  236. # 查询已经采集完成但未清洗的数据批次
  237. while 1:
  238. # 取最新采集的数据
  239. find_crawl_conf_datas = self.db.get_collection(
  240. CRAWL_DATE_CONF_STATUS_TAB).find( # find查询
  241. {
  242. "website": self.website,
  243. # "crawl_status": 1, # 反复测试清洗逻辑
  244. "clean_status": 0
  245. }, # 查询条件
  246. sort=[('_id', -1)] # 排序规则,按 _id 降序
  247. ).limit(1) # 仅返回第一条记录(即最新的一条文档)
  248. for find_crawl_conf_data in find_crawl_conf_datas:
  249. self.count = 0
  250. self.logger.info("start clean crawl_date: {}".format(
  251. find_crawl_conf_data.get("crawl_date")
  252. ))
  253. find_info_datas = self.db.get_collection(self.info_tab).find(
  254. {
  255. "crawl_date": find_crawl_conf_data.get("crawl_date"),
  256. "website": self.website,
  257. # "clean_status": 1, # 反复测试清洗逻辑
  258. "clean_status": 0
  259. }
  260. )
  261. find_id_datas = [str(i.get("_id")) for i in find_info_datas]
  262. find_id_datas_splits = self.split_datas(find_id_datas, 5)
  263. # 一个队列里最多放5份info数据
  264. for find_id_data_split in find_id_datas_splits:
  265. put_data_by_queue(self.task_queue, find_id_data_split)
  266. self.run_threading() # 多线程清理
  267. self.logger.info("batch clean all counts: {}".format(self.count))
  268. # 判断整体结束的条件
  269. if find_crawl_conf_data.get("crawl_status") == 1:
  270. self.update_clean_date_status(find_crawl_conf_data)
  271. self.logger.info("end clean crawl_date: {}".format(
  272. find_crawl_conf_data.get("crawl_date")
  273. ))
  274. # 整体结束后, 统一计算本次清理的记录数, 为零则告警
  275. clean_crawl_date_counts = self.db.get_collection(
  276. self.clean_info_tab).count_documents(
  277. {
  278. "crawl_date": find_crawl_conf_data.get("crawl_date")
  279. }
  280. )
  281. self.logger.info("crawl_date: {0}, insert counts: {1}".format(
  282. find_crawl_conf_data.get("crawl_date"), clean_crawl_date_counts
  283. ))
  284. # 发送钉钉通知
  285. try:
  286. from clean_datas_send_notice import CheckCrawlDatas
  287. C = CheckCrawlDatas()
  288. C.send_website_spiders_crawl_date_counts(
  289. self.website, find_crawl_conf_data.get("crawl_date"),
  290. clean_crawl_date_counts
  291. )
  292. except Exception as e:
  293. self.logger.info("send dingding error: {0}".format(str(e)))
  294. time.sleep(10 * 1)
  295. def update_clean_date_status(self, find_crawl_conf_data):
  296. # 更新 采集批次时间 状态
  297. self.db.get_collection(CRAWL_DATE_CONF_STATUS_TAB).update_one(
  298. {
  299. "_id": ObjectId(find_crawl_conf_data.get("_id"))
  300. },
  301. {
  302. "$set":
  303. {
  304. "clean_status": 1
  305. }
  306. }
  307. )
  308. if __name__ == "__main__":
  309. C = CleanGKDatas()
  310. C.run()