
Add profit/loss ratio validation, adjust prediction judgment

node04 committed 2 days ago
commit 57d60c316a
4 files changed, 265 additions and 8 deletions
  1. data_preprocess.py (+3 -3)
  2. evaluate_validate_pnl.py (+128 -0)
  3. main_tr_0.py (+1 -1)
  4. result_validate_0.py (+133 -4)

+ 3 - 3
data_preprocess.py

@@ -1108,7 +1108,7 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                 df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
 
                 df_drop_gap = df_drop_gap.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
-                df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 5.0)].copy()
+                df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 10.0)].copy()
 
                 # Historical cases where a price drop followed a very similar increase magnitude
                 if not df_match.empty:
@@ -1120,7 +1120,7 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                         df_match_chk = df_match.copy()
                         dur_vals = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce')
                         df_match_chk = df_match_chk.loc[dur_vals.notna()].copy()
-                        df_match_chk = df_match_chk.loc[dur_vals.loc[dur_vals.notna()] - 12 <= float(dur_base)].copy()
+                        df_match_chk = df_match_chk.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 12].copy()
 
                         drop_hud_vals = pd.to_numeric(df_match_chk['drop_hours_until_departure'], errors='coerce')
                         df_match_chk = df_match_chk.loc[drop_hud_vals.notna()].copy()
@@ -1290,7 +1290,7 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                 df_keep_gap_1['price_abs_gap'] = df_keep_gap_1['price_gap'].abs()
 
                 df_keep_gap_1 = df_keep_gap_1.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
-                df_match_1 = df_keep_gap_1.loc[(df_keep_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_keep_gap_1['price_abs_gap'] <= 5.0)].copy()
+                df_match_1 = df_keep_gap_1.loc[(df_keep_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_keep_gap_1['price_abs_gap'] <= 10.0)].copy()
 
                 # Historical cases where the price stayed low after a similar change magnitude
                 if not df_match_1.empty:
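The three hunks above loosen the history-matching filters: the absolute price-gap cap rises from 5.0 to 10.0 in both match steps, and the duration check becomes a symmetric 12-hour tolerance around dur_base instead of a one-sided bound. A minimal standalone sketch of the updated filter, assuming the pct_gap / price_gap / high_price_duration_hours columns seen in the diff (the helper name filter_similar_history is hypothetical, not part of the commit):

import pandas as pd

def filter_similar_history(df: pd.DataFrame, pct_threshold: float, dur_base: float) -> pd.DataFrame:
    # Hypothetical condensation of the updated matching logic in predict_data_simple.
    out = df.copy()
    out['pct_abs_gap'] = out['pct_gap'].abs()
    out['price_abs_gap'] = out['price_gap'].abs()
    out = out.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])

    # widened price tolerance: 10.0 instead of the previous 5.0
    out = out[(out['pct_abs_gap'] <= pct_threshold) & (out['price_abs_gap'] <= 10.0)].copy()

    # symmetric +/-12h tolerance around dur_base (previously a one-sided bound)
    dur_vals = pd.to_numeric(out['high_price_duration_hours'], errors='coerce')
    out = out.loc[dur_vals.notna()].copy()
    return out.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 12].copy()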

+ 128 - 0
evaluate_validate_pnl.py

@@ -0,0 +1,128 @@
+import argparse
+import os
+import pandas as pd
+
+
+def _safe_div(numer, denom):
+    if denom == 0:
+        return pd.NA
+    return round(numer / denom, 4)
+
+
+def evaluate_validate_pnl(csv_path, output_path=None):
+    df = pd.read_csv(csv_path)
+    if df.empty:
+        print("输入文件为空")
+        return
+
+    if "will_price_drop" not in df.columns:
+        print("缺少 will_price_drop 字段")
+        return
+
+    if "drop_flag_window" not in df.columns:
+        if "drop_flag" in df.columns:
+            print("缺少 drop_flag_window,使用 drop_flag 作为替代口径")
+            df["drop_flag_window"] = df["drop_flag"]
+        else:
+            print("缺少 drop_flag_window 字段,请先用更新后的验证脚本生成")
+            return
+
+    df_signal = df[df["will_price_drop"] == 1].copy()
+    if df_signal.empty:
+        print("信号样本为空 (will_price_drop==1)")
+        return
+
+    df_signal["drop_flag_window"] = pd.to_numeric(
+        df_signal["drop_flag_window"], errors="coerce"
+    ).fillna(0).astype(int)
+
+    df_signal["pnl"] = pd.to_numeric(df_signal.get("pnl"), errors="coerce")
+    df_signal["pnl_pct"] = pd.to_numeric(df_signal.get("pnl_pct"), errors="coerce")
+
+    valid_pnl_mask = df_signal["pnl"].notna()
+
+    y_true = df_signal["drop_flag_window"].astype(int)
+    y_pred = df_signal["will_price_drop"].astype(int)
+    tp = int(((y_true == 1) & (y_pred == 1)).sum())   # true positives
+    tn = int(((y_true == 0) & (y_pred == 0)).sum())   # true negatives
+    fp = int(((y_true == 0) & (y_pred == 1)).sum())   # false positives
+    fn = int(((y_true == 1) & (y_pred == 0)).sum())   # false negatives
+
+    accuracy = _safe_div(tp + tn, tp + tn + fp + fn)
+    precision = _safe_div(tp, tp + fp)
+    recall = _safe_div(tp, tp + fn)
+    f1 = (
+        pd.NA
+        if precision is pd.NA or recall is pd.NA or (precision + recall) == 0
+        else round(2 * precision * recall / (precision + recall), 4)
+    )
+
+    pnl_series = df_signal.loc[valid_pnl_mask, "pnl"]
+    pnl_pct_series = df_signal.loc[valid_pnl_mask, "pnl_pct"]
+    win_series = pnl_series[pnl_series > 0]    # winning trades
+    loss_series = pnl_series[pnl_series < 0]   # losing trades
+    flat_series = pnl_series[pnl_series == 0]  # flat trades
+
+    win_rate = _safe_div(len(win_series), len(pnl_series))  # share of winning trades
+    avg_win = round(win_series.mean(), 4) if not win_series.empty else pd.NA  # average profit per winning trade
+    avg_loss = round(abs(loss_series.mean()), 4) if not loss_series.empty else pd.NA  # average loss per losing trade
+    pl_ratio_avg = (
+        pd.NA if avg_loss is pd.NA or avg_loss == 0 else round(avg_win / avg_loss, 4)
+    )  # average per-trade profit/loss ratio
+
+    sum_win = round(win_series.sum(), 4) if not win_series.empty else 0.0  # total profit of winning trades
+    sum_loss = round(abs(loss_series.sum()), 4) if not loss_series.empty else 0.0  # total loss of losing trades
+    pl_ratio_sum = pd.NA if sum_loss == 0 else round(sum_win / sum_loss, 4)  # ratio of total wins to total losses
+
+    summary = {
+        "rows_total": int(len(df)),
+        "rows_signal": int(len(df_signal)),
+        "rows_with_pnl": int(valid_pnl_mask.sum()),
+        "rows_pnl_missing": int((~valid_pnl_mask).sum()),
+        "tp": tp,
+        "fp": fp,
+        "tn": tn,
+        "fn": fn,
+        "accuracy": accuracy,
+        "precision": precision,
+        "recall": recall,
+        "f1": f1,
+        "win_rate": win_rate,
+        "avg_win": avg_win,
+        "avg_loss": avg_loss,
+        "profit_loss_ratio_avg": pl_ratio_avg,
+        "profit_loss_ratio_sum": pl_ratio_sum,
+        "pnl_sum": round(pnl_series.sum(), 4) if not pnl_series.empty else pd.NA,   # 汇总盈亏
+        "pnl_pct_mean": round(pnl_pct_series.mean(), 4) if not pnl_pct_series.empty else pd.NA,  # 汇总盈亏百分比
+        "wins": int(len(win_series)),     # 盈利单数
+        "losses": int(len(loss_series)),  # 亏损单数
+        "flats": int(len(flat_series)),   # 平价单数
+    }
+
+    summary_df = pd.DataFrame([summary])
+    print(summary_df.to_string(index=False))
+
+    if output_path is None:
+        base, _ = os.path.splitext(csv_path)  # same directory as the validation file
+        output_path = f"{base}_summary.csv"
+
+    summary_df.to_csv(output_path, index=False, encoding="utf-8-sig")
+    print(f"盈亏汇总已保存: {output_path}")
+
+
+if __name__ == "__main__":
+    # temporarily add arguments for debugging
+    # import sys
+    # if len(sys.argv) == 1:
+    #     sys.argv = [
+    #         sys.argv[0],
+    #         "/home/node04/yuzhou/jiangcang_vj/validate/node0205_zong/result_validate_node0205_zong_20260209155011.csv",  # replace with the actual path
+    #         # "--output", "debug_output.csv"
+    #     ]
+
+    parser = argparse.ArgumentParser(description="Accuracy and profit/loss-ratio statistics for validation results")
+    parser.add_argument("csv_path", help="path to a result_validate_*.csv file")
+    parser.add_argument("--output", default=None, help="path for the summary CSV output")
+    args = parser.parse_args()
+
+    evaluate_validate_pnl(args.csv_path, args.output)
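For reference, a usage sketch for the new script (the input path below is illustrative only; any result_validate_*.csv produced by the updated result_validate_0.py, i.e. one containing the drop_flag_window and pnl columns, works):

# Command line (illustrative path):
#   python evaluate_validate_pnl.py result_validate_node0205_zong.csv --output pnl_summary.csv
# Programmatic call:
from evaluate_validate_pnl import evaluate_validate_pnl

evaluate_validate_pnl(
    "result_validate_node0205_zong.csv",  # illustrative input file name
    output_path="pnl_summary.csv",        # optional; defaults to <input>_summary.csv next to the input
)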

+ 1 - 1
main_tr_0.py

@@ -49,7 +49,7 @@ def start_train():
     # date_end = datetime.today().strftime("%Y-%m-%d")
     date_end = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
     # date_begin = (datetime.today() - timedelta(days=32)).strftime("%Y-%m-%d")
-    date_begin = "2025-12-01"   # 2025-12-01  2026-01-27  2026-02-06
+    date_begin = "2025-12-01"   # 2025-12-01  2026-02-05
 
     print(f"训练时间范围: {date_begin} 到 {date_end}")
 

+ 133 - 4
result_validate_0.py

@@ -27,6 +27,29 @@ def _validate_predict_df(df_predict):
             update_dt
         ).strftime('%Y-%m-%d %H:%M:%S')
         df_val= validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
+        
+        entry_price = pd.to_numeric(row.get('adult_total_price'), errors='coerce')
+        crawl_dt = pd.to_datetime(row.get('crawl_date'), errors='coerce')
+        batch_dt = pd.to_datetime(row.get('batch_time'), format="%Y%m%d%H%M", errors='coerce')
+        wait_start_dt = pd.NaT
+        wait_end_dt = pd.NaT
+        dep_hour_dt = pd.to_datetime(row.get('from_time'), errors='coerce')
+        if pd.notna(batch_dt):
+            wait_start_dt = batch_dt.floor('h')
+        if pd.notna(crawl_dt):
+            crawl_floor = crawl_dt.floor('h')
+            if pd.isna(wait_start_dt):
+                wait_start_dt = crawl_floor
+            else:
+                wait_start_dt = max(wait_start_dt, crawl_floor)   # keep the near end of the wait window close to the prediction batch time
+        if pd.notna(wait_start_dt):
+            wait_end_dt = wait_start_dt + pd.Timedelta(hours=48)  # 48-hour wait window
+        if pd.notna(dep_hour_dt):
+            dep_hour_dt = dep_hour_dt.floor('h')
+            cutoff_dt = dep_hour_dt - pd.Timedelta(hours=4)
+            if pd.notna(wait_end_dt):
+                wait_end_dt = min(wait_end_dt, cutoff_dt)  # the far end of the wait window must not go past 4 hours before departure
+        
         # At the current validation time, the database may have no data after valid_begin_hour
         if not df_val.empty:
             df_val_f = fill_hourly_crawl_date(df_val, rear_fill=2)
@@ -42,12 +65,96 @@ def _validate_predict_df(df_predict):
                 last_update_hour = pd.NA
                 list_change_price = []
                 list_change_hours = []
+                drop_flag_window = 0
+                first_lower_price = pd.NA
+                first_lower_update_hour = pd.NA
+                boundary_final_price = pd.NA
+                boundary_final_update_hour = pd.NA
+                trigger_type = pd.NA
+                trigger_price = pd.NA
+                trigger_update_hour = pd.NA
+                pnl = pd.NA
+                pnl_pct = pd.NA
             else:
                 # last row of the valid data
                 last_row = df_val_f.iloc[-1]
                 last_hours_util = last_row['hours_until_departure']
                 last_update_hour = last_row['update_hour']
 
+                df_val_f['update_hour'] = pd.to_datetime(df_val_f['update_hour'], errors='coerce')
+
+                # use the actual price aligned to batch_time as entry_price
+                if pd.notna(batch_dt):
+                    df_entry = df_val_f[df_val_f['update_hour'] <= batch_dt].copy()
+                    if not df_entry.empty:
+                        entry_price = df_entry.iloc[-1]['adult_total_price']
+
+                df_window = df_val_f
+                if pd.notna(wait_start_dt) and pd.notna(wait_end_dt):
+                    df_window = df_val_f[
+                        (df_val_f['update_hour'] >= wait_start_dt) &
+                        (df_val_f['update_hour'] <= wait_end_dt)
+                    ].copy()   # build the observation window
+                else:
+                    df_window = df_val_f.iloc[0:0].copy()   # empty slice
+
+                if not df_window.empty:
+                    df_window = df_window.sort_values('update_hour')
+                    df_window_price_changes = df_window.loc[
+                        df_window["adult_total_price"].shift() != df_window["adult_total_price"]
+                    ].copy()
+                    df_window_price_changes['change_amount'] = (
+                        df_window_price_changes['adult_total_price'].diff().fillna(0)
+                    )
+                    df_first_window_negative = df_window_price_changes[
+                        df_window_price_changes['change_amount'] < -5
+                    ].head(1)
+                    drop_flag_window = 1 if not df_first_window_negative.empty else 0   # whether a price drop occurred within the observation window
+                else:
+                    drop_flag_window = 0
+
+                first_lower_price = pd.NA
+                first_lower_update_hour = pd.NA
+                if not df_window.empty and pd.notna(entry_price) and pd.notna(wait_start_dt):
+                    df_lower = df_window[
+                        (df_window['update_hour'] > wait_start_dt) &
+                        (df_window['adult_total_price'] <= entry_price - 5)
+                    ].head(1)
+                    if not df_lower.empty:  # first price below entry_price - 5 and its timestamp
+                        first_lower_price = df_lower['adult_total_price'].iloc[0].round(2)
+                        first_lower_update_hour = df_lower['update_hour'].iloc[0]
+
+                boundary_final_price = pd.NA
+                boundary_final_update_hour = pd.NA
+                if not df_window.empty:  # price and time at the far boundary of the observation window
+                    boundary_row = df_window.iloc[-1]
+                    boundary_final_price = boundary_row['adult_total_price']
+                    boundary_final_update_hour = boundary_row['update_hour']
+
+                trigger_type = pd.NA
+                trigger_price = pd.NA
+                trigger_update_hour = pd.NA
+                if pd.notna(first_lower_price):
+                    trigger_type = "first_lower"   # 发生降价 
+                    trigger_price = first_lower_price
+                    trigger_update_hour = first_lower_update_hour
+                elif pd.notna(boundary_final_price):
+                    trigger_type = "boundary"      # 到达边界
+                    trigger_price = boundary_final_price
+                    trigger_update_hour = boundary_final_update_hour
+                else:
+                    trigger_type = "no_data"
+
+                if pd.notna(entry_price) and pd.notna(trigger_price):
+                    pnl = round(float(entry_price - trigger_price), 2)   # profit (loss) amount, based on entry_price
+                    if entry_price != 0:
+                        pnl_pct = round(float(pnl) / float(entry_price) * 100, 2)  # profit (loss) percentage, based on entry_price
+                    else:
+                        pnl_pct = pd.NA
+                else:
+                    pnl = pd.NA
+                    pnl_pct = pd.NA
+
                 # filter to price changes
                 df_price_changes = df_val_f.loc[
                     df_val_f["adult_total_price"].shift() != df_val_f["adult_total_price"]
@@ -86,6 +193,16 @@ def _validate_predict_df(df_predict):
             last_update_hour = pd.NA
             list_change_price = []
             list_change_hours = []
+            drop_flag_window = 0
+            first_lower_price = pd.NA
+            first_lower_update_hour = pd.NA
+            boundary_final_price = pd.NA
+            boundary_final_update_hour = pd.NA
+            trigger_type = pd.NA
+            trigger_price = pd.NA
+            trigger_update_hour = pd.NA
+            pnl = pd.NA
+            pnl_pct = pd.NA
         
         safe_sep = "; "
 
@@ -98,6 +215,18 @@ def _validate_predict_df(df_predict):
         df_predict.at[idx, 'first_drop_hours_until_departure'] = first_drop_hours_until_departure
         df_predict.at[idx, 'first_drop_update_hour'] = first_drop_update_hour
         df_predict.at[idx, 'drop_flag'] = drop_flag
+        df_predict.at[idx, 'wait_start_hour'] = wait_start_dt
+        df_predict.at[idx, 'wait_end_hour'] = wait_end_dt
+        df_predict.at[idx, 'drop_flag_window'] = drop_flag_window
+        df_predict.at[idx, 'first_lower_price'] = first_lower_price
+        df_predict.at[idx, 'first_lower_update_hour'] = first_lower_update_hour
+        df_predict.at[idx, 'boundary_final_price'] = boundary_final_price
+        df_predict.at[idx, 'boundary_final_update_hour'] = boundary_final_update_hour
+        df_predict.at[idx, 'trigger_type'] = trigger_type
+        df_predict.at[idx, 'trigger_price'] = trigger_price
+        df_predict.at[idx, 'trigger_update_hour'] = trigger_update_hour
+        df_predict.at[idx, 'pnl'] = pnl
+        df_predict.at[idx, 'pnl_pct'] = pnl_pct
 
         count += 1
         if count % 5 == 0:
@@ -452,12 +581,12 @@ if __name__ == "__main__":
         # validate_process(node, interval_hours, pred_time_str)
         # node = "node0127"
         # validate_process_zong(node)  # unconditional aggregation
-        node = "node0127"
-        validate_process_zong(node, True, None, "202602051400")   # conditional aggregation
+        # node = "node0127"
+        # validate_process_zong(node, True, None, "202602051400")   # conditional aggregation
         # node = "node0203"
         # validate_process_zong(node, True, "202602041100", "202602051400")  # conditional aggregation
-        # node = "node0205"
-        # validate_process_zong(node, True, "202602061000")  # conditional aggregation
+        node = "node0205"
+        validate_process_zong(node, True, "202602061000", "202602091000")  # conditional aggregation
     # 1 automatic validation
     else:
         node = "node0127"