Kaynağa Gözat

提交一部分后续处理

node04 1 ay önce
ebeveyn
işleme
8444adcf06
3 değiştirilmiş dosya ile 303 ekleme ve 0 silme
  1. 1 0
      .gitignore
  2. 256 0
      follow_up.py
  3. 46 0
      run_uo.sh

+ 1 - 0
.gitignore

@@ -34,6 +34,7 @@ desktop.ini
 photo/
 data_shards/
 predictions/
+keep/
 
 # 字体文件(体积大,不适合版本控制)
 *.ttf

+ 256 - 0
follow_up.py

@@ -0,0 +1,256 @@
+import os
+import datetime
+import time
+import pandas as pd
+
+
def follow_up_handle():
    '''Post-processing for flight-price predictions.

    Reads the newest ./predictions/future_predictions_{YYYYMMDDHHMM}.csv,
    reconciles its "will drop" rows against a persistent maintenance table
    ./keep/keep_info.csv, writes a per-run snapshot
    ./keep/keep_info_{target_time}.csv, then persists the updated table
    with rows flagged -1 removed.

    keep_flag semantics (as implemented below):
        1  -> newly added from the latest prediction
        0  -> present in both the table and the latest prediction
       -1  -> no longer predicted to drop; removed after snapshotting

    Returns None; prints progress and returns early on missing inputs.
    '''
    object_dir = "./predictions"
    output_dir = "./keep"

    # Create the output directory if it does not exist yet
    os.makedirs(output_dir, exist_ok=True)   

    # Bail out when the prediction directory is missing
    if not os.path.exists(object_dir):
        print(f"目录不存在: {object_dir}")
        return

    # Collect every CSV named future_predictions_*.csv
    csv_files = []
    for file in os.listdir(object_dir):
        if file.startswith("future_predictions_") and file.endswith(".csv"):
            csv_files.append(file)
    
    if not csv_files:
        print(f"在 {object_dir} 中没有找到 future_predictions_ 开头的 CSV 文件")
        return
    
    # Lexicographic sort == chronological sort for YYYYMMDDHHMM names
    csv_files.sort()

    # Debug branch: pin a specific prediction timestamp instead of the latest
    # target_time = "202603271700"
    # matching_files = [f for f in csv_files if target_time in f]
    # if matching_files:
    #     last_csv_file = matching_files[0]
    #     print(f"指定时间的文件: {last_csv_file}")
    # else:
    #     print(f"未找到时间 {target_time} 的预测文件")
    #     return

    # Production branch
    last_csv_file = csv_files[-1]   # only the newest prediction file is processed
    print(f"最新预测文件: {last_csv_file}")
    if last_csv_file.startswith("future_predictions_") and last_csv_file.endswith(".csv"):
        # Recover the run timestamp from the filename
        target_time = last_csv_file.replace("future_predictions_", "").replace(".csv", "")
    else:
        # Unreachable given the filter above; kept as a defensive fallback
        target_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
    
    # Load the latest prediction file
    last_csv_path = os.path.join(object_dir, last_csv_file)
    df_last_predict = pd.read_csv(last_csv_path)

    # Split into predicted-to-drop vs predicted-not-to-drop flights
    df_last_predict_will_drop = df_last_predict[df_last_predict["will_price_drop"] == 1].reset_index(drop=True)
    df_last_predict_not_drop = df_last_predict[df_last_predict["will_price_drop"] == 0].reset_index(drop=True)
    print(f"最新预测文件中,预测降价的航班有 {len(df_last_predict_will_drop)} 条,预测不降价的航班有 {len(df_last_predict_not_drop)} 条")

    # Maintenance table keep_info.csv plus a per-run snapshot keep_info_{target_time}.csv
    keep_info_path = os.path.join(output_dir, "keep_info.csv")
    keep_info_snapshot_path = os.path.join(output_dir, f"keep_info_{target_time}.csv")
    # Business key identifying one flight offer
    key_cols = ["citypair", "flight_numbers", "baggage_weight", "from_date"]

    # De-duplicate on the business key, keeping the last occurrence
    df_last_predict_will_drop = df_last_predict_will_drop.drop_duplicates(
        subset=key_cols, keep="last"
    ).reset_index(drop=True)
    # df_last_predict_not_drop = df_last_predict_not_drop.drop_duplicates(
    #     subset=key_cols, keep="last"
    # ).reset_index(drop=True)

    # Load the existing maintenance table; fall back to an empty frame on
    # first run or on a read failure (best-effort by design)
    if os.path.exists(keep_info_path):
        try:
            df_keep_info = pd.read_csv(keep_info_path)
        except Exception as e:
            print(f"读取维护表失败: {keep_info_path}, error: {str(e)}")
            df_keep_info = pd.DataFrame()
    else:
        df_keep_info = pd.DataFrame()
    
    def _parse_dt(yyyymmddhhmm):
        # Parse a YYYYMMDDHHMM string into a datetime; None when malformed.
        try:
            return datetime.datetime.strptime(str(yyyymmddhhmm), "%Y%m%d%H%M")
        except Exception:
            return None
    
    # Whole hours elapsed since the previous run; used below to age
    # hours_until_departure for rows that dropped out of the predictions.
    current_dt = _parse_dt(target_time)
    prev_dt = None
    hud_decrement = 1

    # if not df_keep_info.empty and "last_predict_time" in df_keep_info.columns:
    #     prev_candidates = (
    #         df_keep_info["last_predict_time"].dropna().astype(str).tolist()
    #     )
    #     if prev_candidates:
    #         prev_dt = _parse_dt(max(prev_candidates))

    # Infer the previous run time from the newest snapshot file on disk
    if prev_dt is None:
        snapshot_times = []
        for f in os.listdir(output_dir):
            if (
                f.startswith("keep_info_")
                and f.endswith(".csv")
                and f != f"keep_info_{target_time}.csv"
            ):
                ts = f.replace("keep_info_", "").replace(".csv", "")
                dt = _parse_dt(ts)
                if dt is not None:
                    snapshot_times.append(dt)
        if snapshot_times:
            prev_dt = max(snapshot_times)
    
    if current_dt is not None and prev_dt is not None:
        delta_seconds = (current_dt - prev_dt).total_seconds()
        if delta_seconds >= 0:
            # Floor to whole hours elapsed between runs
            hud_decrement = max(0, int(delta_seconds // 3600))
        else:
            # Timestamps out of order: do not age rows at all
            hud_decrement = 0
    
    # First run: initialise the maintenance table from the will-drop rows
    if df_keep_info.empty:
        df_keep_info = df_last_predict_will_drop.copy()
        # Remember the update_hour at which each row entered the table
        df_keep_info["into_update_hour"] = df_keep_info['update_hour']
        # df_keep_info["into_price"] = df_keep_info['price_total']
        df_keep_info["keep_flag"] = 1
        # df_keep_info["last_predict_time"] = target_time

        df_keep_info.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
        print(f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info)})")

        df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
        print(f"维护表已初始化: {keep_info_path} (rows={len(df_keep_info)})")
    
    # Maintenance table already exists: reconcile with the latest prediction
    else:
        if "keep_flag" not in df_keep_info.columns:
            df_keep_info["keep_flag"] = 0

        # Normalise keep_flag to int (CSV round-trips may yield floats/strings)
        df_keep_info["keep_flag"] = (
            pd.to_numeric(df_keep_info["keep_flag"], errors="coerce")
            .fillna(0)
            .astype(int)
        )

        missing_cols = [c for c in key_cols if c not in df_keep_info.columns]
        if missing_cols:
            print(f"维护表缺少字段: {missing_cols}, path={keep_info_path}")
            return
        
        # Compare keys as strings on both sides to avoid dtype mismatches
        for c in key_cols:
            df_last_predict_will_drop[c] = df_last_predict_will_drop[c].astype(str)
            # df_last_predict_not_drop[c] = df_last_predict_not_drop[c].astype(str)
            df_keep_info[c] = df_keep_info[c].astype(str)
        
        df_keep_info = df_keep_info.drop_duplicates(subset=key_cols, keep="last").reset_index(drop=True)

        # Key sets of both sides, used for set-membership via merge+indicator
        df_last_keys = df_last_predict_will_drop[key_cols].drop_duplicates().reset_index(drop=True)
        df_keep_keys = df_keep_info[key_cols].drop_duplicates().reset_index(drop=True)

        df_last_with_merge = df_last_predict_will_drop.merge(
            df_keep_keys, on=key_cols, how="left", indicator=True
        )

        # Case 1: row appears in df_last_predict_will_drop but not in df_keep_info
        df_to_add = (
            df_last_with_merge.loc[df_last_with_merge["_merge"] == "left_only"]
            .drop(columns=["_merge"])
            .copy()
        )

        # New rows enter with keep_flag = 1
        if not df_to_add.empty:
            df_to_add['into_update_hour'] = df_to_add['update_hour']
            # df_to_add['into_price'] = df_to_add['price_total']
            df_to_add["keep_flag"] = 1
        
        # reset_index() preserves df_keep_info's positional index in column
        # "index" so matched rows can be updated in place below
        df_keep_with_merge = df_keep_info.reset_index().merge(
            df_last_keys, on=key_cols, how="left", indicator=True
        )

        # Case 2: row appears in both df_last_predict_will_drop and df_keep_info
        matched_idx = df_keep_with_merge.loc[df_keep_with_merge["_merge"] == "both", "index"].tolist()
        # Case 3: row is in df_keep_info but absent from df_last_predict_will_drop
        keep_only_idx = df_keep_with_merge.loc[df_keep_with_merge["_merge"] == "left_only", "index"].tolist()

        # Case-2 rows: refresh their non-key columns from the latest prediction
        if matched_idx:
            df_matched_keys = df_keep_info.loc[matched_idx, key_cols]
            # Left-merge keeps df_matched_keys order, so .values below lines
            # up positionally with matched_idx
            df_latest_matched = df_matched_keys.merge(
                df_last_predict_will_drop, on=key_cols, how="left"
            )
            # Overwrite every non-key column (except keep_flag) with the latest values
            update_cols = [c for c in df_last_predict_will_drop.columns if c not in key_cols]
            for c in update_cols:
                if c == "keep_flag":
                    continue
                if c not in df_keep_info.columns:
                    df_keep_info[c] = pd.NA
                df_keep_info.loc[matched_idx, c] = df_latest_matched[c].values
            
            # Re-flag: 1 -> 0, 0 -> 0, 2 -> 0, -1 -> 1
            # NOTE(review): both else outcomes are 1, so this reduces to
            # "x in (0, 1, 2) -> 0, anything else -> 1"; the inner ternary is
            # redundant — confirm the intended mapping for flags other than -1.
            old_flags = df_keep_info.loc[matched_idx, "keep_flag"]
            df_keep_info.loc[matched_idx, "keep_flag"] = old_flags.apply(
                lambda x: 0 if x in (0, 1, 2) else (1 if x == -1 else 1)
            )
            
        # Case-3 rows: age them and mark for deletion
        if keep_only_idx:
            mask_keep_only = df_keep_info.index.isin(keep_only_idx)  # boolean mask
            
            # Only rows whose keep_flag is >= 0 are aged / flagged
            mask_need_observe = mask_keep_only & (df_keep_info["keep_flag"] >= 0)  # boolean mask
            if mask_need_observe.any():
                if "hours_until_departure" not in df_keep_info.columns:
                    df_keep_info.loc[mask_need_observe, "keep_flag"] = -1
                else:
                    hud = pd.to_numeric(
                        df_keep_info.loc[mask_need_observe, "hours_until_departure"],
                        errors="coerce",
                    )
                    # Age hours_until_departure by the hours elapsed since the
                    # previous run (originally a fixed -1 per run)
                    # new_hud = hud - 1
                    new_hud = hud - hud_decrement
                    df_keep_info.loc[mask_need_observe, "hours_until_departure"] = new_hud
                    df_keep_info.loc[mask_need_observe, "keep_flag"] = -1       # deletion flag

        # Append case-1 rows to the maintenance table
        add_rows = len(df_to_add) if "df_to_add" in locals() else 0
        if add_rows:
            df_keep_info = pd.concat([df_keep_info, df_to_add], ignore_index=True)
        
        # Snapshot BEFORE removing flagged rows, so deletions remain auditable
        df_keep_info_snapshot = df_keep_info.copy()
        df_keep_info_snapshot.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
        print(
            f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info_snapshot)})"
        )

        # Remove rows flagged -1
        before_rm = len(df_keep_info)
        df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -1].reset_index(drop=True)
        rm_rows = before_rm - len(df_keep_info)

        # Persist the updated maintenance table
        df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
        print(
            f"维护表已更新: {keep_info_path} (rows={len(df_keep_info)} add={add_rows} rm={rm_rows})"
        )

    pass
