Procházet zdrojové kódy

接入智博的验价任务, 修复自己出keep_info表时的发现的bug

node04 před 3 týdny
rodič
revize
abcbf5b3be
2 změnil soubory, kde provedl 684 přidání a 31 odebrání
  1. 591 0
      descending_cabin_task.py
  2. 93 31
      follow_up.py

+ 591 - 0
descending_cabin_task.py

@@ -0,0 +1,591 @@
+"""
+询价 + 验价任务:先询价,按航班号/舱位/行李匹配 result 取 data,再验价并按要求处理时效与重试。
+"""
+import os
+import json
+import time
+import requests
+import threading
+from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from xmi_logger import XmiLogger
+import csv
+
+
+# Inquiry / price-verification service (base URL and client id).
+# NOTE(review): the client ids below are hard-coded in source; consider loading
+# them from configuration or the environment instead.
+BASE_URL = "http://8.218.51.130:9000"
+CID = "cba37f642eab11ef8e7200163e01a06e"
+# Exchange-rate service (endpoint and client id).
+RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
+RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
+# Policy synchronisation endpoint. The commented-out line is an alternative
+# address that is currently not used.
+# POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
+POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
+
+VALID_DURATION_SECONDS = 3 * 60  # a verification result counts as fresh for 3 minutes
+RETRY_TIMES = 3  # attempts per inquiry / verification HTTP request
+RETRY_INTERVAL = 10  # seconds between HTTP retries; also the polling interval of the verify loop
+RETRY_DURATION = 3 * 60  # seconds the verify polling loop keeps running (3 minutes)
+# Maximum allowed difference between the verified price and the price expected by
+# the task, compared after both are expressed in the same currency.
+# NOTE(review): the original note says the unit is CNY, but the comparison is done
+# in whatever currency the verification result reports - confirm.
+PRICE_DIFF_THRESHOLD = 10
+RATES_RETRY_TIMES = 3  # attempts for the exchange-rate request
+RATES_RETRY_INTERVAL = 2  # seconds between exchange-rate attempts
+
+
+
+def time_format_conversion(match_time,
+                           in_strftime_str="%Y-%m-%dT%H:%M:%S",
+                           out_strftime_str="%Y%m%d%H%M%S"):
+    """Re-render a timestamp string from one ``strftime`` layout into another.
+
+    Args:
+        match_time: Timestamp text to convert, e.g. ``"2023-07-29T04:15:00"``.
+        in_strftime_str: Layout that ``match_time`` is written in.
+        out_strftime_str: Layout of the returned string.
+
+    Returns:
+        ``match_time`` formatted according to ``out_strftime_str``.
+
+    Raises:
+        ValueError: If ``match_time`` does not match ``in_strftime_str``.
+    """
+    return time.strftime(out_strftime_str, time.strptime(match_time, in_strftime_str))
+
+
+
+
+class FlightPriceRequestError(Exception):
+    """Network or response failure while calling the inquiry / verification API."""
+
+    def __init__(self, message: str, cause: Exception | None = None):
+        super().__init__(message)
+        # Underlying exception (if any), kept so callers can inspect the root cause.
+        self.cause = cause
+
+
+
+def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
+    """Fetch the rate for converting one unit of ``base`` into ``symbols``.
+
+    Expected response shape:
+    ``{"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}``.
+    The request is attempted up to ``RATES_RETRY_TIMES`` times with
+    ``RATES_RETRY_INTERVAL`` seconds between attempts.
+
+    Args:
+        base: Source currency code, sent as the ``base`` query parameter.
+        symbols: Target currency code, sent as the ``symbols`` query parameter.
+        rates_url: Endpoint of the exchange-rate service.
+        cid: Client id sent in the ``cid`` request header.
+
+    Returns:
+        The rate as a positive, finite float, or ``None`` when no attempt
+        produced a usable rate.
+    """
+    import math  # local import keeps this block self-contained
+
+    request_headers = {
+        "cid": cid,
+        "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
+    }
+    for attempt in range(RATES_RETRY_TIMES):
+        # Pause between attempts only; never before the first or after the last one.
+        if attempt:
+            time.sleep(RATES_RETRY_INTERVAL)
+        try:
+            resp = requests.get(
+                rates_url,
+                params={"base": base, "symbols": symbols},
+                headers=request_headers,
+                timeout=15,
+            )
+            resp.raise_for_status()
+            body = resp.json()
+        except (requests.RequestException, ValueError):
+            # ValueError also covers json.JSONDecodeError raised by older requests versions.
+            continue
+        # A JSON array or scalar has no .get(); treat it like any other bad response.
+        if not isinstance(body, dict) or body.get("code") != 0:
+            continue
+        data = body.get("data")
+        if not isinstance(data, dict) or "rate" not in data:
+            continue
+        try:
+            rate = float(data["rate"])
+        except (TypeError, ValueError):
+            continue
+        # Zero, negative or NaN/inf rates would silently corrupt the converted price.
+        if math.isfinite(rate) and rate > 0:
+            return rate
+    return None
+
+
+class FlightPriceClient:
+    """Thin HTTP wrapper around the inquiry (search) and price-verification endpoints."""
+
+    def __init__(self, base_url: str = BASE_URL, cid: str = CID):
+        """Store the service base URL (without trailing slash), client id and headers."""
+        self.base_url = base_url.rstrip("/")
+        self.cid = cid
+        self.headers = {
+            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
+            "Content-Type": "application/json",
+        }
+
+    def _request(self, url: str, payload: dict) -> dict:
+        """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.
+
+        The request is attempted up to ``RETRY_TIMES`` times, waiting
+        ``RETRY_INTERVAL`` seconds before every attempt after the first.
+
+        Raises:
+            FlightPriceRequestError: When every attempt failed; the error of the
+                last attempt is raised and chained to its underlying cause.
+        """
+        last_err: FlightPriceRequestError | None = None
+        for attempt in range(RETRY_TIMES):
+            if attempt > 0:
+                time.sleep(RETRY_INTERVAL)
+            try:
+                resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
+                resp.raise_for_status()
+                # Decode once; the body is returned to the caller as-is.
+                return resp.json()
+            except requests.Timeout as e:
+                last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
+            except requests.ConnectionError as e:
+                last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
+            except requests.HTTPError as e:
+                last_err = FlightPriceRequestError(
+                    f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
+                )
+            except json.JSONDecodeError as e:
+                # Must come before RequestException: recent requests versions raise a
+                # JSONDecodeError that subclasses both, and the generic clause would
+                # otherwise hide the "invalid JSON" diagnosis.
+                last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
+            except requests.RequestException as e:
+                last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
+        if last_err is None:
+            # Only reachable when RETRY_TIMES <= 0; "raise None" would be a TypeError.
+            raise FlightPriceRequestError(f"请求异常: {url} - 未发起任何请求")
+        raise last_err from last_err.cause
+
+    def search_flights(
+        self,
+        from_city_code: str,
+        to_city_code: str,
+        from_day: str,
+    ) -> dict:
+        """Call the inquiry endpoint ``/v1/search_flights`` and return its JSON body.
+
+        Raises:
+            FlightPriceRequestError: When the request fails after all retries.
+        """
+        url = f"{self.base_url}/v1/search_flights"
+        payload = {
+            "from_city_code": from_city_code,
+            "to_city_code": to_city_code,
+            "from_day": from_day,
+            "cid": self.cid,
+        }
+        return self._request(url, payload)
+
+    def verify_price(
+        self,
+        from_city_code: str,
+        to_city_code: str,
+        from_day: str,
+        data: str,
+        not_verify: bool = False,
+        async_: bool = True,
+    ) -> dict:
+        """Call the verification endpoint ``/v1/verify_price`` and return its JSON body.
+
+        ``data`` is the opaque ``data`` value of a search result. ``not_verify``
+        and ``async_`` are forwarded as the ``not_verify`` and ``async`` fields
+        of the request payload.
+
+        Raises:
+            FlightPriceRequestError: When the request fails after all retries.
+        """
+        url = f"{self.base_url}/v1/verify_price"
+        payload = {
+            "from_city_code": from_city_code,
+            "to_city_code": to_city_code,
+            "from_day": from_day,
+            "data": data,
+            "cid": self.cid,
+            "not_verify": not_verify,
+            "async": async_,
+        }
+        return self._request(url, payload)
+
+
+class ResultMatcher:
+    """Pick the search-result item whose segments match flight number, cabin and baggage."""
+
+    @staticmethod
+    def match(
+        result: list[dict],
+        cabins: str,
+        baggages: str,
+        flight_numbers: str,
+    ) -> dict | None:
+        """Return the first item of ``result`` whose segments match the criteria.
+
+        ``cabins``, ``baggages`` and ``flight_numbers`` are ``";"``-separated
+        strings with one entry per segment, compared position by position:
+        segment ``i`` must have ``cabin``, ``baggage`` and (when flight numbers
+        are given) ``flight_number`` equal to the ``i``-th entry.
+
+        Args:
+            result: Items of a search response; each may carry a ``segments`` list.
+            cabins: Required cabin per segment.
+            baggages: Required baggage per segment.
+            flight_numbers: Required flight number per segment; empty or ``None``
+                disables the flight-number check.
+
+        Returns:
+            The matching item (including its ``data``), or ``None`` when nothing
+            matches or the criteria are empty or of inconsistent length.
+        """
+        # "".split(";") yields [""], so empty criteria must be rejected up front;
+        # otherwise they would match segments whose cabin/baggage are empty strings.
+        if not (cabins or "").strip() or not (baggages or "").strip():
+            return None
+
+        cabin_list = [s.strip() for s in cabins.split(";")]
+        baggage_list = [s.strip() for s in baggages.split(";")]
+        flight_list = [s.strip() for s in flight_numbers.split(";")] if flight_numbers else []
+
+        n = len(cabin_list)
+        if len(baggage_list) != n:
+            return None
+        if flight_list and len(flight_list) != n:
+            return None
+
+        for item in result or []:
+            segments = item.get("segments") or []
+            if len(segments) != n:
+                continue
+            cabin_and_baggage_ok = all(
+                seg.get("cabin") == cabin and seg.get("baggage") == baggage
+                for seg, cabin, baggage in zip(segments, cabin_list, baggage_list)
+            )
+            if not cabin_and_baggage_ok:
+                continue
+            if flight_list and any(
+                seg.get("flight_number") != flight
+                for seg, flight in zip(segments, flight_list)
+            ):
+                continue
+            return item
+        return None
+
+
+class VerifyResultHandler:
+    """Freshness checks for a single verification result item."""
+
+    @staticmethod
+    def parse_time(ts: str | int | float | None) -> datetime | None:
+        """Parse a timestamp given as epoch seconds or as a formatted string.
+
+        Accepted string layouts are ``%Y-%m-%d %H:%M:%S``, ``%Y%m%d%H%M%S`` and
+        ``%Y-%m-%dT%H:%M:%S``.
+
+        Returns:
+            The parsed naive ``datetime``, or ``None`` when ``ts`` is ``None``,
+            out of range, or in none of the accepted layouts.
+        """
+        if ts is None:
+            return None
+        if isinstance(ts, (int, float)):
+            try:
+                return datetime.fromtimestamp(ts)
+            except (OverflowError, OSError, ValueError):
+                # e.g. a millisecond epoch or NaN: unusable rather than fatal.
+                return None
+        text = str(ts).strip()
+        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
+            try:
+                return datetime.strptime(text, fmt)
+            except ValueError:
+                continue
+        return None
+
+    @classmethod
+    def is_within_3_minutes(cls, ts: str | int | None) -> bool:
+        """Return True when ``ts`` is at most ``VALID_DURATION_SECONDS`` older than now.
+
+        Unparseable values yield False. Timestamps in the future also pass,
+        because only the upper bound of the age is checked.
+        """
+        dt = cls.parse_time(ts)
+        if dt is None:
+            return False
+        return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS
+
+    @classmethod
+    def get_valid_price(cls, single_result: dict) -> dict | None:
+        """Return ``single_result`` when its ``now_time`` or ``verify_time`` is fresh.
+
+        ``now_time`` is checked first, then ``verify_time``; the item is returned
+        unchanged as soon as one of them is within the validity window.
+
+        Returns:
+            ``single_result`` itself, or ``None`` when it is empty, not a dict,
+            or neither timestamp is fresh.
+        """
+        if not single_result or not isinstance(single_result, dict):
+            return None
+        for key in ("now_time", "verify_time"):
+            if key in single_result and cls.is_within_3_minutes(single_result[key]):
+                return single_result
+        return None
+
+
+class FlightPriceTaskRunner:
+    """Single-task flow: search -> match -> verify -> accept the price or poll with not_verify."""
+
+    def __init__(self, client: FlightPriceClient | None = None):
+        # A default client is created when none is injected.
+        self.logger = XmiLogger("flight_price_task")
+        self.client = client or FlightPriceClient()
+        self.matcher = ResultMatcher()
+        self.handler = VerifyResultHandler()
+
+
+    def run(
+        self,
+        task: dict,
+    ) -> dict:
+        """
+        Run one task.
+
+        Required task keys: from_city_code, to_city_code, from_day, cabins, baggages.
+        Optional task keys: flight_numbers (used for matching), adult_total_price and
+        currency (used for the price comparison).
+
+        Returns a dict whose "status" is one of:
+          - "request_error": search or first verify request raised ("phase" tells which)
+          - "search_failed" / "verify_failed": the response "code" was not 0
+          - "no_match": no search result matched flight/cabin/baggage
+          - "no_data": the matched result has no "data"
+          - "rate_error": the expected price could not be converted (first verify only)
+          - "ok": fresh result within the price threshold ("price_info", "raw_verify")
+          - "placeholder": nothing acceptable was obtained ("task", "data", "raw_verify")
+        """
+        from_city_code = task["from_city_code"]
+        to_city_code = task["to_city_code"]
+        from_day = task["from_day"]
+        flight_numbers = task.get("flight_numbers")
+        cabins = task["cabins"]
+        baggages = task["baggages"]
+
+        # 1. Search (inquiry)
+        try:
+            search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
+        except FlightPriceRequestError as e:
+            self.logger.warning(f"询价请求异常: {e}")
+            return {"status": "request_error", "msg": str(e), "phase": "search"}
+
+        if search_resp.get("code") != 0:
+            return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}
+
+        result_list = search_resp.get("result") or []
+        matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
+        if not matched:
+            return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
+
+        data = matched.get("data")
+        if not data:
+            return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}
+        # 2. Verify (first call uses not_verify=False)
+        try:
+            verify_resp = self.client.verify_price(
+                from_city_code=from_city_code,
+                to_city_code=to_city_code,
+                from_day=from_day,
+                data=data,
+                not_verify=False,
+                async_=True,
+            )
+        except FlightPriceRequestError as e:
+            self.logger.warning(f"验价请求异常: {e}")
+            return {"status": "request_error", "msg": str(e), "phase": "verify"}
+
+        if verify_resp.get("code") != 0:
+            return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}
+        verify_result = verify_resp.get("result")
+        if isinstance(verify_result, list) and len(verify_result) >= 1:
+            # Only the first result item is inspected.
+            single = verify_result[0]
+            valid = self.handler.get_valid_price(single)
+            if valid is not None:
+                expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
+                if rate_err:
+                    return {"status": "rate_error", "msg": rate_err}
+                if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
+                    return {
+                        "status": "ok",
+                        "price_info": self._extract_price_info(valid),
+                        "raw_verify": verify_resp,
+                    }
+                # NOTE(review): a fresh result whose price is outside the threshold falls
+                # through to the polling loop here, whereas the loop itself stops on the
+                # first price mismatch - confirm this asymmetry is intended.
+
+        # 3. No acceptable result yet: poll with not_verify=True every RETRY_INTERVAL
+        #    seconds until RETRY_DURATION has elapsed.
+        deadline = time.monotonic() + RETRY_DURATION
+        while time.monotonic() < deadline:
+            time.sleep(RETRY_INTERVAL)
+            try:
+                verify_resp = self.client.verify_price(
+                    from_city_code=from_city_code,
+                    to_city_code=to_city_code,
+                    from_day=from_day,
+                    data=data,
+                    not_verify=True,
+                    async_=True,
+                )
+            except FlightPriceRequestError as e:
+                self.logger.warning(f"验价重试请求异常: {e}")
+                continue
+            if verify_resp.get("code") != 0:
+                continue
+            verify_result = verify_resp.get("result")
+            if isinstance(verify_result, list) and len(verify_result) >= 1:
+                single = verify_result[0]
+                valid = self.handler.get_valid_price(single)
+                if valid is not None:
+                    expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
+                    if rate_err:
+                        # Unlike the first pass, a conversion failure here only skips this round.
+                        continue
+                    if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
+                        return {
+                            "status": "ok",
+                            "price_info": self._extract_price_info(valid),
+                            "raw_verify": verify_resp,
+                        }
+                    else:
+                        # Fresh result but the price is off: stop polling immediately.
+                        break
+
+        # Nothing acceptable was obtained: return a placeholder for later handling.
+        # "raw_verify" is the last verification response that was received.
+        return {
+            "status": "placeholder",
+            "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
+            "task": task,
+            "data": data,
+            "raw_verify": verify_resp,
+        }
+
+    def _expected_price_in_verify_currency(
+        self, task: dict, valid: dict
+    ) -> tuple[float | None, str | None]:
+        """
+        Express the task's expected price in the currency of the verification result.
+
+        The task currency defaults to "USD" and the result currency to "CNY" when
+        missing. Returns (price, None) on success; when both currencies are equal
+        the price is only converted to float. Returns (None, message) when
+        adult_total_price is not a number or the exchange rate cannot be fetched.
+        """
+        try:
+            expected_val = float(task.get("adult_total_price"))
+        except (TypeError, ValueError):
+            return None, "任务 adult_total_price 无效"
+        task_currency = (task.get("currency") or "USD").strip().upper()
+        verify_currency = (valid.get("currency") or "CNY").strip().upper()
+        if task_currency == verify_currency:
+            return expected_val, None
+        # The rate is fetched on every call; nothing is cached here.
+        rate = fetch_rate(task_currency, verify_currency)
+        if rate is None:
+            return None, "汇率获取失败"
+        return expected_val * rate, None
+
+    @staticmethod
+    def _price_within_threshold(
+        expected: str | int | float | None,
+        actual: str | int | float | None,
+        threshold: float | None = None,
+    ) -> bool:
+        """Return True when |actual - expected| <= threshold (both already in one currency).
+
+        The threshold defaults to PRICE_DIFF_THRESHOLD. Missing or non-numeric
+        values yield False.
+        """
+        if threshold is None:
+            threshold = PRICE_DIFF_THRESHOLD
+        try:
+            e = float(expected) if expected is not None else None
+            a = float(actual) if actual is not None else None
+        except (TypeError, ValueError):
+            return False
+        if e is None or a is None:
+            return False
+        return abs(a - e) <= threshold
+
+    @staticmethod
+    def _extract_price_info(item: dict) -> dict:
+        """Copy the price and timestamp fields of one result item into a new dict."""
+        return {
+            "adult_price": item.get("adult_price"),
+            "adult_tax": item.get("adult_tax"),
+            "adult_total_price": item.get("adult_total_price"),
+            "currency": item.get("currency"),
+            "now_time": item.get("now_time"),
+            "verify_time": item.get("verify_time"),
+        }
+
+
+def _process_one_task(row, runner):
+    """Verify one keep_info row and turn the verified result into a policy item.
+
+    Builds the runner task from the row (``city_pair``, ``flight_day``,
+    ``flight_number_1``, ``flight_number_2``, ``baggage``, ``adult_total_price``,
+    ``currency``), runs it, and converts the first verification result.
+
+    Args:
+        row: One CSV row as a dict.
+        runner: Object exposing ``run(task) -> dict``; its optional ``logger``
+            attribute is used to report skipped rows.
+
+    Returns:
+        A dict with ``trip_type``, ``segments``, ``price_add``, ``bag_amount``,
+        ``bag_weight`` and the original row under ``task``; ``None`` when the run
+        did not end with status ``"ok"`` or the verification result lacks
+        usable segments / baggage.
+
+    Raises:
+        KeyError, ValueError, TypeError: For malformed rows or segment times;
+            these are left to the caller, which reports the failing task.
+    """
+    task = row
+
+    def _warn(message):
+        # The runner normally carries a logger; tolerate runners without one.
+        logger = getattr(runner, "logger", None)
+        if logger is not None:
+            logger.warning(message)
+
+    from_city_code, to_city_code = task["city_pair"].split("-")
+    from_day = task["flight_day"].replace("-", "")
+
+    flight_numbers = task["flight_number_1"].strip()
+    # A second flight number of exactly "VJ" means there is no second segment.
+    if task["flight_number_2"].strip() != "VJ":
+        flight_numbers += ";" + task["flight_number_2"].strip()
+    segment_count = len(flight_numbers.split(";"))
+    # Every segment is requested in cabin "Y" with one piece of the row's baggage weight.
+    cabins = ";".join(["Y"] * segment_count)
+    baggages = ";".join([f"1-{task['baggage']}"] * segment_count)
+
+    end_task = {
+        "from_city_code": from_city_code,
+        "to_city_code": to_city_code,
+        "from_day": from_day,
+        "flight_numbers": flight_numbers,
+        "cabins": cabins,
+        "baggages": baggages,
+        "adult_total_price": task.get("adult_total_price"),
+        "currency": task.get("currency", "USD"),
+    }
+
+    out = runner.run(end_task)
+    if out.get("status") != "ok":
+        return None
+
+    raw_verify = out.get("raw_verify") or {}
+    results = raw_verify.get("result")
+    if not results:
+        return None
+
+    result = results[0]
+    segments = result.get("segments")
+    if not segments:
+        _warn(f"验价结果缺少 segments: {end_task}")
+        return None
+
+    # Baggage is expected as "<pieces>-<kg>"; it is taken from the first segment.
+    baggage = segments[0].get("baggage")
+    try:
+        pc, kg = (int(part) for part in baggage.split("-"))
+    except (AttributeError, TypeError, ValueError):
+        _warn(f"行李格式无法解析: {baggage!r}")
+        return None
+
+    end_segments = []
+    for seg in segments:
+        flight_number = seg.get("flight_number")
+        operating_flight_number = seg.get("operating_flight_number")
+        # Blank the operating number when it merely repeats the marketing number.
+        if flight_number == operating_flight_number:
+            operating_flight_number = ""
+
+        dep_time = time_format_conversion(
+            seg.get("dep_time"), in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S"
+        )
+        arr_time = time_format_conversion(
+            seg.get("arr_time"), in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S"
+        )
+
+        end_segments.append({
+            "carrier": seg.get("carrier"),
+            "dep_air_port": seg.get("dep_air_port"),
+            "arr_air_port": seg.get("arr_air_port"),
+            "dep_city_code": seg.get("dep_city_code"),
+            "arr_city_code": seg.get("arr_city_code"),
+            "flight_number": flight_number,
+            "operating_flight_number": operating_flight_number,
+            "cabin": seg.get("cabin"),
+            "dep_time": dep_time,
+            "arr_time": arr_time,
+        })
+
+    return {
+        "trip_type": 1,
+        "segments": end_segments,
+        "price_add": 0,
+        "bag_amount": pc,
+        "bag_weight": kg,
+        "task": task
+    }
+
+
+
+def sync_policy(payload):
+    """POST ``payload`` as JSON to ``POLICY_URL`` and return the decoded JSON reply.
+
+    Example reply::
+
+        {
+            "code": 0,
+            "msg": "ok",
+            "data": {
+                "deleted": 1,
+                "created": 7
+            }
+        }
+    """
+    reply = requests.post(
+        POLICY_URL,
+        headers={"Content-Type": "application/json"},
+        json=payload,
+        timeout=30,
+    )
+    return reply.json()
+
+
+
+def main():
+    """Verify every row of ``./keep_0/keep_info.csv`` and publish the survivors.
+
+    Steps:
+      1. Read the task rows from the CSV.
+      2. Run ``_process_one_task`` for each row on a small thread pool.
+      3. Upload all resulting policy items in one ``sync_policy`` call.
+      4. Write the rows that passed verification to a timestamped
+         ``keep_info_end_*.csv`` file (history files are kept).
+    """
+    logger = XmiLogger("task")
+    runner = FlightPriceTaskRunner()
+
+    # 1. Load the task list. "utf-8-sig" strips the UTF-8 BOM (\ufeff) so the
+    #    first column name is not polluted; newline="" is what the csv module expects.
+    output_dir = "./keep_0"
+    keep_info_path = os.path.join(output_dir, "keep_info.csv")
+    with open(keep_info_path, "r", encoding="utf-8-sig", newline="") as f:
+        reader = csv.DictReader(f)
+        header_fields = list(reader.fieldnames or [])
+        task_list = list(reader)
+
+    # 2. Process the tasks concurrently.
+    policy_list = []
+    keep_info_end = []
+    max_workers = 3  # number of worker threads; tune as needed
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = {executor.submit(_process_one_task, task, runner): task for task in task_list}
+        for future in as_completed(futures):
+            try:
+                flight_data = future.result()
+            except Exception as e:
+                # Boundary handler: one failing task must not abort the whole batch.
+                logger.warning(f"任务异常 {futures[future]}: {e}")
+                continue
+            if flight_data is not None:
+                keep_info_end.append(flight_data.pop("task"))
+                policy_list.append(flight_data)
+
+    # 3. Upload all policies in a single request.
+    logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
+    logger.info(f"policy_list: {policy_list}")
+    if policy_list:
+        sync_policy({"items": policy_list})
+
+    # 4. Persist the verified rows. The directory is created on demand, and an
+    #    empty result still produces a header-only file instead of an IndexError.
+    logger.info(f"keep_info_end: {len(keep_info_end)}")
+    end_dir = "/home/node04/descending_cabin_files"
+    os.makedirs(end_dir, exist_ok=True)
+    end_path = f"{end_dir}/keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"
+    fieldnames = list(keep_info_end[0].keys()) if keep_info_end else header_fields
+    with open(end_path, "w", encoding="utf-8-sig", newline="") as f:
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        writer.writerows(keep_info_end)
+    logger.info(f"keep_info_end 写入完成")
+
+
+# Run the verification batch only when executed as a script, not on import.
+if __name__ == "__main__":
+    main()

