descending_cabin_task.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """
  2. 无询价 + 无验价任务:直接按航班号/舱位/行李匹配 result 取 data
  3. """
  4. import os
  5. import json
  6. import time
  7. import requests
  8. import threading
  9. from datetime import datetime
  10. from concurrent.futures import ThreadPoolExecutor, as_completed
  11. from xmi_logger import XmiLogger
  12. import csv
  # Endpoint that sync_policy() POSTs the policy batch to.
  POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
  def time_format_conversion(match_time,
                             in_strftime_str="%Y-%m-%dT%H:%M:%S",
                             out_strftime_str="%Y%m%d%H%M%S"):
      """Re-render a timestamp string from one strftime layout into another.

      Args:
          match_time: Timestamp text to convert, e.g. ``2023-07-29T04:15:00``.
          in_strftime_str: Layout that ``match_time`` is written in.
          out_strftime_str: Layout of the returned string.

      Returns:
          ``match_time`` formatted according to ``out_strftime_str``.

      Raises:
          ValueError: If ``match_time`` does not match ``in_strftime_str``.
      """
      # Parse into a struct_time and format it back out in a single expression.
      return time.strftime(
          out_strftime_str,
          time.strptime(match_time, in_strftime_str),
      )
  25. def _process_one_task(row):
  26. task = row
  27. city_pair = (task.get("citypair") or "").strip()
  28. if "-" not in city_pair:
  29. return None
  30. from_city_code, to_city_code = city_pair.split("-", 1)
  31. flight_numbers_raw = (
  32. task.get("flight_numbers")
  33. or ""
  34. ).strip()
  35. flight_numbers = [i.strip() for i in flight_numbers_raw.split(",") if i.strip()]
  36. if not flight_numbers:
  37. return None
  38. cabins_raw = (task.get("cabins") or "").strip()
  39. cabin_list = [i.strip() for i in cabins_raw.split(",") if i.strip()] if cabins_raw else []
  40. if not cabin_list:
  41. cabin_list = ["Y"] * len(flight_numbers)
  42. if len(cabin_list) < len(flight_numbers):
  43. cabin_list.extend([cabin_list[-1]] * (len(flight_numbers) - len(cabin_list)))
  44. baggage_val = str(task.get("baggage_weight") or "0").strip()
  45. try:
  46. kg = int(float(baggage_val))
  47. except (TypeError, ValueError):
  48. kg = 0
  49. pc = 0 if kg <= 0 else 1
  50. dep_time = (
  51. task.get("from_time")
  52. or f"{task.get('from_date') or ''} 00:00:00"
  53. )
  54. drop_price_change_upper = float(task.get("drop_price_change_upper") or 0) # 最小的降价幅度
  55. max_threshold = round(drop_price_change_upper * 0.5)
  56. end_segments = []
  57. for idx, flight_number in enumerate(flight_numbers):
  58. carrier = "".join([c for c in flight_number if c.isalpha()])
  59. end_segments.append({
  60. "carrier": carrier,
  61. "flight_number": flight_number,
  62. "dep_city_code": from_city_code,
  63. "arr_city_code": to_city_code,
  64. "cabin": cabin_list[idx],
  65. "dep_time": dep_time,
  66. })
  67. return {
  68. "trip_type": 1,
  69. "bag_amount": pc,
  70. "bag_weight": kg,
  71. "max_threshold": max_threshold,
  72. "segments": end_segments,
  73. "ret_segments": [],
  74. "task": task,
  75. }
  def sync_policy(payload):
      """POST ``payload`` as JSON to ``POLICY_URL`` and return the decoded reply.

      The request uses a 30-second timeout. The HTTP status code is not
      inspected; whatever body comes back is decoded as JSON and returned.

      A sample reply kept with the original code::

          {
              "code": 0,
              "msg": "ok",
              "data": {
                  "deleted": 1,
                  "created": 7
              }
          }
      """
      reply = requests.post(
          POLICY_URL,
          json=payload,
          headers={"Content-Type": "application/json"},
          timeout=30,
      )
      return reply.json()
  def main():
      """Read the task CSV, build policy items, upload them and archive the rows.

      Steps:
          1. Load every row of ``./keep/keep_info.csv``.
          2. Convert each row with ``_process_one_task`` on a thread pool;
             rows that return ``None`` are dropped, rows that raise are logged
             and counted as failed.
          3. Upload all policy items in one ``sync_policy`` call and log the
             reply.
          4. Write the rows that produced a policy item to a timestamped CSV
             under ``/home/node04/descending_cabin_files_uo``.
      """
      logger = XmiLogger("task")

      # 1. Load the task list.
      # "utf-8-sig" strips the UTF-8 BOM (\ufeff) from the first header name;
      # newline="" is what the csv module requires so it can handle line
      # endings (including newlines inside quoted fields) itself.
      keep_info_path = os.path.join("./keep", "keep_info.csv")
      with open(keep_info_path, "r", encoding="utf-8-sig", newline="") as f:
          task_list = list(csv.DictReader(f))

      # 2. Convert the rows (thread pool).
      policy_list = []
      keep_info_end = []
      max_workers = 1  # number of worker threads; adjust as needed
      with ThreadPoolExecutor(max_workers=max_workers) as executor:
          futures = {
              executor.submit(_process_one_task, task): task for task in task_list
          }
          total = len(futures)
          done = 0
          failed = 0
          for future in as_completed(futures):
              try:
                  flight_data = future.result()
                  if flight_data is not None:
                      # The source row is archived separately from the payload.
                      task = flight_data.pop("task")
                      keep_info_end.append(task)
                      policy_list.append(flight_data)
              except Exception as e:
                  failed += 1
                  task = futures[future]
                  logger.error(f"任务异常 {task}: {e}")
              finally:
                  done += 1
                  logger.info(
                      f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
                  )

      # 3. Upload every policy item in a single request.
      logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
      logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
      if policy_list:
          resp_json = sync_policy({"items": policy_list})
          # The sample reply documented in sync_policy uses code 0 for success;
          # anything else is surfaced instead of being silently dropped.
          if isinstance(resp_json, dict) and resp_json.get("code") == 0:
              logger.info(f"上传政策成功: {resp_json}")
          else:
              logger.error(f"上传政策失败, 响应: {resp_json}")

      # 4. Archive the rows that produced a policy item.
      # Each run writes a new timestamped file so earlier exports are kept.
      logger.info(f"keep_info_end: {len(keep_info_end)}")
      output_dir = "/home/node04/descending_cabin_files_uo"
      os.makedirs(output_dir, exist_ok=True)
      if keep_info_end:
          out_path = os.path.join(
              output_dir,
              f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
          )
          # newline="" stops the text layer from rewriting the writer's own
          # "\r\n" terminators (which would yield "\r\r\n" on Windows).
          with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
              writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
              writer.writeheader()
              writer.writerows(keep_info_end)
          logger.info("keep_info_end 写入完成")
      else:
          logger.warning("keep_info_end 为空,跳过写入CSV")
  # Run the task only when this file is executed as a script, not on import.
  if __name__ == "__main__":
      main()