+
if __name__ == "__main__":
    # Short pauses before and after the run — presumably to let upstream
    # file writes settle when invoked from run_uo.sh; confirm they are
    # still needed.
    time.sleep(5)
    follow_up_handle()
    time.sleep(5)

+ 46 - 0
run_uo.sh

@@ -0,0 +1,46 @@
#!/bin/bash
# Launch main_pe.py in the background, wait for it to finish (up to a
# 15-minute timeout), then run follow_up.py. Control-flow messages go to
# logs/control.log; task output goes to prediction.log / keep.log.

# Abort if the project directory is unavailable — otherwise every relative
# path below would silently operate in the wrong directory.
cd /home/node04/yuzhou/jiangcang_uo || exit 1

LOG_DIR=logs
mkdir -p "$LOG_DIR"

# Append a timestamped line to the control log.
log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') $1" >> "$LOG_DIR/control.log"
}

log "=== 脚本开始执行 ==="

START_TIME=$(date +%s)

# Start the prediction task in the background and remember its PID.
/home/node04/anaconda3/bin/python main_pe.py >> "$LOG_DIR/prediction.log" 2>&1 &
PID=$!

log "main_pe.py 已启动,PID=$PID"

# Maximum wait time (15 minutes = 900 seconds)
TIMEOUT=900
ELAPSED=0

# Poll every 5 seconds until the process exits or the timeout elapses.
while kill -0 "$PID" 2>/dev/null; do
    NOW=$(date +%s)
    ELAPSED=$((NOW - START_TIME))

    if [ "$ELAPSED" -ge "$TIMEOUT" ]; then
        # NOTE(review): on timeout main_pe.py is left running, so
        # follow_up.py executes concurrently with it. Confirm this is
        # intended, or add `kill "$PID"` here.
        log "main_pe.py 超时(已运行 ${ELAPSED} 秒),开始执行 follow_up.py"
        break
    fi

    sleep 5
done

# Normal completion. Recompute ELAPSED here: if the process exited before
# the first poll iteration, the loop body never ran and ELAPSED would
# otherwise be 0 (previously: unset, logging an empty value).
if ! kill -0 "$PID" 2>/dev/null; then
    ELAPSED=$(( $(date +%s) - START_TIME ))
    log "main_pe.py 正常结束(耗时 ${ELAPSED} 秒)"
fi

log "开始执行 follow_up.py"

/home/node04/anaconda3/bin/python follow_up.py >> "$LOG_DIR/keep.log" 2>&1

log "=== 脚本执行结束 ==="