# descending_cabin_task.py
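"""
Search + verify task: run a fare search, pick the result entry that matches the
flight number / cabin / baggage to obtain its data, then verify the price and
handle result freshness and retries as required.
"""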
  4. import os
  5. import json
  6. import time
  7. import traceback
  8. import requests
  9. import threading
  10. import redis
  11. from datetime import datetime, timedelta
  12. from concurrent.futures import ThreadPoolExecutor, as_completed
  13. from xmi_logger import XmiLogger
  14. import csv
# Fare search / price verification service
BASE_URL = "http://8.218.51.130:9000"
CID = "cba37f642eab11ef8e7200163e01a06e"
# Exchange-rate service
RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
# Policy sync endpoint
# POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
VALID_DURATION_SECONDS = 3 * 60  # 3 minutes: how long a verification result counts as fresh
RETRY_TIMES = 3  # attempts per search/verify HTTP request
RETRY_INTERVAL = 10  # seconds between search/verify retries
RETRY_DURATION = 3 * 60  # total time (3 minutes) spent polling the verify endpoint
PRICE_DIFF_THRESHOLD = 10  # max allowed gap between the returned price and the task's expected price (same currency, CNY)
RATES_RETRY_TIMES = 3  # attempts per exchange-rate request
RATES_RETRY_INTERVAL = 2  # seconds between exchange-rate retries
  31. def time_format_conversion(match_time,
  32. in_strftime_str="%Y-%m-%dT%H:%M:%S",
  33. out_strftime_str="%Y%m%d%H%M%S"):
  34. """
  35. 时间格式转换
  36. match_time 输入的时间 023-07-29T04:15:00
  37. in_strftime_str 输入的时间格式
  38. out_strftime_str 输出的时间格式
  39. """
  40. time_array = time.strptime(match_time, in_strftime_str)
  41. return time.strftime(out_strftime_str, time_array)
  42. class FlightPriceRequestError(Exception):
  43. """请求询价/验价接口时的网络或响应异常."""
  44. def __init__(self, message: str, cause: Exception | None = None):
  45. self.cause = cause
  46. super().__init__(message)
  47. def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
  48. """
  49. 请求汇率接口,获取 1 单位 base 兑换为 symbols 的汇率。
  50. 响应格式: {"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}。
  51. 带重试,失败返回 None。
  52. """
  53. last_err: Exception | None = None
  54. for attempt in range(RATES_RETRY_TIMES):
  55. try:
  56. resp = requests.get(
  57. rates_url,
  58. params={"base": base, "symbols": symbols},
  59. headers={
  60. "cid": cid,
  61. "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
  62. },
  63. timeout=15
  64. )
  65. resp.raise_for_status()
  66. body = resp.json()
  67. if body.get("code") != 0:
  68. last_err = ValueError(body.get("msg", "汇率接口返回非成功"))
  69. if attempt < RATES_RETRY_TIMES - 1:
  70. time.sleep(RATES_RETRY_INTERVAL)
  71. continue
  72. data = body.get("data")
  73. if isinstance(data, dict) and "rate" in data:
  74. return float(data["rate"])
  75. last_err = ValueError("响应缺少 data.rate")
  76. except (requests.RequestException, json.JSONDecodeError, TypeError, ValueError, KeyError) as e:
  77. last_err = e
  78. if attempt < RATES_RETRY_TIMES - 1:
  79. time.sleep(RATES_RETRY_INTERVAL)
  80. return None
  81. class FlightPriceClient:
  82. """请求封装:询价、验价接口."""
  83. def __init__(self, base_url: str = BASE_URL, cid: str = CID):
  84. self.base_url = base_url.rstrip("/")
  85. self.cid = cid
  86. self.headers = {
  87. "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
  88. "Content-Type": "application/json",
  89. }
  90. def _request(self, url: str, payload: dict) -> dict:
  91. """发起 POST 请求并解析 JSON,统一处理异常。失败时按 RETRY_TIMES 次重试,间隔 RETRY_INTERVAL 秒."""
  92. last_err = None
  93. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  94. for attempt in range(RETRY_TIMES):
  95. try:
  96. if attempt > 0:
  97. time.sleep(RETRY_INTERVAL)
  98. resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
  99. resp.raise_for_status()
  100. body = resp.json()
  101. print(json.dumps(body, ensure_ascii=False)[:200])
  102. return body
  103. # print(resp.json())
  104. # return resp.json()
  105. except requests.Timeout as e:
  106. last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
  107. except requests.ConnectionError as e:
  108. last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
  109. except requests.HTTPError as e:
  110. last_err = FlightPriceRequestError(
  111. f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
  112. )
  113. except requests.RequestException as e:
  114. last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
  115. except json.JSONDecodeError as e:
  116. last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
  117. raise last_err
  118. def search_flights(
  119. self,
  120. from_city_code: str,
  121. to_city_code: str,
  122. from_day: str,
  123. ) -> dict:
  124. """询价接口."""
  125. url = f"{self.base_url}/v1/search_flights"
  126. payload = {
  127. "from_city_code": from_city_code,
  128. "to_city_code": to_city_code,
  129. "from_day": from_day,
  130. "cid": self.cid,
  131. }
  132. return self._request(url, payload)
  133. def verify_price(
  134. self,
  135. from_city_code: str,
  136. to_city_code: str,
  137. from_day: str,
  138. data: str,
  139. not_verify: bool = False,
  140. async_: bool = True,
  141. ) -> dict:
  142. """验价接口."""
  143. url = f"{self.base_url}/v1/verify_price"
  144. payload = {
  145. "from_city_code": from_city_code,
  146. "to_city_code": to_city_code,
  147. "from_day": from_day,
  148. "data": data,
  149. "cid": self.cid,
  150. "not_verify": not_verify,
  151. "async": async_,
  152. }
  153. return self._request(url, payload)
  154. class ResultMatcher:
  155. """从询价 result 中按航班号、舱位、行李匹配一条,并取出 data."""
  156. @staticmethod
  157. def match(
  158. result: list[dict],
  159. cabins: str,
  160. baggages: str,
  161. flight_numbers: str,
  162. ) -> dict | None:
  163. """
  164. 在 result 中匹配:按段顺序一致。cabins / baggages / flight_numbers 均为 ";" 分隔的多段字符串,
  165. 第 i 段需满足:seg[i].cabin == 第 i 个舱位、seg[i].baggage == 第 i 个行李、seg[i].flight_number == 第 i 个航班号。
  166. 返回匹配到的那条 result 项(含 data),未匹配到返回 None。
  167. """
  168. separator = '|' # 更换分隔符由;为|
  169. cabin_list = [s.strip() for s in (cabins or "").split(separator)]
  170. baggage_list = [s.strip() for s in (baggages or "").split(separator)]
  171. flight_list = [s.strip() for s in (flight_numbers or "").split(separator)] if flight_numbers else []
  172. n = len(cabin_list)
  173. if n == 0 or len(baggage_list) != n:
  174. return None
  175. if flight_list and len(flight_list) != n:
  176. return None
  177. for item in result:
  178. segments = item.get("segments") or []
  179. if len(segments) != n:
  180. continue
  181. for i in range(n):
  182. seg = segments[i]
  183. if seg.get("cabin") != cabin_list[i] or seg.get("baggage") != baggage_list[i]:
  184. break
  185. if flight_list and seg.get("flight_number") != flight_list[i]:
  186. break
  187. else:
  188. return item
  189. return None
  190. class VerifyResultHandler:
  191. """验价返回 result 只有一条时:判断 now_time / verify_time 是否在 3 分钟内,并取价格."""
  192. @staticmethod
  193. def parse_time(ts: str | int | None) -> datetime | None:
  194. """解析时间字符串或时间戳."""
  195. if ts is None:
  196. return None
  197. if isinstance(ts, (int, float)):
  198. return datetime.fromtimestamp(ts)
  199. for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
  200. try:
  201. return datetime.strptime(str(ts).strip(), fmt)
  202. except (ValueError, TypeError):
  203. continue
  204. return None
  205. @classmethod
  206. def is_within_3_minutes(cls, ts: str | int | None) -> bool:
  207. """判断给定时间是否在距今 3 分钟内."""
  208. dt = cls.parse_time(ts)
  209. if dt is None:
  210. return False
  211. return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS
  212. @classmethod
  213. def get_valid_price(cls, single_result: dict) -> dict | None:
  214. """
  215. 单条 result:若存在 now_time 且在 3 分钟内则返回该条(含价格);
  216. 否则看 verify_time 是否在 3 分钟内。
  217. 符合条件返回该条 dict,否则返回 None。
  218. """
  219. if not single_result:
  220. return None
  221. if "now_time" in single_result and cls.is_within_3_minutes(single_result["now_time"]):
  222. return single_result
  223. if "verify_time" in single_result and cls.is_within_3_minutes(single_result["verify_time"]):
  224. return single_result
  225. return None
  226. class FlightPriceTaskRunner:
  227. """单任务流程:询价 -> 匹配 -> 验价 -> 按规则取价或 not_verify 重试."""
  228. def __init__(self, client: FlightPriceClient | None = None, logger: XmiLogger | None = None):
  229. self.logger = logger or XmiLogger("flight_price_task")
  230. self.client = client or FlightPriceClient()
  231. self.matcher = ResultMatcher()
  232. self.handler = VerifyResultHandler()
  233. self.rate = fetch_rate("USD", "CNY")
  234. def run(
  235. self,
  236. task: dict,
  237. do_verify: bool = True,
  238. ) -> dict:
  239. """
  240. 执行单条任务。task 需含: from_city_code, to_city_code, from_day, cabin, baggage, adult_total_price。
  241. flight_number 可选,用于匹配。
  242. do_verify=False 时仅执行询价+匹配并返回 matched(不做验价)
  243. 返回: {"status": "ok"|"placeholder"|"no_match", "price_info": {...}, "raw_verify": ..., "raw_search": ..., "matched": ...}
  244. """
  245. from_city_code = task["from_city_code"]
  246. to_city_code = task["to_city_code"]
  247. from_day = task["from_day"]
  248. flight_numbers = task.get("flight_numbers")
  249. cabins = task["cabins"]
  250. baggages = task["baggages"]
  251. # 1. 询价
  252. try:
  253. search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
  254. except FlightPriceRequestError as e:
  255. self.logger.warning(f"询价请求异常: {e}")
  256. return {"status": "request_error", "msg": str(e), "phase": "search"}
  257. if search_resp.get("code") != 0:
  258. return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}
  259. result_list = search_resp.get("result") or []
  260. # print(result_list)
  261. matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
  262. # print(matched)
  263. if not matched:
  264. return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
  265. data = matched.get("data")
  266. if not data:
  267. return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}
  268. # 只询价不验价走的流程
  269. if not do_verify:
  270. # return {"status": "ok", "raw_search": search_resp, "matched": matched, "data": data}
  271. expected_in_currency, rate_err = self._expected_price_in_verify_currency(task, matched)
  272. if rate_err:
  273. return {
  274. "status": "rate_error",
  275. "msg": rate_err,
  276. "raw_search": search_resp,
  277. "matched": matched,
  278. "data": data,
  279. }
  280. actual = matched.get("adult_total_price") # 询价和验价接口出来的币种已经是人民币, 不用再转换
  281. if self._price_within_threshold(expected_in_currency, actual): # 对比
  282. return {
  283. "status": "ok",
  284. "price_info": self._extract_price_info(matched),
  285. "raw_search": search_resp,
  286. "matched": matched,
  287. "data": data,
  288. }
  289. return {
  290. "status": "price_not_within_threshold",
  291. "msg": "询价结果价格不在阈值内",
  292. "expected": expected_in_currency,
  293. "actual": actual,
  294. "raw_search": search_resp,
  295. "matched": matched,
  296. "data": data,
  297. }
  298. # 2. 验价(先 not_verify=False)
  299. try:
  300. verify_resp = self.client.verify_price(
  301. from_city_code=from_city_code,
  302. to_city_code=to_city_code,
  303. from_day=from_day,
  304. data=data,
  305. not_verify=False,
  306. async_=True,
  307. )
  308. except FlightPriceRequestError as e:
  309. self.logger.warning(f"验价请求异常: {e}")
  310. return {"status": "request_error", "msg": str(e), "phase": "verify"}
  311. if verify_resp.get("code") != 0:
  312. return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}
  313. # print(verify_resp)
  314. verify_result = verify_resp.get("result")
  315. if isinstance(verify_result, list) and len(verify_result) >= 1:
  316. single = verify_result[0]
  317. valid = self.handler.get_valid_price(single)
  318. if valid is not None:
  319. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  320. if rate_err:
  321. return {"status": "rate_error", "msg": rate_err}
  322. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  323. return {
  324. "status": "ok",
  325. "price_info": self._extract_price_info(valid),
  326. "raw_verify": verify_resp,
  327. }
  328. # 3. 不符合时效:用 not_verify=True 每 10 秒重试,最多 3 分钟
  329. deadline = time.monotonic() + RETRY_DURATION
  330. while time.monotonic() < deadline:
  331. time.sleep(RETRY_INTERVAL)
  332. try:
  333. verify_resp = self.client.verify_price(
  334. from_city_code=from_city_code,
  335. to_city_code=to_city_code,
  336. from_day=from_day,
  337. data=data,
  338. not_verify=True,
  339. async_=True,
  340. )
  341. except FlightPriceRequestError as e:
  342. self.logger.warning(f"验价重试请求异常: {e}")
  343. continue
  344. if verify_resp.get("code") != 0:
  345. continue
  346. verify_result = verify_resp.get("result")
  347. if isinstance(verify_result, list) and len(verify_result) >= 1:
  348. single = verify_result[0]
  349. valid = self.handler.get_valid_price(single)
  350. if valid is not None:
  351. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  352. if rate_err:
  353. continue
  354. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  355. return {
  356. "status": "ok",
  357. "price_info": self._extract_price_info(valid),
  358. "raw_verify": verify_resp,
  359. }
  360. else:
  361. # 价格不符合,直接跳出循环,不用继续校验
  362. break
  363. # 3 分钟内都没有符合规则的数据,先占位
  364. return {
  365. "status": "placeholder",
  366. "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
  367. "task": task,
  368. "data": data,
  369. "raw_verify": verify_resp,
  370. }
  371. def _expected_price_in_verify_currency(
  372. self, task: dict, valid: dict
  373. ) -> tuple[float | None, str | None]:
  374. """
  375. 将任务期望价格换算为验价结果币种后的数值。
  376. 返回 (换算后的价格, None),若币种一致则直接换算为 float;
  377. 若需汇率且获取失败返回 (None, "汇率获取失败")。
  378. """
  379. try:
  380. expected_val = float(task.get("adult_total_price"))
  381. except (TypeError, ValueError):
  382. return None, "任务 adult_total_price 无效"
  383. if self.rate is None:
  384. task_currency = (task.get("currency") or "USD").strip().upper()
  385. verify_currency = (valid.get("currency") or "CNY").strip().upper()
  386. if task_currency == verify_currency:
  387. return expected_val, None
  388. rate = fetch_rate(task_currency, verify_currency)
  389. if rate is None:
  390. return None, "汇率获取失败"
  391. self.rate = rate
  392. return expected_val * self.rate, None
  393. @staticmethod
  394. def _price_within_threshold(
  395. expected: str | int | float | None,
  396. actual: str | int | float | None,
  397. threshold: float | None = None,
  398. ) -> bool:
  399. """校验验价结果价格与任务期望价格差是否在阈值内(两者已统一币种)。阈值默认 PRICE_DIFF_THRESHOLD。"""
  400. if threshold is None:
  401. threshold = PRICE_DIFF_THRESHOLD
  402. try:
  403. e = float(expected) if expected is not None else None
  404. a = float(actual) if actual is not None else None
  405. except (TypeError, ValueError):
  406. return False
  407. if e is None or a is None:
  408. return False
  409. return abs(a - e) <= threshold
  410. @staticmethod
  411. def _extract_price_info(item: dict) -> dict:
  412. """从单条 result 项提取价格相关信息."""
  413. return {
  414. "adult_price": item.get("adult_price"),
  415. "adult_tax": item.get("adult_tax"),
  416. "adult_total_price": item.get("adult_total_price"),
  417. "currency": item.get("currency"),
  418. "now_time": item.get("now_time"),
  419. "verify_time": item.get("verify_time"),
  420. }
  421. def _process_one_task(row, runner):
  422. """处理单条任务:构建 end_task、执行 run、解析结果。成功返回 flight_data 字典,失败返回 None。"""
  423. task = row
  424. separator = '|' # 分隔符由;更换为|
  425. thread_name = threading.current_thread().name
  426. # print(f"[thread_name: {thread_name}] 正在处理任务: {task}")
  427. from_city_code, to_city_code = task["city_pair"].split("-")
  428. from_day = task["flight_day"].replace("-", "")
  429. flight_numbers = task["flight_number_1"].strip()
  430. if task["flight_number_2"].strip() != "VJ":
  431. flight_numbers += separator + task["flight_number_2"].strip()
  432. cabins = separator.join(["Y"] * len(flight_numbers.split(separator)))
  433. if str(task['baggage']) == '0':
  434. baggage_str = "-;-;-;-"
  435. else:
  436. baggage_str = f"1-{task['baggage']}"
  437. baggages = separator.join([baggage_str] * len(flight_numbers.split(separator)))
  438. end_task = {
  439. "from_city_code": from_city_code,
  440. "to_city_code": to_city_code,
  441. "from_day": from_day,
  442. "flight_numbers": flight_numbers,
  443. "cabins": cabins,
  444. "baggages": baggages,
  445. "adult_total_price": task.get("adult_total_price"),
  446. "currency": task.get("currency", "USD"),
  447. }
  448. # print("--------------------------------")
  449. # print(end_task)
  450. # print("--------------------------------")
  451. # 在询价之前检查条件,task 存放了 keep_info 的全部字段
  452. drop_price_change_upper = float(task.get("drop_price_change_upper")) # 降价的最小幅度
  453. drop_price_change_lower = float(task.get("drop_price_change_lower")) # 降价的最大幅度
  454. # if abs(drop_price_change_upper) > 200: # 丢弃超过200美元的降价幅度
  455. # return None
  456. max_threshold = round(drop_price_change_upper * runner.rate * 1.0) # 降价阈值要按汇率转人民币(四舍五入到整数)
  457. max_threshold = max_threshold + 20
  458. if max_threshold > 0 or abs(max_threshold) < 10: # 丢弃小于10人民币的降价幅度
  459. return None
  460. drop_price_sample_size = int(task.get("drop_price_sample_size", "0"))
  461. if drop_price_sample_size < 3: # 丢弃历史降价样本数过少(小于3)的
  462. return None
  463. flight_day = task.get("flight_day")
  464. if flight_day in ['2026-04-28', '2026-04-29', '2026-04-30', '2026-05-01', '2026-05-05']: # 丢弃特殊起飞日期的
  465. return None
  466. time.sleep(1)
  467. out = runner.run(end_task, do_verify=False) # 不验价,仅询价
  468. # print(json.dumps(out, ensure_ascii=False, indent=2))
  469. if out.get("status") != "ok":
  470. # print(f"[thread_name={thread_name}] 错误: {out.get('msg')}")
  471. return None
  472. # print(f"价格: {out.get('price_info').get('adult_total_price')}")
  473. raw_verify = out.get("raw_verify")
  474. if raw_verify:
  475. results = raw_verify.get("result") or []
  476. else:
  477. matched = out.get("matched") or {}
  478. results = [matched] if matched else []
  479. if not results:
  480. return None
  481. print("raw_verify pass")
  482. result = results[0]
  483. adult_price = result.get("adult_price")
  484. adult_tax = result.get("adult_tax")
  485. cover_price = round(adult_price * runner.rate)
  486. cover_tax = round(adult_tax * runner.rate)
  487. # adult_total_price = result.get("adult_total_price")
  488. segments = result.get("segments") or []
  489. if not segments:
  490. return None
  491. end_segments = []
  492. baggage = segments[0].get("baggage")
  493. if baggage == "-;-;-;-":
  494. pc, kg = 0, 0 # 无行李的设置?
  495. else:
  496. pc, kg = [int(i) for i in baggage.split("-")]
  497. for seg in segments:
  498. flight_number = seg.get("flight_number")
  499. operating_flight_number = seg.get("operating_flight_number")
  500. if flight_number == operating_flight_number:
  501. operating_flight_number = ""
  502. dep_time = seg.get("dep_time")
  503. arr_time = seg.get("arr_time")
  504. dep_time = time_format_conversion(dep_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  505. arr_time = time_format_conversion(arr_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  506. end_segment = {
  507. "carrier": seg.get("carrier"),
  508. "flight_number": flight_number,
  509. # "dep_air_port": seg.get("dep_air_port"),
  510. # "arr_air_port": seg.get("arr_air_port"),
  511. "dep_city_code": seg.get("dep_city_code"),
  512. "arr_city_code": seg.get("arr_city_code"),
  513. # "operating_flight_number": operating_flight_number,
  514. "cabin": seg.get("cabin"),
  515. "dep_time": dep_time,
  516. # "arr_time": arr_time,
  517. }
  518. end_segments.append(end_segment)
  519. return {
  520. "trip_type": 1,
  521. "cover_price": cover_price,
  522. "cover_tax": cover_tax,
  523. "bag_amount": pc,
  524. "bag_weight": kg,
  525. "max_threshold": max_threshold,
  526. "segments": end_segments,
  527. "ret_segments": [],
  528. "task": task
  529. }
  530. def sync_policy(payload):
  531. headers = {
  532. "Content-Type": "application/json",
  533. }
  534. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  535. response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
  536. # print(response.text[:1000])
  537. resp_json = response.json()
  538. """
  539. {
  540. "code": 0,
  541. "msg": "ok",
  542. "data": {
  543. "deleted": 1,
  544. "created": 7
  545. }
  546. }
  547. """
  548. # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
  549. return resp_json
  550. def time_handle():
  551. now_time = datetime.now()
  552. next_time = now_time + timedelta(hours=1)
  553. next_ts = int(next_time.timestamp())
  554. expire_at = next_time + timedelta(minutes=1)
  555. ttl_seconds = int((expire_at - now_time).total_seconds())
  556. if ttl_seconds <= 0:
  557. ttl_seconds = 1
  558. redis_client = redis.Redis(host='192.168.20.98', port=6379, db=0)
  559. lock_key = "vj_next_pred_time"
  560. redis_client.set(lock_key, next_ts, ex=ttl_seconds)
  561. def main():
  562. logger = XmiLogger("task")
  563. # 注意 \ufeff 是 UTF-8 的 BOM
  564. # 所以需要使用 utf-8-sig 编码
  565. task_list = []
  566. runner = FlightPriceTaskRunner(logger=logger)
  567. # 1 读取任务列表
  568. output_dir = "./keep_0"
  569. keep_info_path = os.path.join(output_dir, "keep_info.csv")
  570. with open(keep_info_path, "r", encoding="utf-8-sig") as f:
  571. reader = csv.DictReader(f)
  572. for row in reader:
  573. task_list.append(row)
  574. # 2 任务列表逻辑处理(多线程)
  575. policy_list = []
  576. keep_info_end = []
  577. max_workers = 5 # 并发线程数,可按需要调整
  578. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  579. futures = {executor.submit(_process_one_task, task, runner): task for task in task_list}
  580. total = len(futures)
  581. done = 0
  582. failed = 0
  583. for future in as_completed(futures):
  584. try:
  585. flight_data = future.result()
  586. if flight_data is not None:
  587. task = flight_data.pop("task")
  588. keep_info_end.append(task)
  589. policy_list.append(flight_data)
  590. except Exception as e:
  591. failed += 1
  592. task = futures[future]
  593. # print(f"任务异常 {task}: {e}")
  594. logger.error(f"任务异常 {task}: {e}")
  595. finally:
  596. done += 1
  597. logger.info(
  598. f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
  599. )
  600. # 3 批量一次性上传政策
  601. logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
  602. # logger.info(f"policy_list: {policy_list}")
  603. logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
  604. if len(policy_list) > 0:
  605. # 这里批量一次性上传政策
  606. payload = {"items": policy_list}
  607. max_attempts = 3
  608. retryable_exceptions = (
  609. requests.exceptions.ConnectionError,
  610. requests.exceptions.ConnectTimeout,
  611. )
  612. for attempt in range(1, max_attempts + 1):
  613. try:
  614. sync_policy(payload)
  615. logger.info(f"上传政策成功")
  616. break
  617. except retryable_exceptions as e:
  618. if attempt == max_attempts:
  619. logger.error(f"上传政策失败(已重试{max_attempts}次): {e}")
  620. # logger.error(f"{traceback.format_exc()}")
  621. else:
  622. wait_seconds = attempt
  623. logger.warning(
  624. f"上传政策连接异常,第{attempt}/{max_attempts}次失败: {e},{wait_seconds}s后重试"
  625. )
  626. time.sleep(wait_seconds)
  627. except Exception as e:
  628. logger.error(f"上传政策失败(非连接异常,不重试): {e}")
  629. logger.error(f"{traceback.format_exc()}")
  630. break
  631. logger.info(f"keep_info_end: {len(keep_info_end)}")
  632. try:
  633. time_handle()
  634. logger.info(f"存redis时间成功")
  635. except Exception as e:
  636. logger.error(f"存redis时间失败: {e}")
  637. # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理 提供下载页面 (历史数据需要保留)
  638. output_dir = "/home/node04/descending_cabin_files_vj"
  639. os.makedirs(output_dir, exist_ok=True)
  640. if keep_info_end:
  641. out_path = os.path.join(
  642. output_dir,
  643. f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
  644. )
  645. with open(out_path, "w", encoding="utf-8-sig") as f:
  646. writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
  647. writer.writeheader()
  648. for task in keep_info_end:
  649. writer.writerow(task)
  650. logger.info("keep_info_end 写入完成")
  651. else:
  652. logger.warning("keep_info_end 为空,跳过写入CSV")
# Run the batch only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()