# descending_cabin_task.py
"""
No-quote + no-verification task: match ``result`` directly by flight
number / cabin / baggage and take ``data`` from the matched entry.
"""
import os
import json
import time
import requests
import threading  # NOTE(review): appears unused in this file — confirm before removing
import traceback
import redis
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from xmi_logger import XmiLogger
import csv

# Upstream endpoint that receives the bulk policy sync payload.
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
  16. def time_format_conversion(match_time,
  17. in_strftime_str="%Y-%m-%dT%H:%M:%S",
  18. out_strftime_str="%Y%m%d%H%M%S"):
  19. """
  20. 时间格式转换
  21. match_time 输入的时间 023-07-29T04:15:00
  22. in_strftime_str 输入的时间格式
  23. out_strftime_str 输出的时间格式
  24. """
  25. time_array = time.strptime(match_time, in_strftime_str)
  26. return time.strftime(out_strftime_str, time_array)
  27. def _process_one_task(row):
  28. task = row
  29. price_base = round(float(task.get('price_base', '0.0')))
  30. price_tax = round(float(task.get('price_tax', '0.0')))
  31. city_pair = (task.get("citypair") or "").strip()
  32. if "-" not in city_pair:
  33. return None
  34. from_city_code, to_city_code = city_pair.split("-", 1)
  35. flight_numbers_raw = (
  36. task.get("flight_numbers")
  37. or ""
  38. ).strip()
  39. flight_numbers = [i.strip() for i in flight_numbers_raw.split(",") if i.strip()]
  40. if not flight_numbers:
  41. return None
  42. cabins_raw = (task.get("cabins") or "").strip()
  43. cabin_list = [i.strip() for i in cabins_raw.split(",") if i.strip()] if cabins_raw else []
  44. if not cabin_list:
  45. cabin_list = ["Y"] * len(flight_numbers)
  46. if len(cabin_list) < len(flight_numbers):
  47. cabin_list.extend([cabin_list[-1]] * (len(flight_numbers) - len(cabin_list)))
  48. baggage_val = str(task.get("baggage_weight") or "0").strip()
  49. try:
  50. kg = int(float(baggage_val))
  51. except (TypeError, ValueError):
  52. kg = 0
  53. pc = 0 if kg <= 0 else 1
  54. dep_time = (
  55. task.get("from_time")
  56. or f"{task.get('from_date') or ''} 00:00:00"
  57. )
  58. drop_price_change_upper = float(task.get("drop_price_change_upper") or 0) # 最小的降价幅度
  59. max_threshold = round(drop_price_change_upper * 1.0)
  60. max_threshold = max_threshold + 20
  61. if max_threshold > 0 or abs(max_threshold) < 10: # 丢弃小于10人民币的降价幅度
  62. return None
  63. drop_price_sample_size = int(task.get("drop_price_sample_size", "0"))
  64. if drop_price_sample_size < 1: # 丢弃历史降价样本数过少(小于1)的
  65. return None
  66. from_date = task.get("from_date")
  67. if from_date in ['2026-04-28', '2026-04-29', '2026-04-30', '2026-05-01', '2026-05-05']: # 丢弃特殊起飞日期的
  68. return None
  69. end_segments = []
  70. for idx, flight_number in enumerate(flight_numbers):
  71. carrier = "".join([c for c in flight_number if c.isalpha()])
  72. end_segments.append({
  73. "carrier": carrier,
  74. "flight_number": flight_number,
  75. "dep_city_code": from_city_code,
  76. "arr_city_code": to_city_code,
  77. "cabin": cabin_list[idx],
  78. "dep_time": dep_time,
  79. })
  80. return {
  81. "trip_type": 1,
  82. "cover_price": price_base,
  83. "cover_tax": price_tax,
  84. "bag_amount": pc,
  85. "bag_weight": kg,
  86. "max_threshold": max_threshold,
  87. "segments": end_segments,
  88. "ret_segments": [],
  89. "task": task,
  90. }
  91. def sync_policy(payload):
  92. headers = {
  93. "Content-Type": "application/json",
  94. }
  95. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  96. response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
  97. resp_json = response.json()
  98. """
  99. {
  100. "code": 0,
  101. "msg": "ok",
  102. "data": {
  103. "deleted": 1,
  104. "created": 7
  105. }
  106. }
  107. """
  108. # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
  109. return resp_json
  110. def time_handle():
  111. now_time = datetime.now()
  112. next_time = now_time + timedelta(hours=2)
  113. next_ts = int(next_time.timestamp())
  114. expire_at = next_time + timedelta(minutes=1)
  115. ttl_seconds = int((expire_at - now_time).total_seconds())
  116. if ttl_seconds <= 0:
  117. ttl_seconds = 1
  118. redis_client = redis.Redis(host='192.168.20.98', port=6379, db=0)
  119. lock_key = "uo_next_pred_time"
  120. redis_client.set(lock_key, next_ts, ex=ttl_seconds)
  121. def main():
  122. logger = XmiLogger("task")
  123. # 注意 \ufeff 是 UTF-8 的 BOM
  124. # 所以需要使用 utf-8-sig 编码
  125. task_list = []
  126. # 1 读取任务列表
  127. output_dir = "./keep"
  128. keep_info_path = os.path.join(output_dir, "keep_info.csv")
  129. with open(keep_info_path, "r", encoding="utf-8-sig") as f:
  130. reader = csv.DictReader(f)
  131. for row in reader:
  132. task_list.append(row)
  133. # 2 任务列表逻辑处理(多线程)
  134. policy_list = []
  135. keep_info_end = []
  136. max_workers = 1 # 并发线程数,可按需要调整
  137. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  138. futures = {executor.submit(_process_one_task, task): task for task in task_list}
  139. total = len(futures)
  140. done = 0
  141. failed = 0
  142. for future in as_completed(futures):
  143. try:
  144. flight_data = future.result()
  145. if flight_data is not None:
  146. task = flight_data.pop("task")
  147. keep_info_end.append(task)
  148. policy_list.append(flight_data)
  149. except Exception as e:
  150. failed += 1
  151. task = futures[future]
  152. # print(f"任务异常 {task}: {e}")
  153. logger.error(f"任务异常 {task}: {e}")
  154. finally:
  155. done += 1
  156. logger.info(
  157. f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
  158. )
  159. # 3 批量一次性上传政策
  160. logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
  161. # logger.info(f"policy_list: {policy_list}")
  162. logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
  163. if len(policy_list) > 0:
  164. # 这里批量一次性上传政策
  165. payload = {"items": policy_list}
  166. max_attempts = 3
  167. retryable_exceptions = (
  168. requests.exceptions.ConnectionError,
  169. requests.exceptions.ConnectTimeout,
  170. )
  171. for attempt in range(1, max_attempts + 1):
  172. try:
  173. sync_policy(payload)
  174. logger.info(f"上传政策成功")
  175. break
  176. except retryable_exceptions as e:
  177. if attempt == max_attempts:
  178. logger.error(f"上传政策失败(已重试{max_attempts}次): {e}")
  179. # logger.error(f"{traceback.format_exc()}")
  180. else:
  181. wait_seconds = attempt
  182. logger.warning(
  183. f"上传政策连接异常,第{attempt}/{max_attempts}次失败: {e},{wait_seconds}s后重试"
  184. )
  185. time.sleep(wait_seconds)
  186. except Exception as e:
  187. logger.error(f"上传政策失败(非连接异常,不重试): {e}")
  188. logger.error(f"{traceback.format_exc()}")
  189. break
  190. logger.info(f"keep_info_end: {len(keep_info_end)}")
  191. try:
  192. time_handle()
  193. logger.info(f"存redis时间成功")
  194. except Exception as e:
  195. logger.error(f"存redis时间失败: {e}")
  196. # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理 提供下载页面 (历史数据需要保留)
  197. output_dir = "/home/node04/descending_cabin_files_uo"
  198. os.makedirs(output_dir, exist_ok=True)
  199. if keep_info_end:
  200. out_path = os.path.join(
  201. output_dir,
  202. f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
  203. )
  204. with open(out_path, "w", encoding="utf-8-sig") as f:
  205. writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
  206. writer.writeheader()
  207. for task in keep_info_end:
  208. writer.writerow(task)
  209. logger.info("keep_info_end 写入完成")
  210. else:
  211. logger.warning("keep_info_end 为空,跳过写入CSV")
  212. if __name__ == "__main__":
  213. main()