+ 93 - 31
follow_up.py

@@ -1,5 +1,6 @@
 import os
 import datetime
+import time
 import pandas as pd
 from config import mongodb_config
 
@@ -77,10 +78,50 @@ def follow_up_handle():
     else:
         df_keep_info = pd.DataFrame()
 
+    def _parse_dt(yyyymmddhhmm):
+        try:
+            return datetime.datetime.strptime(str(yyyymmddhhmm), "%Y%m%d%H%M")
+        except Exception:
+            return None
+    
+    current_dt = _parse_dt(target_time)
+    prev_dt = None
+    hud_decrement = 1
+
+    # if not df_keep_info.empty and "last_predict_time" in df_keep_info.columns:
+    #     prev_candidates = (
+    #         df_keep_info["last_predict_time"].dropna().astype(str).tolist()
+    #     )
+    #     if prev_candidates:
+    #         prev_dt = _parse_dt(max(prev_candidates))
+
+    if prev_dt is None:
+        snapshot_times = []
+        for f in os.listdir(output_dir):
+            if (
+                f.startswith("keep_info_")
+                and f.endswith(".csv")
+                and f != f"keep_info_{target_time}.csv"
+            ):
+                ts = f.replace("keep_info_", "").replace(".csv", "")
+                dt = _parse_dt(ts)
+                if dt is not None:
+                    snapshot_times.append(dt)
+        if snapshot_times:
+            prev_dt = max(snapshot_times)
+
+    if current_dt is not None and prev_dt is not None:
+        delta_seconds = (current_dt - prev_dt).total_seconds()
+        if delta_seconds >= 0:
+            hud_decrement = max(0, int(delta_seconds // 3600))
+        else:
+            hud_decrement = 0
+
     # 初始化维护表
     if df_keep_info.empty:
         df_keep_info = df_last_predict_will_drop.copy()
         df_keep_info["keep_flag"] = 1
+        # df_keep_info["last_predict_time"] = target_time
         df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
         print(f"维护表已初始化: {keep_info_path} (rows={len(df_keep_info)})")
         df_keep_info.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
@@ -148,10 +189,10 @@ def follow_up_handle():
                     df_keep_info[c] = pd.NA
                 df_keep_info.loc[matched_idx, c] = df_latest_matched[c].values
 
-            # 重新标记 原来是1 -> 0  原来是0 -> 0  原来是-1 -> 1
+            # 重新标记 原来是1 -> 0  原来是0 -> 0  原来是2 -> 0, 原来是-1 -> 1
             old_flags = df_keep_info.loc[matched_idx, "keep_flag"]
             df_keep_info.loc[matched_idx, "keep_flag"] = old_flags.apply(
-                lambda x: 0 if x in (0, 1) else (1 if x == -1 else 1)
+                lambda x: 0 if x in (0, 1, 2) else (1 if x == -1 else 1)
             )
         
         # 符合场景三的索引 (在 df_keep_with_merge 中)
@@ -172,7 +213,8 @@ def follow_up_handle():
                         errors="coerce",
                     )
                     # hours_until_departure自动减1
-                    new_hud = hud - 1
+                    # new_hud = hud - 1
+                    new_hud = hud - hud_decrement
                     df_keep_info.loc[mask_need_observe, "hours_until_departure"] = new_hud
 
                     df_keep_only_keys = df_keep_info.loc[mask_keep_only, key_cols].copy()
@@ -200,35 +242,52 @@ def follow_up_handle():
                         df_keep_info.loc[mask_boundary_observe, "keep_flag"] = -1    # 默认删除标志
                         df_keep_info.loc[
                             mask_boundary_observe & new_hud_full.gt(4), "keep_flag"  # 如果达到边界且hours_until_departure大于4 则给保留标志
-                        ] = 0
+                        ] = 2
                     
                     pass
