Prechádzať zdrojové kódy

提交最近在生成规则时的问题整改

node04 3 týždňov pred
rodič
commit
6d654830b6
6 zmenil súbory, kde vykonal 102 pridanie a 44 odobranie
  1. 1 0
      .gitignore
  2. 6 5
      data_preprocess.py
  3. 9 8
      evaluate_validate_pnl.py
  4. 80 29
      follow_up.py
  5. 1 1
      main_tr_0.py
  6. 5 1
      result_validate_0.py

+ 1 - 0
.gitignore

@@ -11,4 +11,5 @@ predictions_0/
 predictions_2/
 predictions_4/
 validate/
+keep_0/
 __pycache__/

+ 6 - 5
data_preprocess.py

@@ -1120,11 +1120,11 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                         df_match_chk = df_match.copy()
                         dur_vals = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce')
                         df_match_chk = df_match_chk.loc[dur_vals.notna()].copy()
-                        df_match_chk = df_match_chk.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 24].copy()
+                        df_match_chk = df_match_chk.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 36].copy()
 
                         drop_hud_vals = pd.to_numeric(df_match_chk['drop_hours_until_departure'], errors='coerce')
                         df_match_chk = df_match_chk.loc[drop_hud_vals.notna()].copy()
-                        df_match_chk = df_match_chk.loc[(drop_hud_vals.loc[drop_hud_vals.notna()] - float(hud_base)).abs() <= 12].copy()
+                        df_match_chk = df_match_chk.loc[(drop_hud_vals.loc[drop_hud_vals.notna()] - float(hud_base)).abs() <= 18].copy()
 
                         # seats_vals = pd.to_numeric(df_match_chk['high_price_seats_remaining_change_amount'], errors='coerce')
                         # df_match_chk = df_match_chk.loc[seats_vals.notna()].copy()
@@ -1329,7 +1329,7 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                             else:
                                 drop_prob = round(length_drop / (length_keep + length_drop), 2)
                                 # 依旧保持之前的降价判定,概率修改
-                                if drop_prob >= 0.45:
+                                if drop_prob >= 0.4:
                                     df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
                                     df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'd1'
                                 # 改判不降价,概率修改
@@ -1423,12 +1423,13 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
         na_position='last',
     ).reset_index(drop=True)
 
-    # 时间段过滤 过久没更新的(超过8小时)可能是已售完 不参与预测
+    # 时间段过滤 过滤掉异常时间(update_hour 早于 crawl_date)因为现在有实时验价, 不做8小时之内的过滤
     update_dt = pd.to_datetime(df_predict["update_hour"], errors="coerce")
     crawl_dt = pd.to_datetime(df_predict["crawl_date"], errors="coerce")
     dt_diff = update_dt - crawl_dt
     df_predict = df_predict.loc[
-        (dt_diff >= pd.Timedelta(0)) & (dt_diff <= pd.Timedelta(hours=8))
+        # (dt_diff >= pd.Timedelta(0)) & (dt_diff <= pd.Timedelta(hours=8))
+        (dt_diff >= pd.Timedelta(0))
     ].reset_index(drop=True)
     print("更新时间过滤")
 

+ 9 - 8
evaluate_validate_pnl.py

@@ -112,14 +112,15 @@ def evaluate_validate_pnl(csv_path, output_path=None):
 
 if __name__ == "__main__":
     # 临时添加参数用于调试
-    # import sys
-    # if len(sys.argv) == 1:
-    #     sys.argv = [
-    #         sys.argv[0],
-    #         # "/home/node04/yuzhou/jiangcang_vj/validate/node0205_zong/result_validate_node0205_zong_20260211100622.csv",  # 替换为实际路径
-    #         "/home/node04/yuzhou/jiangcang_vj/validate/node0210_zong/result_validate_node0210_zong_20260211101300.csv",  # 替换为实际路径
-    #         # "--output", "debug_output.csv"
-    #     ]
+    import sys
+    if len(sys.argv) == 1:
+        sys.argv = [
+            sys.argv[0],
+            # "/home/node04/yuzhou/jiangcang_vj/validate/node0205_zong/result_validate_node0205_zong_20260211100622.csv",  # 替换为实际路径
+            "/home/node04/yuzhou/jiangcang_vj/validate/node0211_zong/result_validate_node0211_zong_20260302104535.csv",  # 替换为实际路径
+            # "/home/node04/yuzhou/jiangcang_vj/validate/node0224_zong/result_validate_node0224_zong_20260227100242.csv",  # 替换为实际路径
+            # "--output", "debug_output.csv"
+        ]
 
     parser = argparse.ArgumentParser(description="验证结果的准确率与盈亏比统计")
     parser.add_argument("csv_path", help="result_validate_*.csv 路径")

