# descending_cabin_task.py (26 KB)
  1. """
  2. 询价 + 验价任务:先询价,按航班号/舱位/行李匹配 result 取 data,再验价并按要求处理时效与重试。
  3. """
  4. import os
  5. import json
  6. import time
  7. import traceback
  8. import requests
  9. import threading
  10. from datetime import datetime
  11. from concurrent.futures import ThreadPoolExecutor, as_completed
  12. from xmi_logger import XmiLogger
  13. import csv
# Flight search (quote) / price-verification service
BASE_URL = "http://8.218.51.130:9000"
CID = "cba37f642eab11ef8e7200163e01a06e"
# Exchange-rate service
RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
# Policy sync service
# POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
VALID_DURATION_SECONDS = 3 * 60  # 3 minutes: how long a verification result is considered fresh
RETRY_TIMES = 3  # number of attempts for a search / verify HTTP request
RETRY_INTERVAL = 10  # seconds between search / verify retries
RETRY_DURATION = 3 * 60  # keep re-polling the verify endpoint for up to 3 minutes
PRICE_DIFF_THRESHOLD = 10  # max allowed gap between the returned price and the task's expected price (same currency, CNY)
RATES_RETRY_TIMES = 3  # number of attempts for the exchange-rate request
RATES_RETRY_INTERVAL = 2  # seconds between exchange-rate retries
  30. def time_format_conversion(match_time,
  31. in_strftime_str="%Y-%m-%dT%H:%M:%S",
  32. out_strftime_str="%Y%m%d%H%M%S"):
  33. """
  34. 时间格式转换
  35. match_time 输入的时间 023-07-29T04:15:00
  36. in_strftime_str 输入的时间格式
  37. out_strftime_str 输出的时间格式
  38. """
  39. time_array = time.strptime(match_time, in_strftime_str)
  40. return time.strftime(out_strftime_str, time_array)
  41. class FlightPriceRequestError(Exception):
  42. """请求询价/验价接口时的网络或响应异常."""
  43. def __init__(self, message: str, cause: Exception | None = None):
  44. self.cause = cause
  45. super().__init__(message)
  46. def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
  47. """
  48. 请求汇率接口,获取 1 单位 base 兑换为 symbols 的汇率。
  49. 响应格式: {"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}。
  50. 带重试,失败返回 None。
  51. """
  52. last_err: Exception | None = None
  53. for attempt in range(RATES_RETRY_TIMES):
  54. try:
  55. resp = requests.get(
  56. rates_url,
  57. params={"base": base, "symbols": symbols},
  58. headers={
  59. "cid": cid,
  60. "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
  61. },
  62. timeout=15
  63. )
  64. resp.raise_for_status()
  65. body = resp.json()
  66. if body.get("code") != 0:
  67. last_err = ValueError(body.get("msg", "汇率接口返回非成功"))
  68. if attempt < RATES_RETRY_TIMES - 1:
  69. time.sleep(RATES_RETRY_INTERVAL)
  70. continue
  71. data = body.get("data")
  72. if isinstance(data, dict) and "rate" in data:
  73. return float(data["rate"])
  74. last_err = ValueError("响应缺少 data.rate")
  75. except (requests.RequestException, json.JSONDecodeError, TypeError, ValueError, KeyError) as e:
  76. last_err = e
  77. if attempt < RATES_RETRY_TIMES - 1:
  78. time.sleep(RATES_RETRY_INTERVAL)
  79. return None
  80. class FlightPriceClient:
  81. """请求封装:询价、验价接口."""
  82. def __init__(self, base_url: str = BASE_URL, cid: str = CID):
  83. self.base_url = base_url.rstrip("/")
  84. self.cid = cid
  85. self.headers = {
  86. "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
  87. "Content-Type": "application/json",
  88. }
  89. def _request(self, url: str, payload: dict) -> dict:
  90. """发起 POST 请求并解析 JSON,统一处理异常。失败时按 RETRY_TIMES 次重试,间隔 RETRY_INTERVAL 秒."""
  91. last_err = None
  92. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  93. for attempt in range(RETRY_TIMES):
  94. try:
  95. if attempt > 0:
  96. time.sleep(RETRY_INTERVAL)
  97. resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
  98. resp.raise_for_status()
  99. body = resp.json()
  100. print(json.dumps(body, ensure_ascii=False)[:200])
  101. return body
  102. # print(resp.json())
  103. # return resp.json()
  104. except requests.Timeout as e:
  105. last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
  106. except requests.ConnectionError as e:
  107. last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
  108. except requests.HTTPError as e:
  109. last_err = FlightPriceRequestError(
  110. f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
  111. )
  112. except requests.RequestException as e:
  113. last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
  114. except json.JSONDecodeError as e:
  115. last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
  116. raise last_err
  117. def search_flights(
  118. self,
  119. from_city_code: str,
  120. to_city_code: str,
  121. from_day: str,
  122. ) -> dict:
  123. """询价接口."""
  124. url = f"{self.base_url}/v1/search_flights"
  125. payload = {
  126. "from_city_code": from_city_code,
  127. "to_city_code": to_city_code,
  128. "from_day": from_day,
  129. "cid": self.cid,
  130. }
  131. return self._request(url, payload)
  132. def verify_price(
  133. self,
  134. from_city_code: str,
  135. to_city_code: str,
  136. from_day: str,
  137. data: str,
  138. not_verify: bool = False,
  139. async_: bool = True,
  140. ) -> dict:
  141. """验价接口."""
  142. url = f"{self.base_url}/v1/verify_price"
  143. payload = {
  144. "from_city_code": from_city_code,
  145. "to_city_code": to_city_code,
  146. "from_day": from_day,
  147. "data": data,
  148. "cid": self.cid,
  149. "not_verify": not_verify,
  150. "async": async_,
  151. }
  152. return self._request(url, payload)
  153. class ResultMatcher:
  154. """从询价 result 中按航班号、舱位、行李匹配一条,并取出 data."""
  155. @staticmethod
  156. def match(
  157. result: list[dict],
  158. cabins: str,
  159. baggages: str,
  160. flight_numbers: str,
  161. ) -> dict | None:
  162. """
  163. 在 result 中匹配:按段顺序一致。cabins / baggages / flight_numbers 均为 ";" 分隔的多段字符串,
  164. 第 i 段需满足:seg[i].cabin == 第 i 个舱位、seg[i].baggage == 第 i 个行李、seg[i].flight_number == 第 i 个航班号。
  165. 返回匹配到的那条 result 项(含 data),未匹配到返回 None。
  166. """
  167. separator = '|' # 更换分隔符由;为|
  168. cabin_list = [s.strip() for s in (cabins or "").split(separator)]
  169. baggage_list = [s.strip() for s in (baggages or "").split(separator)]
  170. flight_list = [s.strip() for s in (flight_numbers or "").split(separator)] if flight_numbers else []
  171. n = len(cabin_list)
  172. if n == 0 or len(baggage_list) != n:
  173. return None
  174. if flight_list and len(flight_list) != n:
  175. return None
  176. for item in result:
  177. segments = item.get("segments") or []
  178. if len(segments) != n:
  179. continue
  180. for i in range(n):
  181. seg = segments[i]
  182. if seg.get("cabin") != cabin_list[i] or seg.get("baggage") != baggage_list[i]:
  183. break
  184. if flight_list and seg.get("flight_number") != flight_list[i]:
  185. break
  186. else:
  187. return item
  188. return None
  189. class VerifyResultHandler:
  190. """验价返回 result 只有一条时:判断 now_time / verify_time 是否在 3 分钟内,并取价格."""
  191. @staticmethod
  192. def parse_time(ts: str | int | None) -> datetime | None:
  193. """解析时间字符串或时间戳."""
  194. if ts is None:
  195. return None
  196. if isinstance(ts, (int, float)):
  197. return datetime.fromtimestamp(ts)
  198. for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
  199. try:
  200. return datetime.strptime(str(ts).strip(), fmt)
  201. except (ValueError, TypeError):
  202. continue
  203. return None
  204. @classmethod
  205. def is_within_3_minutes(cls, ts: str | int | None) -> bool:
  206. """判断给定时间是否在距今 3 分钟内."""
  207. dt = cls.parse_time(ts)
  208. if dt is None:
  209. return False
  210. return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS
  211. @classmethod
  212. def get_valid_price(cls, single_result: dict) -> dict | None:
  213. """
  214. 单条 result:若存在 now_time 且在 3 分钟内则返回该条(含价格);
  215. 否则看 verify_time 是否在 3 分钟内。
  216. 符合条件返回该条 dict,否则返回 None。
  217. """
  218. if not single_result:
  219. return None
  220. if "now_time" in single_result and cls.is_within_3_minutes(single_result["now_time"]):
  221. return single_result
  222. if "verify_time" in single_result and cls.is_within_3_minutes(single_result["verify_time"]):
  223. return single_result
  224. return None
  225. class FlightPriceTaskRunner:
  226. """单任务流程:询价 -> 匹配 -> 验价 -> 按规则取价或 not_verify 重试."""
  227. def __init__(self, client: FlightPriceClient | None = None, logger: XmiLogger | None = None):
  228. self.logger = logger or XmiLogger("flight_price_task")
  229. self.client = client or FlightPriceClient()
  230. self.matcher = ResultMatcher()
  231. self.handler = VerifyResultHandler()
  232. self.rate = fetch_rate("USD", "CNY")
  233. def run(
  234. self,
  235. task: dict,
  236. do_verify: bool = True,
  237. ) -> dict:
  238. """
  239. 执行单条任务。task 需含: from_city_code, to_city_code, from_day, cabin, baggage, adult_total_price。
  240. flight_number 可选,用于匹配。
  241. do_verify=False 时仅执行询价+匹配并返回 matched(不做验价)
  242. 返回: {"status": "ok"|"placeholder"|"no_match", "price_info": {...}, "raw_verify": ..., "raw_search": ..., "matched": ...}
  243. """
  244. from_city_code = task["from_city_code"]
  245. to_city_code = task["to_city_code"]
  246. from_day = task["from_day"]
  247. flight_numbers = task.get("flight_numbers")
  248. cabins = task["cabins"]
  249. baggages = task["baggages"]
  250. # 1. 询价
  251. try:
  252. search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
  253. except FlightPriceRequestError as e:
  254. self.logger.warning(f"询价请求异常: {e}")
  255. return {"status": "request_error", "msg": str(e), "phase": "search"}
  256. if search_resp.get("code") != 0:
  257. return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}
  258. result_list = search_resp.get("result") or []
  259. # print(result_list)
  260. matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
  261. # print(matched)
  262. if not matched:
  263. return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
  264. data = matched.get("data")
  265. if not data:
  266. return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}
  267. # 只询价不验价走的流程
  268. if not do_verify:
  269. # return {"status": "ok", "raw_search": search_resp, "matched": matched, "data": data}
  270. expected_in_currency, rate_err = self._expected_price_in_verify_currency(task, matched)
  271. if rate_err:
  272. return {
  273. "status": "rate_error",
  274. "msg": rate_err,
  275. "raw_search": search_resp,
  276. "matched": matched,
  277. "data": data,
  278. }
  279. actual = matched.get("adult_total_price") # 询价和验价接口出来的币种已经是人民币, 不用再转换
  280. if self._price_within_threshold(expected_in_currency, actual): # 对比
  281. return {
  282. "status": "ok",
  283. "price_info": self._extract_price_info(matched),
  284. "raw_search": search_resp,
  285. "matched": matched,
  286. "data": data,
  287. }
  288. return {
  289. "status": "price_not_within_threshold",
  290. "msg": "询价结果价格不在阈值内",
  291. "expected": expected_in_currency,
  292. "actual": actual,
  293. "raw_search": search_resp,
  294. "matched": matched,
  295. "data": data,
  296. }
  297. # 2. 验价(先 not_verify=False)
  298. try:
  299. verify_resp = self.client.verify_price(
  300. from_city_code=from_city_code,
  301. to_city_code=to_city_code,
  302. from_day=from_day,
  303. data=data,
  304. not_verify=False,
  305. async_=True,
  306. )
  307. except FlightPriceRequestError as e:
  308. self.logger.warning(f"验价请求异常: {e}")
  309. return {"status": "request_error", "msg": str(e), "phase": "verify"}
  310. if verify_resp.get("code") != 0:
  311. return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}
  312. # print(verify_resp)
  313. verify_result = verify_resp.get("result")
  314. if isinstance(verify_result, list) and len(verify_result) >= 1:
  315. single = verify_result[0]
  316. valid = self.handler.get_valid_price(single)
  317. if valid is not None:
  318. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  319. if rate_err:
  320. return {"status": "rate_error", "msg": rate_err}
  321. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  322. return {
  323. "status": "ok",
  324. "price_info": self._extract_price_info(valid),
  325. "raw_verify": verify_resp,
  326. }
  327. # 3. 不符合时效:用 not_verify=True 每 10 秒重试,最多 3 分钟
  328. deadline = time.monotonic() + RETRY_DURATION
  329. while time.monotonic() < deadline:
  330. time.sleep(RETRY_INTERVAL)
  331. try:
  332. verify_resp = self.client.verify_price(
  333. from_city_code=from_city_code,
  334. to_city_code=to_city_code,
  335. from_day=from_day,
  336. data=data,
  337. not_verify=True,
  338. async_=True,
  339. )
  340. except FlightPriceRequestError as e:
  341. self.logger.warning(f"验价重试请求异常: {e}")
  342. continue
  343. if verify_resp.get("code") != 0:
  344. continue
  345. verify_result = verify_resp.get("result")
  346. if isinstance(verify_result, list) and len(verify_result) >= 1:
  347. single = verify_result[0]
  348. valid = self.handler.get_valid_price(single)
  349. if valid is not None:
  350. expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
  351. if rate_err:
  352. continue
  353. if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
  354. return {
  355. "status": "ok",
  356. "price_info": self._extract_price_info(valid),
  357. "raw_verify": verify_resp,
  358. }
  359. else:
  360. # 价格不符合,直接跳出循环,不用继续校验
  361. break
  362. # 3 分钟内都没有符合规则的数据,先占位
  363. return {
  364. "status": "placeholder",
  365. "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
  366. "task": task,
  367. "data": data,
  368. "raw_verify": verify_resp,
  369. }
  370. def _expected_price_in_verify_currency(
  371. self, task: dict, valid: dict
  372. ) -> tuple[float | None, str | None]:
  373. """
  374. 将任务期望价格换算为验价结果币种后的数值。
  375. 返回 (换算后的价格, None),若币种一致则直接换算为 float;
  376. 若需汇率且获取失败返回 (None, "汇率获取失败")。
  377. """
  378. try:
  379. expected_val = float(task.get("adult_total_price"))
  380. except (TypeError, ValueError):
  381. return None, "任务 adult_total_price 无效"
  382. if self.rate is None:
  383. task_currency = (task.get("currency") or "USD").strip().upper()
  384. verify_currency = (valid.get("currency") or "CNY").strip().upper()
  385. if task_currency == verify_currency:
  386. return expected_val, None
  387. rate = fetch_rate(task_currency, verify_currency)
  388. if rate is None:
  389. return None, "汇率获取失败"
  390. self.rate = rate
  391. return expected_val * self.rate, None
  392. @staticmethod
  393. def _price_within_threshold(
  394. expected: str | int | float | None,
  395. actual: str | int | float | None,
  396. threshold: float | None = None,
  397. ) -> bool:
  398. """校验验价结果价格与任务期望价格差是否在阈值内(两者已统一币种)。阈值默认 PRICE_DIFF_THRESHOLD。"""
  399. if threshold is None:
  400. threshold = PRICE_DIFF_THRESHOLD
  401. try:
  402. e = float(expected) if expected is not None else None
  403. a = float(actual) if actual is not None else None
  404. except (TypeError, ValueError):
  405. return False
  406. if e is None or a is None:
  407. return False
  408. return abs(a - e) <= threshold
  409. @staticmethod
  410. def _extract_price_info(item: dict) -> dict:
  411. """从单条 result 项提取价格相关信息."""
  412. return {
  413. "adult_price": item.get("adult_price"),
  414. "adult_tax": item.get("adult_tax"),
  415. "adult_total_price": item.get("adult_total_price"),
  416. "currency": item.get("currency"),
  417. "now_time": item.get("now_time"),
  418. "verify_time": item.get("verify_time"),
  419. }
  420. def _process_one_task(row, runner):
  421. """处理单条任务:构建 end_task、执行 run、解析结果。成功返回 flight_data 字典,失败返回 None。"""
  422. task = row
  423. separator = '|' # 分隔符由;更换为|
  424. thread_name = threading.current_thread().name
  425. # print(f"[thread_name: {thread_name}] 正在处理任务: {task}")
  426. from_city_code, to_city_code = task["city_pair"].split("-")
  427. from_day = task["flight_day"].replace("-", "")
  428. flight_numbers = task["flight_number_1"].strip()
  429. if task["flight_number_2"].strip() != "VJ":
  430. flight_numbers += separator + task["flight_number_2"].strip()
  431. cabins = separator.join(["Y"] * len(flight_numbers.split(separator)))
  432. if str(task['baggage']) == '0':
  433. baggage_str = "-;-;-;-"
  434. else:
  435. baggage_str = f"1-{task['baggage']}"
  436. baggages = separator.join([baggage_str] * len(flight_numbers.split(separator)))
  437. end_task = {
  438. "from_city_code": from_city_code,
  439. "to_city_code": to_city_code,
  440. "from_day": from_day,
  441. "flight_numbers": flight_numbers,
  442. "cabins": cabins,
  443. "baggages": baggages,
  444. "adult_total_price": task.get("adult_total_price"),
  445. "currency": task.get("currency", "USD"),
  446. }
  447. # print("--------------------------------")
  448. # print(end_task)
  449. # print("--------------------------------")
  450. time.sleep(1)
  451. out = runner.run(end_task, do_verify=False) # 不验价,仅询价
  452. # print(json.dumps(out, ensure_ascii=False, indent=2))
  453. if out.get("status") != "ok":
  454. # print(f"[thread_name={thread_name}] 错误: {out.get('msg')}")
  455. return None
  456. # print(f"价格: {out.get('price_info').get('adult_total_price')}")
  457. raw_verify = out.get("raw_verify")
  458. if raw_verify:
  459. results = raw_verify.get("result") or []
  460. else:
  461. matched = out.get("matched") or {}
  462. results = [matched] if matched else []
  463. if not results:
  464. return None
  465. print("raw_verify pass")
  466. # task 存放了 keep_info 的全部字段
  467. drop_price_change_upper = float(task.get("drop_price_change_upper")) # 降价的最小幅度
  468. drop_price_change_lower = float(task.get("drop_price_change_lower"))
  469. max_threshold = round(drop_price_change_upper * runner.rate * 0.8) # 降价阈值要按汇率转人民币(四舍五入到整数)
  470. result = results[0]
  471. # adult_price = result.get("adult_price")
  472. # adult_tax = result.get("adult_tax")
  473. # adult_total_price = result.get("adult_total_price")
  474. segments = result.get("segments") or []
  475. if not segments:
  476. return None
  477. end_segments = []
  478. baggage = segments[0].get("baggage")
  479. if baggage == "-;-;-;-":
  480. pc, kg = 0, 0 # 无行李的设置?
  481. else:
  482. pc, kg = [int(i) for i in baggage.split("-")]
  483. for seg in segments:
  484. flight_number = seg.get("flight_number")
  485. operating_flight_number = seg.get("operating_flight_number")
  486. if flight_number == operating_flight_number:
  487. operating_flight_number = ""
  488. dep_time = seg.get("dep_time")
  489. arr_time = seg.get("arr_time")
  490. dep_time = time_format_conversion(dep_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  491. arr_time = time_format_conversion(arr_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
  492. end_segment = {
  493. "carrier": seg.get("carrier"),
  494. "flight_number": flight_number,
  495. # "dep_air_port": seg.get("dep_air_port"),
  496. # "arr_air_port": seg.get("arr_air_port"),
  497. "dep_city_code": seg.get("dep_city_code"),
  498. "arr_city_code": seg.get("arr_city_code"),
  499. # "operating_flight_number": operating_flight_number,
  500. "cabin": seg.get("cabin"),
  501. "dep_time": dep_time,
  502. # "arr_time": arr_time,
  503. }
  504. end_segments.append(end_segment)
  505. return {
  506. "trip_type": 1,
  507. # "cover_price": adult_price,
  508. # "cover_tax": adult_tax,
  509. "bag_amount": pc,
  510. "bag_weight": kg,
  511. "max_threshold": max_threshold,
  512. "segments": end_segments,
  513. "ret_segments": [],
  514. "task": task
  515. }
  516. def sync_policy(payload):
  517. headers = {
  518. "Content-Type": "application/json",
  519. }
  520. # print(json.dumps(payload, ensure_ascii=False, indent=2))
  521. response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
  522. # print(response.text[:1000])
  523. resp_json = response.json()
  524. """
  525. {
  526. "code": 0,
  527. "msg": "ok",
  528. "data": {
  529. "deleted": 1,
  530. "created": 7
  531. }
  532. }
  533. """
  534. # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
  535. return resp_json
  536. def main():
  537. logger = XmiLogger("task")
  538. # 注意 \ufeff 是 UTF-8 的 BOM
  539. # 所以需要使用 utf-8-sig 编码
  540. task_list = []
  541. runner = FlightPriceTaskRunner(logger=logger)
  542. # 1 读取任务列表
  543. output_dir = "./keep_0"
  544. keep_info_path = os.path.join(output_dir, "keep_info.csv")
  545. with open(keep_info_path, "r", encoding="utf-8-sig") as f:
  546. reader = csv.DictReader(f)
  547. for row in reader:
  548. task_list.append(row)
  549. # 2 任务列表逻辑处理(多线程)
  550. policy_list = []
  551. keep_info_end = []
  552. max_workers = 5 # 并发线程数,可按需要调整
  553. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  554. futures = {executor.submit(_process_one_task, task, runner): task for task in task_list}
  555. total = len(futures)
  556. done = 0
  557. failed = 0
  558. for future in as_completed(futures):
  559. try:
  560. flight_data = future.result()
  561. if flight_data is not None:
  562. task = flight_data.pop("task")
  563. keep_info_end.append(task)
  564. policy_list.append(flight_data)
  565. except Exception as e:
  566. failed += 1
  567. task = futures[future]
  568. # print(f"任务异常 {task}: {e}")
  569. logger.error(f"任务异常 {task}: {e}")
  570. finally:
  571. done += 1
  572. logger.info(
  573. f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
  574. )
  575. # 3 批量一次性上传政策
  576. logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
  577. # logger.info(f"policy_list: {policy_list}")
  578. logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
  579. if len(policy_list) > 0:
  580. # 这里批量一次性上传政策
  581. payload = {"items": policy_list}
  582. try:
  583. sync_policy(payload)
  584. logger.info(f"上传政策成功")
  585. except Exception as e:
  586. logger.error(f"上传政策失败: {e}")
  587. logger.error(f"{traceback.format_exc()}")
  588. logger.info(f"keep_info_end: {len(keep_info_end)}")
  589. # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理 提供下载页面 (历史数据需要保留)
  590. output_dir = "/home/node04/descending_cabin_files_vj"
  591. os.makedirs(output_dir, exist_ok=True)
  592. if keep_info_end:
  593. out_path = os.path.join(
  594. output_dir,
  595. f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
  596. )
  597. with open(out_path, "w", encoding="utf-8-sig") as f:
  598. writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
  599. writer.writeheader()
  600. for task in keep_info_end:
  601. writer.writerow(task)
  602. logger.info("keep_info_end 写入完成")
  603. else:
  604. logger.warning("keep_info_end 为空,跳过写入CSV")
  605. if __name__ == "__main__":
  606. main()