descending_cabin_task.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. """
  2. 询价 + 验价任务:先询价,按航班号/舱位/行李匹配 result 取 data,再验价并按要求处理时效与重试。
  3. """
  4. import os
  5. import json
  6. import time
  7. import requests
  8. import threading
  9. from datetime import datetime
  10. from concurrent.futures import ThreadPoolExecutor, as_completed
  11. from xmi_logger import XmiLogger
  12. import csv
# Search (quote) / price-verification service
BASE_URL = "http://8.218.51.130:9000"
CID = "cba37f642eab11ef8e7200163e01a06e"
# Exchange-rate service
RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
# Policy sync service
# POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
VALID_DURATION_SECONDS = 3 * 60 # 3 minutes: how long a verify result counts as fresh
RETRY_TIMES = 3 # number of attempts for a search/verify HTTP request
RETRY_INTERVAL = 10 # seconds between search/verify attempts
RETRY_DURATION = 3 * 60 # seconds (3 minutes): total window for the not_verify retry loop
PRICE_DIFF_THRESHOLD = 10 # max allowed gap between verified and expected price (after currency unification); original note gives the unit as RMB
RATES_RETRY_TIMES = 3 # number of attempts for the exchange-rate request
RATES_RETRY_INTERVAL = 2 # seconds between exchange-rate attempts
  29. def time_format_conversion(match_time,
  30. in_strftime_str="%Y-%m-%dT%H:%M:%S",
  31. out_strftime_str="%Y%m%d%H%M%S"):
  32. """
  33. 时间格式转换
  34. match_time 输入的时间 023-07-29T04:15:00
  35. in_strftime_str 输入的时间格式
  36. out_strftime_str 输出的时间格式
  37. """
  38. time_array = time.strptime(match_time, in_strftime_str)
  39. return time.strftime(out_strftime_str, time_array)
  40. class FlightPriceRequestError(Exception):
  41. """请求询价/验价接口时的网络或响应异常."""
  42. def __init__(self, message: str, cause: Exception | None = None):
  43. self.cause = cause
  44. super().__init__(message)
  45. def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
  46. """
  47. 请求汇率接口,获取 1 单位 base 兑换为 symbols 的汇率。
  48. 响应格式: {"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}。
  49. 带重试,失败返回 None。
  50. """
  51. last_err: Exception | None = None
  52. for attempt in range(RATES_RETRY_TIMES):
  53. try:
  54. resp = requests.get(
  55. rates_url,
  56. params={"base": base, "symbols": symbols},
  57. headers={
  58. "cid": cid,
  59. "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
  60. },
  61. timeout=15
  62. )
  63. resp.raise_for_status()
  64. body = resp.json()
  65. if body.get("code") != 0:
  66. last_err = ValueError(body.get("msg", "汇率接口返回非成功"))
  67. if attempt < RATES_RETRY_TIMES - 1:
  68. time.sleep(RATES_RETRY_INTERVAL)
  69. continue
  70. data = body.get("data")
  71. if isinstance(data, dict) and "rate" in data:
  72. return float(data["rate"])
  73. last_err = ValueError("响应缺少 data.rate")
  74. except (requests.RequestException, json.JSONDecodeError, TypeError, ValueError, KeyError) as e:
  75. last_err = e
  76. if attempt < RATES_RETRY_TIMES - 1:
  77. time.sleep(RATES_RETRY_INTERVAL)
  78. return None
  79. class FlightPriceClient:
  80. """请求封装:询价、验价接口."""
  81. def __init__(self, base_url: str = BASE_URL, cid: str = CID):
  82. self.base_url = base_url.rstrip("/")
  83. self.cid = cid
  84. self.headers = {
  85. "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
  86. "Content-Type": "application/json",
  87. }
  88. def _request(self, url: str, payload: dict) -> dict:
  89. """发起 POST 请求并解析 JSON,统一处理异常。失败时按 RETRY_TIMES 次重试,间隔 RETRY_INTERVAL 秒."""
  90. last_err = None
  91. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  92. for attempt in range(RETRY_TIMES):
  93. try:
  94. if attempt > 0:
  95. time.sleep(RETRY_INTERVAL)
  96. resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
  97. resp.raise_for_status()
  98. print(resp.json())
  99. return resp.json()
  100. except requests.Timeout as e:
  101. last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
  102. except requests.ConnectionError as e:
  103. last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
  104. except requests.HTTPError as e:
  105. last_err = FlightPriceRequestError(
  106. f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
  107. )
  108. except requests.RequestException as e:
  109. last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
  110. except json.JSONDecodeError as e:
  111. last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
  112. raise last_err
  113. def search_flights(
  114. self,
  115. from_city_code: str,
  116. to_city_code: str,
  117. from_day: str,
  118. ) -> dict:
  119. """询价接口."""
  120. url = f"{self.base_url}/v1/search_flights"
  121. payload = {
  122. "from_city_code": from_city_code,
  123. "to_city_code": to_city_code,
  124. "from_day": from_day,
  125. "cid": self.cid,
  126. }
  127. return self._request(url, payload)
  128. def verify_price(
  129. self,
  130. from_city_code: str,
  131. to_city_code: str,
  132. from_day: str,
  133. data: str,
  134. not_verify: bool = False,
  135. async_: bool = True,
  136. ) -> dict:
  137. """验价接口."""
  138. url = f"{self.base_url}/v1/verify_price"
  139. payload = {
  140. "from_city_code": from_city_code,
  141. "to_city_code": to_city_code,
  142. "from_day": from_day,
  143. "data": data,
  144. "cid": self.cid,
  145. "not_verify": not_verify,
  146. "async": async_,
  147. }
  148. return self._request(url, payload)
  149. class ResultMatcher:
  150. """从询价 result 中按航班号、舱位、行李匹配一条,并取出 data."""
  151. @staticmethod
  152. def match(
  153. result: list[dict],
  154. cabins: str,
  155. baggages: str,
  156. flight_numbers: str,
  157. ) -> dict | None:
  158. """
  159. 在 result 中匹配:按段顺序一致。cabins / baggages / flight_numbers 均为 ";" 分隔的多段字符串,
  160. 第 i 段需满足:seg[i].cabin == 第 i 个舱位、seg[i].baggage == 第 i 个行李、seg[i].flight_number == 第 i 个航班号。
  161. 返回匹配到的那条 result 项(含 data),未匹配到返回 None。
  162. """
  163. cabin_list = [s.strip() for s in (cabins or "").split(";")]
  164. baggage_list = [s.strip() for s in (baggages or "").split(";")]
  165. flight_list = [s.strip() for s in (flight_numbers or "").split(";")] if flight_numbers else []
  166. n = len(cabin_list)
  167. if n == 0 or len(baggage_list) != n:
  168. return None
  169. if flight_list and len(flight_list) != n:
  170. return None
  171. for item in result:
  172. segments = item.get("segments") or []
  173. if len(segments) != n:
  174. continue
  175. for i in range(n):
  176. seg = segments[i]
  177. if seg.get("cabin") != cabin_list[i] or seg.get("baggage") != baggage_list[i]:
  178. break
  179. if flight_list and seg.get("flight_number") != flight_list[i]:
  180. break
  181. else:
  182. return item
  183. return None
  184. class VerifyResultHandler:
  185. """验价返回 result 只有一条时:判断 now_time / verify_time 是否在 3 分钟内,并取价格."""
  186. @staticmethod
  187. def parse_time(ts: str | int | None) -> datetime | None:
  188. """解析时间字符串或时间戳."""
  189. if ts is None:
  190. return None
  191. if isinstance(ts, (int, float)):
  192. return datetime.fromtimestamp(ts)
  193. for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
  194. try:
  195. return datetime.strptime(str(ts).strip(), fmt)
  196. except (ValueError, TypeError):
  197. continue
  198. return None
  199. @classmethod
  200. def is_within_3_minutes(cls, ts: str | int | None) -> bool:
  201. """判断给定时间是否在距今 3 分钟内."""
  202. dt = cls.parse_time(ts)
  203. if dt is None:
  204. return False
  205. return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS
  206. @classmethod
  207. def get_valid_price(cls, single_result: dict) -> dict | None:
  208. """
  209. 单条 result:若存在 now_time 且在 3 分钟内则返回该条(含价格);
  210. 否则看 verify_time 是否在 3 分钟内。
  211. 符合条件返回该条 dict,否则返回 None。
  212. """
  213. if not single_result:
  214. return None
  215. if "now_time" in single_result and cls.is_within_3_minutes(single_result["now_time"]):
  216. return single_result
  217. if "verify_time" in single_result and cls.is_within_3_minutes(single_result["verify_time"]):
  218. return single_result
  219. return None
  220. class FlightPriceTaskRunner:
  221. """单任务流程:询价 -> 匹配 -> 验价 -> 按规则取价或 not_verify 重试."""
  222. def __init__(self, client: FlightPriceClient | None = None):
  223. self.logger = XmiLogger("flight_price_task")
  224. self.client = client or FlightPriceClient()
  225. self.matcher = ResultMatcher()
  226. self.handler = VerifyResultHandler()
  227. # self.rate = fetch_rate("USD", "CNY")
  228. def run(
  229. self,
  230. task: dict,
  231. ) -> dict:
  232. """
  233. 执行单条任务。task 需含: from_city_code, to_city_code, from_day, cabin, baggage, adult_total_price。
  234. flight_number 可选,用于匹配。
  235. 返回: {"status": "ok"|"placeholder"|"no_match", "price_info": {...}, "raw_verify": ...}
  236. """
  237. from_city_code = task["from_city_code"]
  238. to_city_code = task["to_city_code"]
  239. from_day = task["from_day"]
  240. flight_numbers = task.get("flight_numbers")
  241. cabins = task["cabins"]
  242. baggages = task["baggages"]
  243. # 1. 询价
  244. try:
  245. search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
  246. except FlightPriceRequestError as e:
  247. self.logger.warning(f"询价请求异常: {e}")
  248. return {"status": "request_error", "msg": str(e), "phase": "search"}
  249. if search_resp.get("code") != 0:
  250. return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}
  251. result_list = search_resp.get("result") or []
  252. # print(result_list)
  253. matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
  254. # print(matched)
  255. if not matched:
  256. return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
  257. data = matched.get("data")
  258. if not data:
  259. return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}
  260. # 2. 验价(先 not_verify=False)
  261. try:
  262. verify_resp = self.client.verify_price(
  263. from_city_code=from_city_code,
  264. to_city_code=to_city_code,
  265. from_day=from_day,
  266. data=data,
  267. not_verify=False,
  268. async_=True,
  269. )
  270. except FlightPriceRequestError as e:
  271. self.logger.warning(f"验价请求异常: {e}")
  272. return {"status": "request_error", "msg": str(e), "phase": "verify"}
  273. if verify_resp.get("code") != 0:
  274. return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}
  275. # print(verify_resp)
  276. verify_result = verify_resp.get("result")
  277. if isinstance(verify_result, list) and len(verify_result) >= 1:
  278. single = verify_result[0]
  279. valid = self.handler.get_valid_price(single)
  280. if valid is not None:
  281. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  282. if rate_err:
  283. return {"status": "rate_error", "msg": rate_err}
  284. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  285. return {
  286. "status": "ok",
  287. "price_info": self._extract_price_info(valid),
  288. "raw_verify": verify_resp,
  289. }
  290. # 3. 不符合时效:用 not_verify=True 每 10 秒重试,最多 3 分钟
  291. deadline = time.monotonic() + RETRY_DURATION
  292. while time.monotonic() < deadline:
  293. time.sleep(RETRY_INTERVAL)
  294. try:
  295. verify_resp = self.client.verify_price(
  296. from_city_code=from_city_code,
  297. to_city_code=to_city_code,
  298. from_day=from_day,
  299. data=data,
  300. not_verify=True,
  301. async_=True,
  302. )
  303. except FlightPriceRequestError as e:
  304. self.logger.warning(f"验价重试请求异常: {e}")
  305. continue
  306. if verify_resp.get("code") != 0:
  307. continue
  308. verify_result = verify_resp.get("result")
  309. if isinstance(verify_result, list) and len(verify_result) >= 1:
  310. single = verify_result[0]
  311. valid = self.handler.get_valid_price(single)
  312. if valid is not None:
  313. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  314. if rate_err:
  315. continue
  316. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  317. return {
  318. "status": "ok",
  319. "price_info": self._extract_price_info(valid),
  320. "raw_verify": verify_resp,
  321. }
  322. else:
  323. # 价格不符合,直接跳出循环,不用继续校验
  324. break
  325. # 3 分钟内都没有符合规则的数据,先占位
  326. return {
  327. "status": "placeholder",
  328. "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
  329. "task": task,
  330. "data": data,
  331. "raw_verify": verify_resp,
  332. }
  333. def _expected_price_in_verify_currency(
  334. self, task: dict, valid: dict
  335. ) -> tuple[float | None, str | None]:
  336. """
  337. 将任务期望价格换算为验价结果币种后的数值。
  338. 返回 (换算后的价格, None),若币种一致则直接换算为 float;
  339. 若需汇率且获取失败返回 (None, "汇率获取失败")。
  340. """
  341. try:
  342. expected_val = float(task.get("adult_total_price"))
  343. except (TypeError, ValueError):
  344. return None, "任务 adult_total_price 无效"
  345. task_currency = (task.get("currency") or "USD").strip().upper()
  346. verify_currency = (valid.get("currency") or "CNY").strip().upper()
  347. if task_currency == verify_currency:
  348. return expected_val, None
  349. rate = fetch_rate(task_currency, verify_currency)
  350. if rate is None:
  351. return None, "汇率获取失败"
  352. return expected_val * rate, None
  353. @staticmethod
  354. def _price_within_threshold(
  355. expected: str | int | float | None,
  356. actual: str | int | float | None,
  357. threshold: float | None = None,
  358. ) -> bool:
  359. """校验验价结果价格与任务期望价格差是否在阈值内(两者已统一币种)。阈值默认 PRICE_DIFF_THRESHOLD。"""
  360. if threshold is None:
  361. threshold = PRICE_DIFF_THRESHOLD
  362. try:
  363. e = float(expected) if expected is not None else None
  364. a = float(actual) if actual is not None else None
  365. except (TypeError, ValueError):
  366. return False
  367. if e is None or a is None:
  368. return False
  369. return abs(a - e) <= threshold
  370. @staticmethod
  371. def _extract_price_info(item: dict) -> dict:
  372. """从单条 result 项提取价格相关信息."""
  373. return {
  374. "adult_price": item.get("adult_price"),
  375. "adult_tax": item.get("adult_tax"),
  376. "adult_total_price": item.get("adult_total_price"),
  377. "currency": item.get("currency"),
  378. "now_time": item.get("now_time"),
  379. "verify_time": item.get("verify_time"),
  380. }
  381. def _process_one_task(row, runner):
  382. """处理单条任务:构建 end_task、执行 run、解析结果。成功返回 flight_data 字典,失败返回 None。"""
  383. task = row
  384. thread_name = threading.current_thread().name
  385. # print(f"[thread_name: {thread_name}] 正在处理任务: {task}")
  386. from_city_code, to_city_code = task["city_pair"].split("-")
  387. from_day = task["flight_day"].replace("-", "")
  388. flight_numbers = task["flight_number_1"].strip()
  389. if task["flight_number_2"].strip() != "VJ":
  390. flight_numbers += ";" + task["flight_number_2"].strip()
  391. cabins = ";".join(["Y"] * len(flight_numbers.split(";")))
  392. baggages = ";".join([f"1-{task['baggage']}"] * len(flight_numbers.split(";")))
  393. end_task = {
  394. "from_city_code": from_city_code,
  395. "to_city_code": to_city_code,
  396. "from_day": from_day,
  397. "flight_numbers": flight_numbers,
  398. "cabins": cabins,
  399. "baggages": baggages,
  400. "adult_total_price": task.get("adult_total_price"),
  401. "currency": task.get("currency", "USD"),
  402. }
  403. # print("--------------------------------")
  404. # print(end_task)
  405. # print("--------------------------------")
  406. out = runner.run(end_task)
  407. # print(json.dumps(out, ensure_ascii=False, indent=2))
  408. if out.get("status") != "ok":
  409. # print(f"[thread_name={thread_name}] 错误: {out.get('msg')}")
  410. return None
  411. # print(f"价格: {out.get('price_info').get('adult_total_price')}")
  412. raw_verify = out.get("raw_verify")
  413. results = raw_verify.get("result")
  414. if not results:
  415. return None
  416. result = results[0]
  417. segments = result.get("segments")
  418. end_segments = []
  419. baggage = segments[0].get("baggage")
  420. pc, kg = [int(i) for i in baggage.split("-")]
  421. for seg in segments:
  422. flight_number = seg.get("flight_number")
  423. operating_flight_number = seg.get("operating_flight_number")
  424. if flight_number == operating_flight_number:
  425. operating_flight_number = ""
  426. dep_time = seg.get("dep_time")
  427. arr_time = seg.get("arr_time")
  428. dep_time = time_format_conversion(dep_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  429. arr_time = time_format_conversion(arr_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  430. end_segment = {
  431. "carrier": seg.get("carrier"),
  432. "dep_air_port": seg.get("dep_air_port"),
  433. "arr_air_port": seg.get("arr_air_port"),
  434. "dep_city_code": seg.get("dep_city_code"),
  435. "arr_city_code": seg.get("arr_city_code"),
  436. "flight_number": flight_number,
  437. "operating_flight_number": operating_flight_number,
  438. "cabin": seg.get("cabin"),
  439. "dep_time": dep_time,
  440. "arr_time": arr_time,
  441. }
  442. end_segments.append(end_segment)
  443. return {
  444. "trip_type": 1,
  445. "segments": end_segments,
  446. "price_add": 0,
  447. "bag_amount": pc,
  448. "bag_weight": kg,
  449. "task": task
  450. }
  451. def sync_policy(payload):
  452. headers = {
  453. "Content-Type": "application/json",
  454. }
  455. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  456. response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
  457. resp_json = response.json()
  458. """
  459. {
  460. "code": 0,
  461. "msg": "ok",
  462. "data": {
  463. "deleted": 1,
  464. "created": 7
  465. }
  466. }
  467. """
  468. # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
  469. return resp_json
  470. def main():
  471. logger = XmiLogger("task")
  472. # 注意 \ufeff 是 UTF-8 的 BOM
  473. # 所以需要使用 utf-8-sig 编码
  474. task_list = []
  475. runner = FlightPriceTaskRunner()
  476. # 1 读取任务列表
  477. output_dir = "./keep_0"
  478. keep_info_path = os.path.join(output_dir, "keep_info.csv")
  479. with open(keep_info_path, "r", encoding="utf-8-sig") as f:
  480. reader = csv.DictReader(f)
  481. for row in reader:
  482. task_list.append(row)
  483. # 2 任务列表逻辑处理(多线程)
  484. policy_list = []
  485. keep_info_end = []
  486. max_workers = 3 # 并发线程数,可按需要调整
  487. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  488. futures = {executor.submit(_process_one_task, task, runner): task for task in task_list}
  489. for future in as_completed(futures):
  490. try:
  491. flight_data = future.result()
  492. if flight_data is not None:
  493. task = flight_data.pop("task")
  494. keep_info_end.append(task)
  495. policy_list.append(flight_data)
  496. except Exception as e:
  497. task = futures[future]
  498. print(f"任务异常 {task}: {e}")
  499. # 3 批量一次性上传政策
  500. logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
  501. logger.info(f"policy_list: {policy_list}")
  502. if len(policy_list) > 0:
  503. # 这里批量一次性上传政策
  504. payload = {"items": policy_list}
  505. sync_policy(payload)
  506. logger.info(f"keep_info_end: {len(keep_info_end)}")
  507. # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理 提供下载页面 (历史数据需要保留)
  508. # if not os.path.exists("/home/node04/descending_cabin_files"):
  509. # os.makedirs("/home/node04/descending_cabin_files")
  510. with open(f"/home/node04/descending_cabin_files/keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv", "w", encoding="utf-8-sig") as f:
  511. writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
  512. writer.writeheader()
  513. for task in keep_info_end:
  514. writer.writerow(task)
  515. logger.info(f"keep_info_end 写入完成")
# Run the batch only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()