
Modify the validation process: add aggregate validation

node04, 1 week ago
parent commit c53a169012
1 file changed, 202 additions, 12 deletions

result_validate_0.py  +202 -12

@@ -34,8 +34,10 @@ def _validate_predict_df(df_predict):
             # df_val_f = df_val_f[df_val_f['update_hour'] <= valid_end_dt]
             if df_val_f.empty:
                 drop_flag = 0
-                first_drop_amount = pd.NA
-                first_drop_hours = pd.NA
+                # first_drop_amount = pd.NA
+                first_drop_price = pd.NA
+                first_drop_hours_until_departure = pd.NA
+                first_drop_update_hour = pd.NA
                 last_hours_util = pd.NA
                 last_update_hour = pd.NA
                 list_change_price = []
@@ -60,20 +62,26 @@ def _validate_predict_df(df_predict):
                 # extract the required values
                 if not first_negative_change.empty:
                     drop_flag = 1
-                    first_drop_amount = first_negative_change['change_amount'].iloc[0].round(2)
-                    first_drop_hours = first_negative_change['hours_until_departure'].iloc[0]
+                    # first_drop_amount = first_negative_change['change_amount'].iloc[0].round(2)
+                    first_drop_price = first_negative_change['adult_total_price'].iloc[0].round(2)
+                    first_drop_hours_until_departure = first_negative_change['hours_until_departure'].iloc[0]
+                    first_drop_update_hour = first_negative_change['update_hour'].iloc[0]
                 else:
                     drop_flag = 0
-                    first_drop_amount = pd.NA
-                    first_drop_hours = pd.NA
+                    # first_drop_amount = pd.NA
+                    first_drop_price = pd.NA
+                    first_drop_hours_until_departure = pd.NA
+                    first_drop_update_hour = pd.NA
 
                 list_change_price = df_price_changes['adult_total_price'].tolist()
                 list_change_hours = df_price_changes['hours_until_departure'].tolist()
 
         else:
             drop_flag = 0
-            first_drop_amount = pd.NA
-            first_drop_hours = pd.NA
+            # first_drop_amount = pd.NA
+            first_drop_price = pd.NA
+            first_drop_hours_until_departure = pd.NA
+            first_drop_update_hour = pd.NA
             last_hours_util = pd.NA
             last_update_hour = pd.NA
             list_change_price = []
@@ -85,8 +93,10 @@ def _validate_predict_df(df_predict):
         df_predict.at[idx, 'change_hours'] = safe_sep.join(map(str, list_change_hours))
         df_predict.at[idx, 'last_hours_util'] = last_hours_util
         df_predict.at[idx, 'last_update_hour'] = last_update_hour
-        df_predict.at[idx, 'first_drop_amount'] = first_drop_amount * -1  # convert the negative change to a positive number
-        df_predict.at[idx, 'first_drop_hours'] = first_drop_hours
+        # df_predict.at[idx, 'first_drop_amount'] = first_drop_amount * -1  # convert the negative change to a positive number
+        df_predict.at[idx, 'first_drop_price'] = first_drop_price
+        df_predict.at[idx, 'first_drop_hours_until_departure'] = first_drop_hours_until_departure
+        df_predict.at[idx, 'first_drop_update_hour'] = first_drop_update_hour
         df_predict.at[idx, 'drop_flag'] = drop_flag
 
         count += 1
@@ -216,6 +226,184 @@ def validate_process_auto(node, interval_hours):
     print()
 
 
+def validate_process_zong(node):
+    object_dir = "./predictions_0"
+
+    output_dir = f"./validate/{node}_zong"
+    os.makedirs(output_dir, exist_ok=True)
+
+    # check that the source directory exists
+    if not os.path.exists(object_dir):
+        print(f"directory does not exist: {object_dir}")
+        return
+
+    # collect all CSV files whose names start with future_predictions_
+    csv_files = []
+    for file in os.listdir(object_dir):
+        if file.startswith("future_predictions_") and file.endswith(".csv"):
+            csv_files.append(file)
+
+    if not csv_files:
+        print(f"no CSV files starting with future_predictions_ found in {object_dir}")
+        return
+
+    csv_files.sort()
+    list_df_will_drop = []
+
+    for csv_file in csv_files:
+        csv_path = os.path.join(object_dir, csv_file)
+        try:
+            df_predict = pd.read_csv(csv_path)
+        except Exception as e:
+            print(f"read {csv_path} error: {str(e)}")
+            df_predict = pd.DataFrame()
+        
+        if df_predict.empty:
+            print(f"prediction data is empty: {csv_file}")
+            continue
+
+        if "will_price_drop" not in df_predict.columns:
+            print(f"missing will_price_drop column, skipping: {csv_file}")
+            continue
+
+        batch_time_str = (
+            csv_file.replace("future_predictions_", "").replace(".csv", "")
+        )
+
+        df_predict_will_drop = df_predict[df_predict["will_price_drop"] == 1].copy()
+
+        if df_predict_will_drop.empty:
+            continue
+        
+        # df_predict_will_drop["batch_file"] = csv_file
+        df_predict_will_drop["batch_time"] = batch_time_str
+        list_df_will_drop.append(df_predict_will_drop)
+
+        del df_predict
+    
+    if not list_df_will_drop:
+        print("所有批次的 will_drop 都为空")
+        return
+
+    df_predict_will_drop_all = pd.concat(list_df_will_drop, ignore_index=True)
+
+    del list_df_will_drop
+    
+    before_rows = len(df_predict_will_drop_all)
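+    # an itinerary is keyed by the city pair, both flight numbers, and the flight day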
+    group_keys = ["city_pair", "flight_number_1", "flight_number_2", "flight_day"]
+    df_predict_will_drop_all["batch_dt"] = pd.to_datetime(
+        df_predict_will_drop_all["batch_time"],
+        format="%Y%m%d%H%M",
+        errors="coerce",
+    )
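+    # infer the typical batch interval: the most common gap (in minutes) between
+    # consecutive distinct batch timestamps, falling back to 60 when only one batch exists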
+    diff_minutes = (
+        df_predict_will_drop_all["batch_dt"]
+        .dropna()
+        .sort_values()
+        .drop_duplicates()
+        .diff()
+        .dt.total_seconds()
+        .div(60)
+        .dropna()
+    )
+    expected_step_minutes = (
+        int(diff_minutes.value_counts().idxmax()) if not diff_minutes.empty else 60
+    )
+
+    df_predict_will_drop_all.sort_values(
+        by=group_keys + ["batch_dt"],
+        inplace=True,
+        ignore_index=True,
+        na_position="last",
+    )
+
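+    # per itinerary group, measure the gap (in minutes) to the previous batch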
+    df_predict_will_drop_all["prev_batch_dt"] = df_predict_will_drop_all.groupby(group_keys)[
+        "batch_dt"
+    ].shift(1)
+    df_predict_will_drop_all["gap_minutes"] = (
+        (df_predict_will_drop_all["batch_dt"] - df_predict_will_drop_all["prev_batch_dt"])
+        .dt.total_seconds()
+        .div(60)
+    )
+
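+    # a row opens a new consecutive segment when it has no predecessor, its timestamp
+    # failed to parse, or its gap deviates from the expected step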
+    df_predict_will_drop_all["is_new_segment"] = (
+        df_predict_will_drop_all["prev_batch_dt"].isna()
+        | df_predict_will_drop_all["batch_dt"].isna()
+        | (df_predict_will_drop_all["gap_minutes"] != expected_step_minutes)
+    )
+    df_predict_will_drop_all["segment_id"] = df_predict_will_drop_all.groupby(group_keys)[
+        "is_new_segment"
+    ].cumsum()
+
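+    # the last hours_until_departure observed within each consecutive segment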
+    df_segment_last = df_predict_will_drop_all.groupby(
+        group_keys + ["segment_id"], as_index=False
+    ).agg(last_hours_until_departure=("hours_until_departure", "last"))
+
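+    # keep only the first row of each segment and attach that segment's last hours_until_departure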
+    df_predict_will_drop_filter = df_predict_will_drop_all.drop_duplicates(
+        subset=group_keys + ["segment_id"], keep="first"
+    ).merge(
+        df_segment_last,
+        on=group_keys + ["segment_id"],
+        how="left",
+    )
+
+    df_predict_will_drop_filter = (
+        df_predict_will_drop_filter.drop(
+            columns=[
+                "batch_dt",
+                "prev_batch_dt",
+                "gap_minutes",
+                "is_new_segment",
+                "segment_id",
+            ]
+        )
+        .reset_index(drop=True)
+    )
+
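+    # place last_hours_until_departure immediately before price_change_percent for readability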
+    if "price_change_percent" in df_predict_will_drop_filter.columns:
+        cols = df_predict_will_drop_filter.columns.tolist()
+        if "last_hours_until_departure" in cols:
+            cols.remove("last_hours_until_departure")
+            cols.insert(cols.index("price_change_percent"), "last_hours_until_departure")
+            df_predict_will_drop_filter = df_predict_will_drop_filter[cols]
+    
+    after_rows = len(df_predict_will_drop_filter)
+    print(
+        f"will_drop 连续段过滤完成(step={expected_step_minutes}min): {before_rows} -> {after_rows}"
+    )
+
+    # current time, truncated to the whole hour
+    current_time = datetime.datetime.now()
+    current_time_str = current_time.strftime("%Y%m%d%H%M")
+    hourly_time = current_time.replace(minute=0, second=0, microsecond=0)
+    hourly_time_str = hourly_time.strftime("%Y%m%d%H%M")
+    
+    before_rows = len(df_predict_will_drop_filter)
+    df_predict_will_drop_filter["valid_end_dt"] = pd.to_datetime(
+        df_predict_will_drop_filter["valid_end_hour"],
+        errors="coerce",
+    )
+
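+    # keep only rows whose validity window (valid_end_hour plus an 8-hour offset,
+    # presumably a timezone adjustment) is no later than the current whole hour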
+    df_predict_will_drop_filter_1 = df_predict_will_drop_filter[
+        (df_predict_will_drop_filter["valid_end_dt"] + pd.Timedelta(hours=8))
+        <= hourly_time
+    ].copy()
+    df_predict_will_drop_filter_1.drop(columns=["valid_end_dt"], inplace=True)
+    after_rows = len(df_predict_will_drop_filter_1)
+    print(
+        f"valid_end_hour(+8h)过滤完成: {before_rows} -> {after_rows} (hourly_time={hourly_time_str})"
+    )
+
+    # run the detailed validation on the filtered rows
+    df_predict_will_drop_validate = _validate_predict_df(df_predict_will_drop_filter_1)
+
+    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+    save_csv = f"result_validate_{node}_zong_{timestamp_str}.csv"
+
+    output_path = os.path.join(output_dir, save_csv)
+    df_predict_will_drop_validate.to_csv(output_path, index=False, encoding="utf-8-sig")
+    print(f"保存完成: {output_path}")
+    print(f"验证完成: {node} zong")    
+    print()
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='validation script')
     parser.add_argument('--interval', type=int, choices=[1], 
@@ -225,8 +413,10 @@ if __name__ == "__main__":
 
     # 0: manual validation
     if interval_hours == 0:
-        node, pred_time_str = "node0127", "202601292300"
-        validate_process(node, interval_hours, pred_time_str)
+        # node, pred_time_str = "node0127", "202601301500"
+        # validate_process(node, interval_hours, pred_time_str)
+        node = "node0127"
+        validate_process_zong(node)
     # 1: automatic validation
     else:
         node = "node0127"