+ 80 - 29
follow_up.py

@@ -7,6 +7,10 @@ from config import mongodb_config
 def follow_up_handle():
     '''后续处理'''
     object_dir = "./predictions_0"
+    output_dir = "./keep_0"
+
+    # 创建输出目录
+    os.makedirs(output_dir, exist_ok=True)
 
     # 检查目录是否存在
     if not os.path.exists(object_dir):
@@ -26,32 +30,42 @@ def follow_up_handle():
     csv_files.sort()
     
     # 调试分支
-    # target_time = '202602251300'
+    # target_time = "202603011600"
     # matching_files = [f for f in csv_files if target_time in f]
     # if matching_files:
     #     last_csv_file = matching_files[0]
     #     print(f"指定时间的文件: {last_csv_file}")
     # else:
     #     print(f"未找到时间 {target_time} 的预测文件")
+    #     return
 
     # 正式分支
     last_csv_file = csv_files[-1]   # 只看最新预测的文件
     print(f"最新预测文件: {last_csv_file}")
+    if last_csv_file.startswith("future_predictions_") and last_csv_file.endswith(".csv"):
+        target_time = last_csv_file.replace("future_predictions_", "").replace(".csv", "")
+    else:
+        target_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
 
     # 读取最新预测文件
     last_csv_path = os.path.join(object_dir, last_csv_file)
     df_last_predict = pd.read_csv(last_csv_path)
 
     df_last_predict_will_drop = df_last_predict[df_last_predict["will_price_drop"] == 1].reset_index(drop=True)
-    print(f"最新预测文件中,预测降价的航班有 {len(df_last_predict_will_drop)} 条")
-
-    # 建一张 维护表 keep_info.csv
-    keep_info_path = os.path.join(object_dir, "keep_info.csv")
+    df_last_predict_not_drop = df_last_predict[df_last_predict["will_price_drop"] == 0].reset_index(drop=True)
+    print(f"最新预测文件中,预测降价的航班有 {len(df_last_predict_will_drop)} 条,预测不降价的航班有 {len(df_last_predict_not_drop)} 条")
+    
+    # 建一张 维护表 keep_info.csv  附加一个维护表快照 keep_info_{target_time}.csv
+    keep_info_path = os.path.join(output_dir, "keep_info.csv")
+    keep_info_snapshot_path = os.path.join(output_dir, f"keep_info_{target_time}.csv")
     key_cols = ["city_pair", "flight_day", "flight_number_1", "flight_number_2"]
 
     df_last_predict_will_drop = df_last_predict_will_drop.drop_duplicates(
         subset=key_cols, keep="last"
     ).reset_index(drop=True)
+    df_last_predict_not_drop = df_last_predict_not_drop.drop_duplicates(
+        subset=key_cols, keep="last"
+    ).reset_index(drop=True)
 
     # 读取维护表
     if os.path.exists(keep_info_path):
@@ -69,6 +83,8 @@ def follow_up_handle():
         df_keep_info["keep_flag"] = 1
         df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
         print(f"维护表已初始化: {keep_info_path} (rows={len(df_keep_info)})")
+        df_keep_info.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
+        print(f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info)})")
     # 已存在维护表
     else:
         if "keep_flag" not in df_keep_info.columns:
@@ -87,6 +103,7 @@ def follow_up_handle():
         
         for c in key_cols:
             df_last_predict_will_drop[c] = df_last_predict_will_drop[c].astype(str)
+            df_last_predict_not_drop[c] = df_last_predict_not_drop[c].astype(str)
             df_keep_info[c] = df_keep_info[c].astype(str)
 
         df_keep_info = df_keep_info.drop_duplicates(subset=key_cols, keep="last").reset_index(drop=True)
