"""
Search + price-verification task.

Flow: query prices (search), pick the search result whose flight numbers /
cabins / baggage match the task and take its ``data``, then (optionally)
verify the price, applying the freshness window and retry rules below.
Matching results are turned into policy items and uploaded in one batch.
"""
import csv
import json
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from xmi_logger import XmiLogger

# Search / verify-price service
BASE_URL = "http://8.218.51.130:9000"
CID = "cba37f642eab11ef8e7200163e01a06e"
# Exchange-rate service
RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
# Policy sync service
# POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"

VALID_DURATION_SECONDS = 3 * 60  # a verify result's now_time/verify_time must be within 3 minutes
RETRY_TIMES = 3  # attempts per search/verify HTTP request
RETRY_INTERVAL = 10  # seconds between search/verify retries (also the not_verify polling interval)
RETRY_DURATION = 3 * 60  # total polling budget (seconds) for not_verify=True verification
PRICE_DIFF_THRESHOLD = 10  # max |actual - expected| after currency unification (RMB per the original note)
RATES_RETRY_TIMES = 3  # attempts for the exchange-rate request
RATES_RETRY_INTERVAL = 2  # seconds between exchange-rate attempts


def time_format_conversion(match_time, in_strftime_str="%Y-%m-%dT%H:%M:%S", out_strftime_str="%Y%m%d%H%M%S"):
    """
    Reformat a time string from one strftime layout to another.

    :param match_time: input time string, e.g. "2023-07-29T04:15:00"
    :param in_strftime_str: layout of ``match_time``
    :param out_strftime_str: layout of the returned string
    :return: the reformatted time string
    :raises ValueError: if ``match_time`` does not match ``in_strftime_str``
    """
    time_array = time.strptime(match_time, in_strftime_str)
    return time.strftime(out_strftime_str, time_array)


class FlightPriceRequestError(Exception):
    """Network or response error while calling the search / verify-price endpoints."""

    def __init__(self, message: str, cause: Exception | None = None):
        self.cause = cause
        super().__init__(message)


def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
    """
    Fetch the rate converting 1 unit of ``base`` into ``symbols``.

    Expected response:
    {"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}.

    Makes up to RATES_RETRY_TIMES attempts, sleeping RATES_RETRY_INTERVAL seconds
    before every retry (whatever the reason the previous attempt failed).

    :return: the rate as float, or None when every attempt failed.
    """
    for attempt in range(RATES_RETRY_TIMES):
        if attempt > 0:
            time.sleep(RATES_RETRY_INTERVAL)
        try:
            resp = requests.get(
                rates_url,
                params={"base": base, "symbols": symbols},
                headers={
                    "cid": cid,
                    "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
                },
                timeout=15,
            )
            resp.raise_for_status()
            body = resp.json()
            if body.get("code") != 0:
                continue
            data = body.get("data")
            if isinstance(data, dict) and "rate" in data:
                return float(data["rate"])
        except (requests.RequestException, TypeError, ValueError, KeyError, AttributeError):
            # ValueError also covers JSON decode errors; AttributeError covers a non-object body.
            continue
    return None