-                    # idx_eq13 = mask_need_observe.copy()
-                    # idx_eq13.loc[idx_eq13] = hud.eq(13)   # 原hours_until_departure等于13 
-
-                    # idx_gt13 = mask_need_observe.copy()
-                    # idx_gt13.loc[idx_gt13] = hud.gt(13)   # 原hours_until_departure大于13
-
-                    # idx_other = mask_need_observe & ~(idx_eq13 | idx_gt13)  # 原hours_until_departure小于13
-
-                    # idx_eq13_gt4 = idx_eq13 & new_hud.gt(4)
-                    # idx_eq13_eq4 = idx_eq13 & new_hud.eq(4)
-                    # # idx_eq13_lt4 = idx_eq13 & new_hud.lt(4)
-
-                    # df_keep_info.loc[idx_eq13_gt4, "keep_flag"] = 0
-                    # df_keep_info.loc[idx_eq13_eq4, "keep_flag"] = -1
-                    # # df_keep_info.loc[idx_eq13_lt4, "keep_flag"] = -2
-
-                    # df_keep_info.loc[idx_gt13, "keep_flag"] = -1
-
-                    # idx_other_gt4 = idx_other & new_hud.gt(4)
-                    # idx_other_eq4 = idx_other & new_hud.eq(4)
-                    # # idx_other_lt4 = idx_other & new_hud.lt(4)
-
-                    # df_keep_info.loc[idx_other_gt4, "keep_flag"] = 0
-                    # df_keep_info.loc[idx_other_eq4, "keep_flag"] = -1
-                    # # df_keep_info.loc[idx_other_lt4, "keep_flag"] = -2
-
+        
+        # 对于这些边界保持状态(keep_flag为2) 检查其是否在最新一次验价后的文件里存在, 如果不存在 则标记为-1
+        df_temp_2 = df_keep_info.loc[df_keep_info["keep_flag"] == 2, key_cols].copy()
+        if not df_temp_2.empty:
+            end_dir = "/home/node04/descending_cabin_files"
+            end_candidates = []
+            if os.path.isdir(end_dir):
+                for f in os.listdir(end_dir):
+                    if f.startswith("keep_info_end_") and f.endswith(".csv"):
+                        ts = f.replace("keep_info_end_", "").replace(".csv", "")
+                        if ts.isdigit():
+                            end_candidates.append((ts, f))   #(时间戳,文件名)
+            if end_candidates:
+                end_candidates.sort(key=lambda x: x[0])
+                end_last_path = os.path.join(end_dir, end_candidates[-1][1])  # 最新一次验价后的文件
+                try:
+                    df_end_last = pd.read_csv(end_last_path)
+                except Exception:
+                    df_end_last = pd.DataFrame()
+
+                if not df_end_last.empty and all(c in df_end_last.columns for c in key_cols):  # key_cols作为比对条件
+                    df_temp_2["_row_idx"] = df_temp_2.index
+                    df_end_keys = df_end_last[key_cols].drop_duplicates().copy()
+                    for c in key_cols:
+                        df_temp_2[c] = df_temp_2[c].astype(str)
+                        df_end_keys[c] = df_end_keys[c].astype(str)
+                    df_temp_2_with_merge = df_temp_2.merge(
+                        df_end_keys, on=key_cols, how="left", indicator=True
+                    )
+                    # 对于只在 df_temp_2 出现,而不在 df_end_keys 出现的索引,在 df_keep_info 中标记为-1
+                    idx_to_rm_2 = df_temp_2_with_merge.loc[
+                        df_temp_2_with_merge["_merge"] == "left_only", "_row_idx"
+                    ].tolist()
+                    if idx_to_rm_2:
+                        df_keep_info.loc[idx_to_rm_2, "keep_flag"] = -1
+
+        # 将长时间没更新的航班标记为-1
+        dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce")
+        dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce")
+        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=12)
+        if mask_abnormal_time.any():
+            df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1
+        
         # 将 df_to_add 添加到 df_keep_info 之后
         add_rows = len(df_to_add) if "df_to_add" in locals() else 0
         if add_rows:
@@ -269,4 +328,7 @@ def follow_up_handle():
     pass
 
 if __name__ == "__main__":
-    follow_up_handle()
+    follow_up_handle()
+    time.sleep(10)
+    from descending_cabin_task import main as descending_cabin_task_main
+    descending_cabin_task_main()