@@ -139,10 +156,10 @@ def follow_up_handle():
         
         # 符合场景三的索引 (在 df_keep_with_merge 中)
         if keep_only_idx:
-            # 如果 df_keep_info 的 keep_flag 为-1,此时标记为-2
             mask_keep_only = df_keep_info.index.isin(keep_only_idx)  # 布尔索引序列
-            mask_to_remove = mask_keep_only & (df_keep_info["keep_flag"] == -1)
-            df_keep_info.loc[mask_to_remove, "keep_flag"] = -2
+            # 如果 df_keep_info 的 keep_flag 为-1,此时标记为-2
+            # mask_to_remove = mask_keep_only & (df_keep_info["keep_flag"] == -1)
+            # df_keep_info.loc[mask_to_remove, "keep_flag"] = -2
 
             # 如果 df_keep_info 的 keep_flag 大于等于0
             mask_need_observe = mask_keep_only & (df_keep_info["keep_flag"] >= 0)  # 布尔索引序列
@@ -158,43 +175,77 @@ def follow_up_handle():
                     new_hud = hud - 1
                     df_keep_info.loc[mask_need_observe, "hours_until_departure"] = new_hud
 
-                    idx_eq13 = mask_need_observe.copy()
-                    idx_eq13.loc[idx_eq13] = hud.eq(13)   # 原hours_until_departure等于13 
+                    df_keep_only_keys = df_keep_info.loc[mask_keep_only, key_cols].copy()
+                    df_keep_only_keys["_row_idx"] = df_keep_only_keys.index
+                    # 检查 df_keep_only_keys 是否在 df_last_predict_not_drop 中
+                    df_keep_only_keys = df_keep_only_keys.merge(
+                        df_last_predict_not_drop[key_cols].drop_duplicates(),
+                        on=key_cols,
+                        how="left",
+                        indicator=True,
+                    )
+                    idx_in_not_drop = df_keep_only_keys.loc[
+                        df_keep_only_keys["_merge"] == "both", "_row_idx"
+                    ].tolist()
+                    mask_in_not_drop = df_keep_info.index.isin(idx_in_not_drop)     # 在 df_last_predict_not_drop 中出现 只是will_price_drop为0 未达边界
+                    mask_not_drop_observe = mask_need_observe & mask_in_not_drop    # 判断为不降价的布尔索引数组
+                    mask_boundary_observe = mask_need_observe & ~mask_in_not_drop   # 判断为到达边界的布尔索引数组
+
+                    df_keep_info.loc[mask_not_drop_observe, "keep_flag"] = -1       # 删除标志
+
+                    if mask_boundary_observe.any():
+                        new_hud_full = pd.to_numeric(
+                            df_keep_info["hours_until_departure"], errors="coerce"
+                        )
+                        df_keep_info.loc[mask_boundary_observe, "keep_flag"] = -1    # 默认删除标志
+                        df_keep_info.loc[
+                            mask_boundary_observe & new_hud_full.gt(4), "keep_flag"  # 如果达到边界且hours_until_departure大于4 则给保留标志
+                        ] = 0
+                    
+                    pass
+                    # idx_eq13 = mask_need_observe.copy()
+                    # idx_eq13.loc[idx_eq13] = hud.eq(13)   # 原hours_until_departure等于13 
 
-                    idx_gt13 = mask_need_observe.copy()
-                    idx_gt13.loc[idx_gt13] = hud.gt(13)   # 原hours_until_departure大于13
+                    # idx_gt13 = mask_need_observe.copy()
+                    # idx_gt13.loc[idx_gt13] = hud.gt(13)   # 原hours_until_departure大于13
 
-                    idx_other = mask_need_observe & ~(idx_eq13 | idx_gt13)  # 原hours_until_departure小于13
+                    # idx_other = mask_need_observe & ~(idx_eq13 | idx_gt13)  # 原hours_until_departure小于13
 
-                    idx_eq13_gt4 = idx_eq13 & new_hud.gt(4)
-                    idx_eq13_eq4 = idx_eq13 & new_hud.eq(4)
-                    idx_eq13_lt4 = idx_eq13 & new_hud.lt(4)
+                    # idx_eq13_gt4 = idx_eq13 & new_hud.gt(4)
+                    # idx_eq13_eq4 = idx_eq13 & new_hud.eq(4)
+                    # # idx_eq13_lt4 = idx_eq13 & new_hud.lt(4)
 