class FlightPriceClient:
    """Thin HTTP wrapper around the search and verify-price endpoints."""

    def __init__(self, base_url: str = BASE_URL, cid: str = CID):
        self.base_url = base_url.rstrip("/")
        self.cid = cid
        self.headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
            "Content-Type": "application/json",
        }

    def _request(self, url: str, payload: dict) -> dict:
        """
        POST ``payload`` as JSON to ``url`` and return the decoded JSON object.

        Makes up to RETRY_TIMES attempts (at least one), sleeping RETRY_INTERVAL
        seconds before each retry.

        :raises FlightPriceRequestError: when every attempt failed; the last
            underlying exception is exposed as ``.cause`` and chained as ``__cause__``.
        """
        last_err: FlightPriceRequestError | None = None
        for attempt in range(max(1, RETRY_TIMES)):
            if attempt > 0:
                time.sleep(RETRY_INTERVAL)
            try:
                resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
                resp.raise_for_status()
            except requests.Timeout as e:
                last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
                continue
            except requests.ConnectionError as e:
                last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
                continue
            except requests.HTTPError as e:
                last_err = FlightPriceRequestError(
                    f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
                )
                continue
            except requests.RequestException as e:
                last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
                continue

            # Decode separately: since requests 2.27 Response.json() raises
            # requests.exceptions.JSONDecodeError, which is also a RequestException,
            # so it would otherwise be reported as a generic request error.
            try:
                body = resp.json()
            except ValueError as e:
                last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
                continue
            print(json.dumps(body, ensure_ascii=False)[:200])
            if not isinstance(body, dict):
                # Callers use body.get(...); anything but a JSON object is unusable.
                last_err = FlightPriceRequestError(f"响应不是 JSON 对象: {url}")
                continue
            return body
        raise last_err from last_err.cause

    def search_flights(
        self,
        from_city_code: str,
        to_city_code: str,
        from_day: str,
    ) -> dict:
        """Call the search endpoint (/v1/search_flights)."""
        url = f"{self.base_url}/v1/search_flights"
        payload = {
            "from_city_code": from_city_code,
            "to_city_code": to_city_code,
            "from_day": from_day,
            "cid": self.cid,
        }
        return self._request(url, payload)

    def verify_price(
        self,
        from_city_code: str,
        to_city_code: str,
        from_day: str,
        data: str,
        not_verify: bool = False,
        async_: bool = True,
    ) -> dict:
        """Call the verify-price endpoint (/v1/verify_price) for a search result's ``data``."""
        url = f"{self.base_url}/v1/verify_price"
        payload = {
            "from_city_code": from_city_code,
            "to_city_code": to_city_code,
            "from_day": from_day,
            "data": data,
            "cid": self.cid,
            "not_verify": not_verify,
            "async": async_,
        }
        return self._request(url, payload)


class ResultMatcher:
    """Pick the search result whose segments match the requested flights/cabins/baggage."""

    @staticmethod
    def match(
        result: list[dict],
        cabins: str,
        baggages: str,
        flight_numbers: str,
    ) -> dict | None:
        """
        Find the first result item whose segments match segment-by-segment, in order.

        ``cabins`` / ``baggages`` / ``flight_numbers`` are "|"-separated strings with
        one entry per segment. Segment i must satisfy seg.cabin == cabins[i] and
        seg.baggage == baggages[i], and also seg.flight_number == flight_numbers[i]
        when flight numbers are given.

        :return: the matching result item (including its "data"), or None.
        """
        separator = "|"
        if not cabins:
            return None
        cabin_list = [s.strip() for s in cabins.split(separator)]
        baggage_list = [s.strip() for s in (baggages or "").split(separator)]
        flight_list = [s.strip() for s in flight_numbers.split(separator)] if flight_numbers else []

        n = len(cabin_list)
        if len(baggage_list) != n or (flight_list and len(flight_list) != n):
            return None
        # None means "do not check the flight number for this segment".
        wanted_flights = flight_list or [None] * n

        for item in result:
            segments = item.get("segments") or []
            if len(segments) != n:
                continue
            if all(
                seg.get("cabin") == cabin
                and seg.get("baggage") == baggage
                and (flight is None or seg.get("flight_number") == flight)
                for seg, cabin, baggage, flight in zip(segments, cabin_list, baggage_list, wanted_flights)
            ):
                return item
        return None


class VerifyResultHandler:
    """Freshness checks for a single verify-price result item."""

    @staticmethod
    def parse_time(ts: str | int | None) -> datetime | None:
        """
        Parse a local timestamp (int/float seconds) or a time string.

        Accepted string layouts: "%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S".

        :return: a naive datetime, or None if ``ts`` is None, unparseable, or out of range.
        """
        if ts is None:
            return None
        if isinstance(ts, (int, float)):
            try:
                return datetime.fromtimestamp(ts)
            except (OverflowError, OSError, ValueError):
                return None
        text = str(ts).strip()
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    @classmethod
    def is_within_3_minutes(cls, ts: str | int | None) -> bool:
        """Return True if ``ts`` is at most VALID_DURATION_SECONDS before now (future times also pass)."""
        dt = cls.parse_time(ts)
        if dt is None:
            return False
        return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS

    @classmethod
    def get_valid_price(cls, single_result: dict) -> dict | None:
        """
        Return ``single_result`` if it is fresh, else None.

        Fresh means now_time is within the window; failing that, verify_time is.
        """
        if not single_result:
            return None
        if "now_time" in single_result and cls.is_within_3_minutes(single_result["now_time"]):
            return single_result
        if "verify_time" in single_result and cls.is_within_3_minutes(single_result["verify_time"]):
            return single_result
        return None


