descending_cabin_task.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. """
  2. 询价 + 验价任务:先询价,按航班号/舱位/行李匹配 result 取 data,再验价并按要求处理时效与重试。
  3. """
  4. import os
  5. import json
  6. import time
  7. import requests
  8. import threading
  9. from datetime import datetime
  10. from concurrent.futures import ThreadPoolExecutor, as_completed
  11. from xmi_logger import XmiLogger
  12. import csv
  13. # 询价\验价
  14. BASE_URL = "http://8.218.51.130:9000"
  15. CID = "cba37f642eab11ef8e7200163e01a06e"
  16. # 汇率
  17. RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
  18. RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
  19. # 政策
  20. # POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
  21. POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
  22. VALID_DURATION_SECONDS = 3 * 60 # 3 分钟 验价结果有效时间
  23. RETRY_TIMES = 3 # 请求询价/验价接口重试次数
  24. RETRY_INTERVAL = 10 # 秒 询价/验价接口重试间隔
  25. RETRY_DURATION = 3 * 60 # 持续 3 分钟 询价/验价接口重试时间
  26. PRICE_DIFF_THRESHOLD = 10 # 验价与任务期望价格允许的差值(已统一币种后),在此范围内才视为通过 人民币
  27. RATES_RETRY_TIMES = 3 # 汇率接口重试次数
  28. RATES_RETRY_INTERVAL = 2 # 汇率接口重试间隔
  29. def time_format_conversion(match_time,
  30. in_strftime_str="%Y-%m-%dT%H:%M:%S",
  31. out_strftime_str="%Y%m%d%H%M%S"):
  32. """
  33. 时间格式转换
  34. match_time 输入的时间 023-07-29T04:15:00
  35. in_strftime_str 输入的时间格式
  36. out_strftime_str 输出的时间格式
  37. """
  38. time_array = time.strptime(match_time, in_strftime_str)
  39. return time.strftime(out_strftime_str, time_array)
  40. class FlightPriceRequestError(Exception):
  41. """请求询价/验价接口时的网络或响应异常."""
  42. def __init__(self, message: str, cause: Exception | None = None):
  43. self.cause = cause
  44. super().__init__(message)
  45. def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
  46. """
  47. 请求汇率接口,获取 1 单位 base 兑换为 symbols 的汇率。
  48. 响应格式: {"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}。
  49. 带重试,失败返回 None。
  50. """
  51. last_err: Exception | None = None
  52. for attempt in range(RATES_RETRY_TIMES):
  53. try:
  54. resp = requests.get(
  55. rates_url,
  56. params={"base": base, "symbols": symbols},
  57. headers={
  58. "cid": cid,
  59. "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
  60. },
  61. timeout=15
  62. )
  63. resp.raise_for_status()
  64. body = resp.json()
  65. if body.get("code") != 0:
  66. last_err = ValueError(body.get("msg", "汇率接口返回非成功"))
  67. if attempt < RATES_RETRY_TIMES - 1:
  68. time.sleep(RATES_RETRY_INTERVAL)
  69. continue
  70. data = body.get("data")
  71. if isinstance(data, dict) and "rate" in data:
  72. return float(data["rate"])
  73. last_err = ValueError("响应缺少 data.rate")
  74. except (requests.RequestException, json.JSONDecodeError, TypeError, ValueError, KeyError) as e:
  75. last_err = e
  76. if attempt < RATES_RETRY_TIMES - 1:
  77. time.sleep(RATES_RETRY_INTERVAL)
  78. return None
  79. class FlightPriceClient:
  80. """请求封装:询价、验价接口."""
  81. def __init__(self, base_url: str = BASE_URL, cid: str = CID):
  82. self.base_url = base_url.rstrip("/")
  83. self.cid = cid
  84. self.headers = {
  85. "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
  86. "Content-Type": "application/json",
  87. }
  88. def _request(self, url: str, payload: dict) -> dict:
  89. """发起 POST 请求并解析 JSON,统一处理异常。失败时按 RETRY_TIMES 次重试,间隔 RETRY_INTERVAL 秒."""
  90. last_err = None
  91. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  92. for attempt in range(RETRY_TIMES):
  93. try:
  94. if attempt > 0:
  95. time.sleep(RETRY_INTERVAL)
  96. resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
  97. resp.raise_for_status()
  98. body = resp.json()
  99. print(json.dumps(body, ensure_ascii=False)[:200])
  100. return body
  101. # print(resp.json())
  102. # return resp.json()
  103. except requests.Timeout as e:
  104. last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
  105. except requests.ConnectionError as e:
  106. last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
  107. except requests.HTTPError as e:
  108. last_err = FlightPriceRequestError(
  109. f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
  110. )
  111. except requests.RequestException as e:
  112. last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
  113. except json.JSONDecodeError as e:
  114. last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
  115. raise last_err
  116. def search_flights(
  117. self,
  118. from_city_code: str,
  119. to_city_code: str,
  120. from_day: str,
  121. ) -> dict:
  122. """询价接口."""
  123. url = f"{self.base_url}/v1/search_flights"
  124. payload = {
  125. "from_city_code": from_city_code,
  126. "to_city_code": to_city_code,
  127. "from_day": from_day,
  128. "cid": self.cid,
  129. }
  130. return self._request(url, payload)
  131. def verify_price(
  132. self,
  133. from_city_code: str,
  134. to_city_code: str,
  135. from_day: str,
  136. data: str,
  137. not_verify: bool = False,
  138. async_: bool = True,
  139. ) -> dict:
  140. """验价接口."""
  141. url = f"{self.base_url}/v1/verify_price"
  142. payload = {
  143. "from_city_code": from_city_code,
  144. "to_city_code": to_city_code,
  145. "from_day": from_day,
  146. "data": data,
  147. "cid": self.cid,
  148. "not_verify": not_verify,
  149. "async": async_,
  150. }
  151. return self._request(url, payload)
  152. class ResultMatcher:
  153. """从询价 result 中按航班号、舱位、行李匹配一条,并取出 data."""
  154. @staticmethod
  155. def match(
  156. result: list[dict],
  157. cabins: str,
  158. baggages: str,
  159. flight_numbers: str,
  160. ) -> dict | None:
  161. """
  162. 在 result 中匹配:按段顺序一致。cabins / baggages / flight_numbers 均为 ";" 分隔的多段字符串,
  163. 第 i 段需满足:seg[i].cabin == 第 i 个舱位、seg[i].baggage == 第 i 个行李、seg[i].flight_number == 第 i 个航班号。
  164. 返回匹配到的那条 result 项(含 data),未匹配到返回 None。
  165. """
  166. separator = '|' # 更换分隔符由;为|
  167. cabin_list = [s.strip() for s in (cabins or "").split(separator)]
  168. baggage_list = [s.strip() for s in (baggages or "").split(separator)]
  169. flight_list = [s.strip() for s in (flight_numbers or "").split(separator)] if flight_numbers else []
  170. n = len(cabin_list)
  171. if n == 0 or len(baggage_list) != n:
  172. return None
  173. if flight_list and len(flight_list) != n:
  174. return None
  175. for item in result:
  176. segments = item.get("segments") or []
  177. if len(segments) != n:
  178. continue
  179. for i in range(n):
  180. seg = segments[i]
  181. if seg.get("cabin") != cabin_list[i] or seg.get("baggage") != baggage_list[i]:
  182. break
  183. if flight_list and seg.get("flight_number") != flight_list[i]:
  184. break
  185. else:
  186. return item
  187. return None
  188. class VerifyResultHandler:
  189. """验价返回 result 只有一条时:判断 now_time / verify_time 是否在 3 分钟内,并取价格."""
  190. @staticmethod
  191. def parse_time(ts: str | int | None) -> datetime | None:
  192. """解析时间字符串或时间戳."""
  193. if ts is None:
  194. return None
  195. if isinstance(ts, (int, float)):
  196. return datetime.fromtimestamp(ts)
  197. for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
  198. try:
  199. return datetime.strptime(str(ts).strip(), fmt)
  200. except (ValueError, TypeError):
  201. continue
  202. return None
  203. @classmethod
  204. def is_within_3_minutes(cls, ts: str | int | None) -> bool:
  205. """判断给定时间是否在距今 3 分钟内."""
  206. dt = cls.parse_time(ts)
  207. if dt is None:
  208. return False
  209. return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS
  210. @classmethod
  211. def get_valid_price(cls, single_result: dict) -> dict | None:
  212. """
  213. 单条 result:若存在 now_time 且在 3 分钟内则返回该条(含价格);
  214. 否则看 verify_time 是否在 3 分钟内。
  215. 符合条件返回该条 dict,否则返回 None。
  216. """
  217. if not single_result:
  218. return None
  219. if "now_time" in single_result and cls.is_within_3_minutes(single_result["now_time"]):
  220. return single_result
  221. if "verify_time" in single_result and cls.is_within_3_minutes(single_result["verify_time"]):
  222. return single_result
  223. return None
  224. class FlightPriceTaskRunner:
  225. """单任务流程:询价 -> 匹配 -> 验价 -> 按规则取价或 not_verify 重试."""
  226. def __init__(self, client: FlightPriceClient | None = None, logger: XmiLogger | None = None):
  227. self.logger = logger or XmiLogger("flight_price_task")
  228. self.client = client or FlightPriceClient()
  229. self.matcher = ResultMatcher()
  230. self.handler = VerifyResultHandler()
  231. self.rate = fetch_rate("USD", "CNY")
  232. def run(
  233. self,
  234. task: dict,
  235. do_verify: bool = True,
  236. ) -> dict:
  237. """
  238. 执行单条任务。task 需含: from_city_code, to_city_code, from_day, cabin, baggage, adult_total_price。
  239. flight_number 可选,用于匹配。
  240. do_verify=False 时仅执行询价+匹配并返回 matched(不做验价)
  241. 返回: {"status": "ok"|"placeholder"|"no_match", "price_info": {...}, "raw_verify": ..., "raw_search": ..., "matched": ...}
  242. """
  243. from_city_code = task["from_city_code"]
  244. to_city_code = task["to_city_code"]
  245. from_day = task["from_day"]
  246. flight_numbers = task.get("flight_numbers")
  247. cabins = task["cabins"]
  248. baggages = task["baggages"]
  249. # 1. 询价
  250. try:
  251. search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
  252. except FlightPriceRequestError as e:
  253. self.logger.warning(f"询价请求异常: {e}")
  254. return {"status": "request_error", "msg": str(e), "phase": "search"}
  255. if search_resp.get("code") != 0:
  256. return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}
  257. result_list = search_resp.get("result") or []
  258. # print(result_list)
  259. matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
  260. # print(matched)
  261. if not matched:
  262. return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
  263. data = matched.get("data")
  264. if not data:
  265. return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}
  266. # 只询价不验价走的流程
  267. if not do_verify:
  268. # return {"status": "ok", "raw_search": search_resp, "matched": matched, "data": data}
  269. expected_in_currency, rate_err = self._expected_price_in_verify_currency(task, matched)
  270. if rate_err:
  271. return {
  272. "status": "rate_error",
  273. "msg": rate_err,
  274. "raw_search": search_resp,
  275. "matched": matched,
  276. "data": data,
  277. }
  278. actual = matched.get("adult_total_price") # 询价和验价接口出来的币种已经是人民币, 不用再转换
  279. if self._price_within_threshold(expected_in_currency, actual): # 对比
  280. return {
  281. "status": "ok",
  282. "price_info": self._extract_price_info(matched),
  283. "raw_search": search_resp,
  284. "matched": matched,
  285. "data": data,
  286. }
  287. return {
  288. "status": "price_not_within_threshold",
  289. "msg": "询价结果价格不在阈值内",
  290. "expected": expected_in_currency,
  291. "actual": actual,
  292. "raw_search": search_resp,
  293. "matched": matched,
  294. "data": data,
  295. }
  296. # 2. 验价(先 not_verify=False)
  297. try:
  298. verify_resp = self.client.verify_price(
  299. from_city_code=from_city_code,
  300. to_city_code=to_city_code,
  301. from_day=from_day,
  302. data=data,
  303. not_verify=False,
  304. async_=True,
  305. )
  306. except FlightPriceRequestError as e:
  307. self.logger.warning(f"验价请求异常: {e}")
  308. return {"status": "request_error", "msg": str(e), "phase": "verify"}
  309. if verify_resp.get("code") != 0:
  310. return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}
  311. # print(verify_resp)
  312. verify_result = verify_resp.get("result")
  313. if isinstance(verify_result, list) and len(verify_result) >= 1:
  314. single = verify_result[0]
  315. valid = self.handler.get_valid_price(single)
  316. if valid is not None:
  317. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  318. if rate_err:
  319. return {"status": "rate_error", "msg": rate_err}
  320. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  321. return {
  322. "status": "ok",
  323. "price_info": self._extract_price_info(valid),
  324. "raw_verify": verify_resp,
  325. }
  326. # 3. 不符合时效:用 not_verify=True 每 10 秒重试,最多 3 分钟
  327. deadline = time.monotonic() + RETRY_DURATION
  328. while time.monotonic() < deadline:
  329. time.sleep(RETRY_INTERVAL)
  330. try:
  331. verify_resp = self.client.verify_price(
  332. from_city_code=from_city_code,
  333. to_city_code=to_city_code,
  334. from_day=from_day,
  335. data=data,
  336. not_verify=True,
  337. async_=True,
  338. )
  339. except FlightPriceRequestError as e:
  340. self.logger.warning(f"验价重试请求异常: {e}")
  341. continue
  342. if verify_resp.get("code") != 0:
  343. continue
  344. verify_result = verify_resp.get("result")
  345. if isinstance(verify_result, list) and len(verify_result) >= 1:
  346. single = verify_result[0]
  347. valid = self.handler.get_valid_price(single)
  348. if valid is not None:
  349. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  350. if rate_err:
  351. continue
  352. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  353. return {
  354. "status": "ok",
  355. "price_info": self._extract_price_info(valid),
  356. "raw_verify": verify_resp,
  357. }
  358. else:
  359. # 价格不符合,直接跳出循环,不用继续校验
  360. break
  361. # 3 分钟内都没有符合规则的数据,先占位
  362. return {
  363. "status": "placeholder",
  364. "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
  365. "task": task,
  366. "data": data,
  367. "raw_verify": verify_resp,
  368. }
  369. def _expected_price_in_verify_currency(
  370. self, task: dict, valid: dict
  371. ) -> tuple[float | None, str | None]:
  372. """
  373. 将任务期望价格换算为验价结果币种后的数值。
  374. 返回 (换算后的价格, None),若币种一致则直接换算为 float;
  375. 若需汇率且获取失败返回 (None, "汇率获取失败")。
  376. """
  377. try:
  378. expected_val = float(task.get("adult_total_price"))
  379. except (TypeError, ValueError):
  380. return None, "任务 adult_total_price 无效"
  381. if self.rate is None:
  382. task_currency = (task.get("currency") or "USD").strip().upper()
  383. verify_currency = (valid.get("currency") or "CNY").strip().upper()
  384. if task_currency == verify_currency:
  385. return expected_val, None
  386. rate = fetch_rate(task_currency, verify_currency)
  387. if rate is None:
  388. return None, "汇率获取失败"
  389. self.rate = rate
  390. return expected_val * self.rate, None
  391. @staticmethod
  392. def _price_within_threshold(
  393. expected: str | int | float | None,
  394. actual: str | int | float | None,
  395. threshold: float | None = None,
  396. ) -> bool:
  397. """校验验价结果价格与任务期望价格差是否在阈值内(两者已统一币种)。阈值默认 PRICE_DIFF_THRESHOLD。"""
  398. if threshold is None:
  399. threshold = PRICE_DIFF_THRESHOLD
  400. try:
  401. e = float(expected) if expected is not None else None
  402. a = float(actual) if actual is not None else None
  403. except (TypeError, ValueError):
  404. return False
  405. if e is None or a is None:
  406. return False
  407. return abs(a - e) <= threshold
  408. @staticmethod
  409. def _extract_price_info(item: dict) -> dict:
  410. """从单条 result 项提取价格相关信息."""
  411. return {
  412. "adult_price": item.get("adult_price"),
  413. "adult_tax": item.get("adult_tax"),
  414. "adult_total_price": item.get("adult_total_price"),
  415. "currency": item.get("currency"),
  416. "now_time": item.get("now_time"),
  417. "verify_time": item.get("verify_time"),
  418. }
  419. def _process_one_task(row, runner):
  420. """处理单条任务:构建 end_task、执行 run、解析结果。成功返回 flight_data 字典,失败返回 None。"""
  421. task = row
  422. separator = '|' # 分隔符由;更换为|
  423. thread_name = threading.current_thread().name
  424. # print(f"[thread_name: {thread_name}] 正在处理任务: {task}")
  425. from_city_code, to_city_code = task["city_pair"].split("-")
  426. from_day = task["flight_day"].replace("-", "")
  427. flight_numbers = task["flight_number_1"].strip()
  428. if task["flight_number_2"].strip() != "VJ":
  429. flight_numbers += separator + task["flight_number_2"].strip()
  430. cabins = separator.join(["Y"] * len(flight_numbers.split(separator)))
  431. if str(task['baggage']) == '0':
  432. baggage_str = "-;-;-;-"
  433. else:
  434. baggage_str = f"1-{task['baggage']}"
  435. baggages = separator.join([baggage_str] * len(flight_numbers.split(separator)))
  436. end_task = {
  437. "from_city_code": from_city_code,
  438. "to_city_code": to_city_code,
  439. "from_day": from_day,
  440. "flight_numbers": flight_numbers,
  441. "cabins": cabins,
  442. "baggages": baggages,
  443. "adult_total_price": task.get("adult_total_price"),
  444. "currency": task.get("currency", "USD"),
  445. }
  446. # print("--------------------------------")
  447. # print(end_task)
  448. # print("--------------------------------")
  449. time.sleep(1)
  450. out = runner.run(end_task, do_verify=False) # 不验价,仅询价
  451. # print(json.dumps(out, ensure_ascii=False, indent=2))
  452. if out.get("status") != "ok":
  453. # print(f"[thread_name={thread_name}] 错误: {out.get('msg')}")
  454. return None
  455. # print(f"价格: {out.get('price_info').get('adult_total_price')}")
  456. raw_verify = out.get("raw_verify")
  457. if raw_verify:
  458. results = raw_verify.get("result") or []
  459. else:
  460. matched = out.get("matched") or {}
  461. results = [matched] if matched else []
  462. if not results:
  463. return None
  464. print("raw_verify pass")
  465. # task 存放了 keep_info 的全部字段
  466. drop_price_change_upper = float(task.get("drop_price_change_upper")) # 降价的最小幅度
  467. drop_price_change_lower = float(task.get("drop_price_change_lower"))
  468. max_threshold = round(drop_price_change_upper * runner.rate * 0.5) # 降价阈值要按汇率转人民币(四舍五入到整数)
  469. result = results[0]
  470. # adult_price = result.get("adult_price")
  471. # adult_tax = result.get("adult_tax")
  472. # adult_total_price = result.get("adult_total_price")
  473. segments = result.get("segments") or []
  474. if not segments:
  475. return None
  476. end_segments = []
  477. baggage = segments[0].get("baggage")
  478. if baggage == "-;-;-;-":
  479. pc, kg = 0, 0 # 无行李的设置?
  480. else:
  481. pc, kg = [int(i) for i in baggage.split("-")]
  482. for seg in segments:
  483. flight_number = seg.get("flight_number")
  484. operating_flight_number = seg.get("operating_flight_number")
  485. if flight_number == operating_flight_number:
  486. operating_flight_number = ""
  487. dep_time = seg.get("dep_time")
  488. arr_time = seg.get("arr_time")
  489. dep_time = time_format_conversion(dep_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  490. arr_time = time_format_conversion(arr_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  491. end_segment = {
  492. "carrier": seg.get("carrier"),
  493. "flight_number": flight_number,
  494. # "dep_air_port": seg.get("dep_air_port"),
  495. # "arr_air_port": seg.get("arr_air_port"),
  496. "dep_city_code": seg.get("dep_city_code"),
  497. "arr_city_code": seg.get("arr_city_code"),
  498. # "operating_flight_number": operating_flight_number,
  499. "cabin": seg.get("cabin"),
  500. "dep_time": dep_time,
  501. # "arr_time": arr_time,
  502. }
  503. end_segments.append(end_segment)
  504. return {
  505. "trip_type": 1,
  506. # "cover_price": adult_price,
  507. # "cover_tax": adult_tax,
  508. "bag_amount": pc,
  509. "bag_weight": kg,
  510. "max_threshold": max_threshold,
  511. "segments": end_segments,
  512. "ret_segments": [],
  513. "task": task
  514. }
  515. def sync_policy(payload):
  516. headers = {
  517. "Content-Type": "application/json",
  518. }
  519. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  520. response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
  521. resp_json = response.json()
  522. """
  523. {
  524. "code": 0,
  525. "msg": "ok",
  526. "data": {
  527. "deleted": 1,
  528. "created": 7
  529. }
  530. }
  531. """
  532. # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
  533. return resp_json
  534. def main():
  535. logger = XmiLogger("task")
  536. # 注意 \ufeff 是 UTF-8 的 BOM
  537. # 所以需要使用 utf-8-sig 编码
  538. task_list = []
  539. runner = FlightPriceTaskRunner(logger=logger)
  540. # 1 读取任务列表
  541. output_dir = "./keep_0"
  542. keep_info_path = os.path.join(output_dir, "keep_info.csv")
  543. with open(keep_info_path, "r", encoding="utf-8-sig") as f:
  544. reader = csv.DictReader(f)
  545. for row in reader:
  546. task_list.append(row)
  547. # 2 任务列表逻辑处理(多线程)
  548. policy_list = []
  549. keep_info_end = []
  550. max_workers = 5 # 并发线程数,可按需要调整
  551. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  552. futures = {executor.submit(_process_one_task, task, runner): task for task in task_list}
  553. total = len(futures)
  554. done = 0
  555. failed = 0
  556. for future in as_completed(futures):
  557. try:
  558. flight_data = future.result()
  559. if flight_data is not None:
  560. task = flight_data.pop("task")
  561. keep_info_end.append(task)
  562. policy_list.append(flight_data)
  563. except Exception as e:
  564. failed += 1
  565. task = futures[future]
  566. # print(f"任务异常 {task}: {e}")
  567. logger.error(f"任务异常 {task}: {e}")
  568. finally:
  569. done += 1
  570. logger.info(
  571. f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
  572. )
  573. # 3 批量一次性上传政策
  574. logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
  575. # logger.info(f"policy_list: {policy_list}")
  576. logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
  577. if len(policy_list) > 0:
  578. # 这里批量一次性上传政策
  579. payload = {"items": policy_list}
  580. sync_policy(payload)
  581. logger.info(f"keep_info_end: {len(keep_info_end)}")
  582. # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理 提供下载页面 (历史数据需要保留)
  583. output_dir = "/home/node04/descending_cabin_files"
  584. os.makedirs(output_dir, exist_ok=True)
  585. if keep_info_end:
  586. out_path = os.path.join(
  587. output_dir,
  588. f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
  589. )
  590. with open(out_path, "w", encoding="utf-8-sig") as f:
  591. writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
  592. writer.writeheader()
  593. for task in keep_info_end:
  594. writer.writerow(task)
  595. logger.info("keep_info_end 写入完成")
  596. else:
  597. logger.warning("keep_info_end 为空,跳过写入CSV")
  598. if __name__ == "__main__":
  599. main()