-                    df_keep_info.loc[idx_eq13_gt4, "keep_flag"] = 0
-                    df_keep_info.loc[idx_eq13_eq4, "keep_flag"] = -1
-                    df_keep_info.loc[idx_eq13_lt4, "keep_flag"] = -2
+                    # df_keep_info.loc[idx_eq13_gt4, "keep_flag"] = 0
+                    # df_keep_info.loc[idx_eq13_eq4, "keep_flag"] = -1
+                    # # df_keep_info.loc[idx_eq13_lt4, "keep_flag"] = -2
 
-                    df_keep_info.loc[idx_gt13, "keep_flag"] = -1
+                    # df_keep_info.loc[idx_gt13, "keep_flag"] = -1
 
-                    idx_other_gt4 = idx_other & new_hud.gt(4)
-                    idx_other_eq4 = idx_other & new_hud.eq(4)
-                    idx_other_lt4 = idx_other & new_hud.lt(4)
+                    # idx_other_gt4 = idx_other & new_hud.gt(4)
+                    # idx_other_eq4 = idx_other & new_hud.eq(4)
+                    # # idx_other_lt4 = idx_other & new_hud.lt(4)
 
-                    df_keep_info.loc[idx_other_gt4, "keep_flag"] = 0
-                    df_keep_info.loc[idx_other_eq4, "keep_flag"] = -1
-                    df_keep_info.loc[idx_other_lt4, "keep_flag"] = -2
+                    # df_keep_info.loc[idx_other_gt4, "keep_flag"] = 0
+                    # df_keep_info.loc[idx_other_eq4, "keep_flag"] = -1
+                    # # df_keep_info.loc[idx_other_lt4, "keep_flag"] = -2
 
         # 将 df_to_add 添加到 df_keep_info 之后
         add_rows = len(df_to_add) if "df_to_add" in locals() else 0
         if add_rows:
             df_keep_info = pd.concat([df_keep_info, df_to_add], ignore_index=True)
         
-        # 移除 keep_flag 为 -2 的行
+        df_keep_info_snapshot = df_keep_info.copy()
+        df_keep_info_snapshot.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
+        print(
+            f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info_snapshot)})"
+        )
+
+        # 移除 keep_flag 为 -1 的行
         before_rm = len(df_keep_info)
-        df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -2].reset_index(drop=True)
+        df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -1].reset_index(drop=True)
         rm_rows = before_rm - len(df_keep_info)
 
-        # 保存更新后的 df_keep_info 到csv文件
+        # 保存更新后的 df_keep_info 到维护表csv文件
         df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
         print(
             f"维护表已更新: {keep_info_path} (rows={len(df_keep_info)} add={add_rows} rm={rm_rows})"

+ 1 - 1
main_tr_0.py

@@ -49,7 +49,7 @@ def start_train():
     # date_end = datetime.today().strftime("%Y-%m-%d")
     date_end = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
     # date_begin = (datetime.today() - timedelta(days=32)).strftime("%Y-%m-%d")
-    date_begin = "2025-12-01"   # 2025-12-01  2026-02-05
+    date_begin = "2026-02-24"   # 2025-12-01 2026-02-11 2026-02-24 2026-03-02
 
     print(f"训练时间范围: {date_begin} 到 {date_end}")
 

+ 5 - 1
result_validate_0.py

@@ -588,7 +588,11 @@ if __name__ == "__main__":
         # node = "node0205"
         # validate_process_zong(node, True, "202602061000", "202602091000")  # 有条件汇总
         node = "node0211"
-        validate_process_zong(node, True, "202602111100", None)  # 有条件汇总
+        validate_process_zong(node, True, "202602111100", None)    # 202602111100 -> 202602161000  202602161100 -> 202602211000
+        # node = "node0224"
+        # validate_process_zong(node, True, "202602241600", None)  # 202602241600 -> 202602271400  202602271500  202602281700
+        # node = "node0302"
+        # validate_process_zong(node, True, "202603021500", None)
     # 1 自动验证
     else:
         node = "node0127"