class FlightPriceTaskRunner:
    """Single-task flow: search -> match -> verify -> accept price or poll with not_verify=True."""

    def __init__(self, client: FlightPriceClient | None = None, logger: XmiLogger | None = None):
        self.logger = logger or XmiLogger("flight_price_task")
        self.client = client or FlightPriceClient()
        self.matcher = ResultMatcher()
        self.handler = VerifyResultHandler()
        # USD -> CNY rate. Also read by _process_one_task to convert the drop threshold,
        # so it must only ever hold the USD -> CNY rate.
        self.rate = fetch_rate("USD", "CNY")
        # Rates for other currency pairs, fetched lazily. Guarded by _rate_lock because
        # main() shares one runner across all worker threads.
        self._other_rates: dict[tuple[str, str], float] = {}
        self._rate_lock = threading.Lock()

    def run(
        self,
        task: dict,
        do_verify: bool = True,
    ) -> dict:
        """
        Run one task.

        ``task`` keys: from_city_code, to_city_code, from_day, cabins, baggages,
        adult_total_price; optional flight_numbers (used for matching) and currency
        (default "USD"). cabins / baggages / flight_numbers are "|"-separated per segment.

        With do_verify=False only search + match + price-threshold check are done.

        :return: dict whose "status" is one of "ok", "placeholder", "no_match",
            "no_data", "request_error", "search_failed", "verify_failed",
            "rate_error", "price_not_within_threshold", plus context keys
            (price_info, raw_search, raw_verify, matched, data, msg, ...) that
            depend on the status.
        """
        from_city_code = task["from_city_code"]
        to_city_code = task["to_city_code"]
        from_day = task["from_day"]
        flight_numbers = task.get("flight_numbers")
        cabins = task["cabins"]
        baggages = task["baggages"]

        # 1. Search
        try:
            search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
        except FlightPriceRequestError as e:
            self.logger.warning(f"询价请求异常: {e}")
            return {"status": "request_error", "msg": str(e), "phase": "search"}
        if search_resp.get("code") != 0:
            return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}

        result_list = search_resp.get("result") or []
        matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
        if not matched:
            return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
        data = matched.get("data")
        if not data:
            return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}

        if not do_verify:
            return self._search_only_result(task, search_resp, matched, data)

        # 2. First verification with not_verify=False
        try:
            verify_resp = self.client.verify_price(
                from_city_code=from_city_code,
                to_city_code=to_city_code,
                from_day=from_day,
                data=data,
                not_verify=False,
                async_=True,
            )
        except FlightPriceRequestError as e:
            self.logger.warning(f"验价请求异常: {e}")
            return {"status": "request_error", "msg": str(e), "phase": "verify"}
        if verify_resp.get("code") != 0:
            return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}

        state, valid, rate_err = self._evaluate_verify_result(task, verify_resp)
        if state == "ok":
            return {
                "status": "ok",
                "price_info": self._extract_price_info(valid),
                "raw_verify": verify_resp,
            }
        if state == "rate_error":
            return {"status": "rate_error", "msg": rate_err}
        # "no_valid" or "price_mismatch": fall through to polling.

        # 3. Poll with not_verify=True every RETRY_INTERVAL seconds for up to RETRY_DURATION.
        deadline = time.monotonic() + RETRY_DURATION
        while time.monotonic() < deadline:
            time.sleep(RETRY_INTERVAL)
            try:
                verify_resp = self.client.verify_price(
                    from_city_code=from_city_code,
                    to_city_code=to_city_code,
                    from_day=from_day,
                    data=data,
                    not_verify=True,
                    async_=True,
                )
            except FlightPriceRequestError as e:
                self.logger.warning(f"验价重试请求异常: {e}")
                continue
            if verify_resp.get("code") != 0:
                continue
            state, valid, _ = self._evaluate_verify_result(task, verify_resp)
            if state == "ok":
                return {
                    "status": "ok",
                    "price_info": self._extract_price_info(valid),
                    "raw_verify": verify_resp,
                }
            if state == "price_mismatch":
                # Price does not match: stop polling immediately (original design).
                break
            # "no_valid" / "rate_error": keep polling.

        # No acceptable price within the polling window: return a placeholder.
        return {
            "status": "placeholder",
            "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
            "task": task,
            "data": data,
            "raw_verify": verify_resp,
        }

    def _search_only_result(self, task: dict, search_resp: dict, matched: dict, data) -> dict:
        """Build run()'s result for do_verify=False by comparing the matched search price with the task price."""
        expected_in_currency, rate_err = self._expected_price_in_verify_currency(task, matched)
        if rate_err:
            return {
                "status": "rate_error",
                "msg": rate_err,
                "raw_search": search_resp,
                "matched": matched,
                "data": data,
            }
        actual = matched.get("adult_total_price")
        if self._price_within_threshold(expected_in_currency, actual):
            return {
                "status": "ok",
                "price_info": self._extract_price_info(matched),
                "raw_search": search_resp,
                "matched": matched,
                "data": data,
            }
        return {
            "status": "price_not_within_threshold",
            "msg": "询价结果价格不在阈值内",
            "expected": expected_in_currency,
            "actual": actual,
            "raw_search": search_resp,
            "matched": matched,
            "data": data,
        }

    def _evaluate_verify_result(self, task: dict, verify_resp: dict) -> tuple[str, dict | None, str | None]:
        """
        Classify the first item of a verify-price response.

        :return: (state, item, rate_err), where state is one of:
            "no_valid"       - no result item, or the item is not fresh;
            "rate_error"     - task price could not be converted (rate_err holds the message);
            "price_mismatch" - fresh item whose price is outside the threshold;
            "ok"             - fresh item within the threshold (item is that entry).
        """
        verify_result = verify_resp.get("result")
        if not isinstance(verify_result, list) or not verify_result:
            return "no_valid", None, None
        valid = self.handler.get_valid_price(verify_result[0])
        if valid is None:
            return "no_valid", None, None
        expected, rate_err = self._expected_price_in_verify_currency(task, valid)
        if rate_err:
            return "rate_error", valid, rate_err
        if self._price_within_threshold(expected, valid.get("adult_total_price")):
            return "ok", valid, None
        return "price_mismatch", valid, None

    def _get_rate(self, base: str, symbols: str) -> float | None:
        """Return the base -> symbols rate, fetching it on demand; successful fetches are cached."""
        with self._rate_lock:
            if (base, symbols) == ("USD", "CNY"):
                if self.rate is None:
                    self.rate = fetch_rate(base, symbols)
                return self.rate
            rate = self._other_rates.get((base, symbols))
            if rate is None:
                rate = fetch_rate(base, symbols)
                if rate is not None:
                    self._other_rates[(base, symbols)] = rate
            return rate

    def _expected_price_in_verify_currency(
        self, task: dict, valid: dict
    ) -> tuple[float | None, str | None]:
        """
        Convert the task's expected price into the currency of a result item.

        Task currency defaults to "USD"; the result currency defaults to "CNY".
        If the currencies are equal, no conversion is applied.

        :return: (converted_price, None) on success; (None, error message) when the
            task price is invalid or a needed exchange rate cannot be fetched.
        """
        try:
            expected_val = float(task.get("adult_total_price"))
        except (TypeError, ValueError):
            return None, "任务 adult_total_price 无效"

        task_currency = (task.get("currency") or "USD").strip().upper()
        verify_currency = (valid.get("currency") or "CNY").strip().upper()
        if task_currency == verify_currency:
            return expected_val, None

        rate = self._get_rate(task_currency, verify_currency)
        if rate is None:
            return None, "汇率获取失败"
        return expected_val * rate, None

    @staticmethod
    def _price_within_threshold(
        expected: str | int | float | None,
        actual: str | int | float | None,
        threshold: float | None = None,
    ) -> bool:
        """
        Return True if |actual - expected| <= threshold (both already in the same currency).

        ``threshold`` defaults to PRICE_DIFF_THRESHOLD. Missing or non-numeric values return False.
        """
        if threshold is None:
            threshold = PRICE_DIFF_THRESHOLD
        try:
            e = float(expected) if expected is not None else None
            a = float(actual) if actual is not None else None
        except (TypeError, ValueError):
            return False
        if e is None or a is None:
            return False
        return abs(a - e) <= threshold

    @staticmethod
    def _extract_price_info(item: dict) -> dict:
        """Extract the price-related fields from one result item."""
        return {
            "adult_price": item.get("adult_price"),
            "adult_tax": item.get("adult_tax"),
            "adult_total_price": item.get("adult_total_price"),
            "currency": item.get("currency"),
            "now_time": item.get("now_time"),
            "verify_time": item.get("verify_time"),
        }


def _process_one_task(row, runner):
    """
    Process one keep_info.csv row.

    Builds the runner task, runs search-only matching (no verification), and
    converts the matched result into a policy item.

    :param row: one csv.DictReader row (city_pair, flight_day, flight_number_1/2,
        baggage, adult_total_price, currency, drop_price_change_upper, ...)
    :param runner: the shared FlightPriceTaskRunner
    :return: the policy dict (with the original row under "task"), or None when
        the task produced no usable in-threshold match.
    :raises: KeyError / ValueError / RuntimeError etc. on malformed rows or a
        missing rate; main() logs these as failed tasks.
    """
    task = row
    separator = "|"  # segment separator (formerly ";")

    from_city_code, to_city_code = task["city_pair"].split("-")
    from_day = task["flight_day"].replace("-", "")

    flight_numbers = task["flight_number_1"].strip()
    second_flight = (task.get("flight_number_2") or "").strip()
    # "VJ" is skipped here, presumably a placeholder for "no second segment" (TODO confirm with the
    # data source). An empty cell is skipped too, otherwise "XX123|" could never match any result.
    if second_flight and second_flight != "VJ":
        flight_numbers += separator + second_flight
    segment_count = len(flight_numbers.split(separator))

    cabins = separator.join(["Y"] * segment_count)
    baggage_str = "-;-;-;-" if str(task["baggage"]) == "0" else f"1-{task['baggage']}"
    baggages = separator.join([baggage_str] * segment_count)

    end_task = {
        "from_city_code": from_city_code,
        "to_city_code": to_city_code,
        "from_day": from_day,
        "flight_numbers": flight_numbers,
        "cabins": cabins,
        "baggages": baggages,
        "adult_total_price": task.get("adult_total_price"),
        "currency": task.get("currency", "USD"),
    }

    time.sleep(1)  # spaces out search requests issued by the worker threads
    out = runner.run(end_task, do_verify=False)  # search only, no verification
    if out.get("status") != "ok":
        return None

    raw_verify = out.get("raw_verify")
    if raw_verify:
        results = raw_verify.get("result") or []
    else:
        matched = out.get("matched") or {}
        results = [matched] if matched else []
    if not results:
        return None
    print("raw_verify pass")

    if runner.rate is None:
        raise RuntimeError("USD->CNY 汇率不可用, 无法计算 max_threshold")
    # The row carries every keep_info column.
    # NOTE(review): assumes drop_price_change_upper is in USD, since runner.rate is USD -> CNY.
    drop_price_change_upper = float(task.get("drop_price_change_upper"))
    # Half of the upper drop amount, converted to CNY and rounded to an integer.
    max_threshold = round(drop_price_change_upper * runner.rate * 0.5)

    result = results[0]
    segments = result.get("segments") or []
    if not segments:
        return None

    baggage = segments[0].get("baggage")
    if baggage == "-;-;-;-":
        pc, kg = 0, 0  # no checked baggage
    else:
        pc, kg = [int(part) for part in baggage.split("-")]  # "<pieces>-<kg>", e.g. "1-20"

    # Only these fields are sent; arrival time / operating flight number are not part of the policy item.
    end_segments = [
        {
            "carrier": seg.get("carrier"),
            "flight_number": seg.get("flight_number"),
            "dep_city_code": seg.get("dep_city_code"),
            "arr_city_code": seg.get("arr_city_code"),
            "cabin": seg.get("cabin"),
            "dep_time": time_format_conversion(
                seg.get("dep_time"),
                in_strftime_str="%Y%m%d%H%M%S",
                out_strftime_str="%Y-%m-%d %H:%M:%S",
            ),
        }
        for seg in segments
    ]

    return {
        "trip_type": 1,
        "bag_amount": pc,
        "bag_weight": kg,
        "max_threshold": max_threshold,
        "segments": end_segments,
        "ret_segments": [],
        "task": task,
    }


def sync_policy(payload):
    """
    Upload policy items to POLICY_URL in a single request.

    :param payload: {"items": [...]} as built by main()
    :return: decoded JSON response, e.g.
        {"code": 0, "msg": "ok", "data": {"deleted": 1, "created": 7}}
    :raises requests.RequestException: on network failure
    :raises ValueError: if the response body is not valid JSON
    """
    headers = {
        "Content-Type": "application/json",
    }
    response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
    return response.json()


def main(
    keep_info_path: str = os.path.join("./keep_0", "keep_info.csv"),
    output_dir: str = "/home/node04/descending_cabin_files",
    max_workers: int = 5,
):
    """
    Read tasks, process them concurrently, upload policies, and save the kept rows.

    :param keep_info_path: input CSV of tasks
    :param output_dir: directory for the timestamped keep_info_end_*.csv output
    :param max_workers: number of worker threads
    """
    logger = XmiLogger("task")
    runner = FlightPriceTaskRunner(logger=logger)

    # 1. Read the task list. utf-8-sig strips the UTF-8 BOM (\ufeff) that would otherwise
    #    prefix the first column name; the csv module requires newline="".
    with open(keep_info_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        task_list = list(reader)
        input_fieldnames = list(reader.fieldnames or [])

    # 2. Process tasks concurrently. Results are keyed by input index so output order is stable.
    processed: dict[int, dict] = {}
    failed = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_process_one_task, task, runner): idx for idx, task in enumerate(task_list)}
        total = len(futures)
        for done, future in enumerate(as_completed(futures), start=1):
            idx = futures[future]
            try:
                flight_data = future.result()
            except Exception as e:  # top-level boundary: one bad row must not abort the batch
                failed += 1
                logger.error(f"任务异常 {task_list[idx]}: {e}")
            else:
                if flight_data is not None:
                    processed[idx] = flight_data
            logger.info(
                f"进度: {done}/{total}, policy: {len(processed)}, keep: {len(processed)}, failed: {failed}"
            )

    policy_list = []
    keep_info_end = []
    for idx in sorted(processed):
        flight_data = processed[idx]
        keep_info_end.append(flight_data.pop("task"))
        policy_list.append(flight_data)

    # 3. Upload all policies in one batch.
    logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
    logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
    if policy_list:
        sync_resp = sync_policy({"items": policy_list})
        logger.info(f"sync_policy: {json.dumps(sync_resp, ensure_ascii=False, default=str)[:1000]}")

    # 4. Write the kept rows to a timestamped CSV so earlier runs are kept.
    # TODO: handle nested structures and provide a download page.
    logger.info(f"keep_info_end: {len(keep_info_end)}")
    os.makedirs(output_dir, exist_ok=True)
    if keep_info_end:
        out_path = os.path.join(
            output_dir,
            f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
        )
        fieldnames = input_fieldnames or list(keep_info_end[0].keys())
        with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
            # extrasaction="ignore" drops DictReader's restkey (None) for rows with extra cells.
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(keep_info_end)
        logger.info("keep_info_end 写入完成")
    else:
        logger.warning("keep_info_end 为空,跳过写入CSV")


if __name__ == "__main__":
    main()