import datetime
import os
import time

import pandas as pd

from config import mongodb_config
def follow_up_handle():
    """Maintain the "keep" table of flights predicted to drop in price.

    Reads the newest ``future_predictions_<YYYYMMDDHHMM>.csv`` from
    ``./predictions_0`` and folds it into the maintenance table
    ``./keep_0/keep_info.csv``, also writing a per-run snapshot
    ``keep_info_<timestamp>.csv``.

    ``keep_flag`` semantics (as implemented below):

    * ``1``  -- flight newly predicted to drop (just entered the table)
    * ``0``  -- flight was already tracked and is still predicted to drop
    * ``-1`` -- flight dropped out of the predictions or looks stale; such
      rows are preserved in the snapshot but removed from ``keep_info.csv``
    """
    object_dir = "./predictions_0"
    output_dir = "./keep_0"
    os.makedirs(output_dir, exist_ok=True)
    if not os.path.exists(object_dir):
        print(f"目录不存在: {object_dir}")
        return

    # All prediction files; lexicographic sort orders the YYYYMMDDHHMM
    # timestamps chronologically, so the newest file is last.
    csv_files = sorted(
        f
        for f in os.listdir(object_dir)
        if f.startswith("future_predictions_") and f.endswith(".csv")
    )
    if not csv_files:
        print(f"在 {object_dir} 中没有找到 future_predictions_ 开头的 CSV 文件")
        return

    last_csv_file = csv_files[-1]  # only the newest prediction file is processed
    print(f"最新预测文件: {last_csv_file}")
    if last_csv_file.startswith("future_predictions_") and last_csv_file.endswith(".csv"):
        target_time = last_csv_file.replace("future_predictions_", "").replace(".csv", "")
    else:
        # Defensive fallback; unreachable given the filename filter above.
        target_time = datetime.datetime.now().strftime("%Y%m%d%H%M")

    # Load the newest prediction file and split it by the model's verdict.
    last_csv_path = os.path.join(object_dir, last_csv_file)
    df_last_predict = pd.read_csv(last_csv_path)
    df_last_predict_will_drop = df_last_predict[df_last_predict["will_price_drop"] == 1].reset_index(drop=True)
    df_last_predict_not_drop = df_last_predict[df_last_predict["will_price_drop"] == 0].reset_index(drop=True)
    print(f"最新预测文件中,预测降价的航班有 {len(df_last_predict_will_drop)} 条,预测不降价的航班有 {len(df_last_predict_not_drop)} 条")

    # Maintenance table plus a snapshot of this run's state.
    keep_info_path = os.path.join(output_dir, "keep_info.csv")
    keep_info_snapshot_path = os.path.join(output_dir, f"keep_info_{target_time}.csv")
    # A flight is identified by these four columns.
    key_cols = ["city_pair", "flight_day", "flight_number_1", "flight_number_2"]
    df_last_predict_will_drop = df_last_predict_will_drop.drop_duplicates(
        subset=key_cols, keep="last"
    ).reset_index(drop=True)

    # Load the existing maintenance table; fall back to an empty frame on the
    # first run or when the file cannot be parsed (best-effort, logged).
    if os.path.exists(keep_info_path):
        try:
            df_keep_info = pd.read_csv(keep_info_path)
        except Exception as e:
            print(f"读取维护表失败: {keep_info_path}, error: {str(e)}")
            df_keep_info = pd.DataFrame()
    else:
        df_keep_info = pd.DataFrame()

    def _parse_dt(yyyymmddhhmm):
        # Parse a YYYYMMDDHHMM string; None when it is not a valid timestamp.
        try:
            return datetime.datetime.strptime(str(yyyymmddhhmm), "%Y%m%d%H%M")
        except Exception:
            return None

    # Number of whole hours elapsed since the previous run; used below to age
    # hours_until_departure of rows that dropped out of the predictions.
    current_dt = _parse_dt(target_time)
    prev_dt = None
    hud_decrement = 1
    if prev_dt is None:
        # Infer the previous run time from the newest earlier snapshot file.
        snapshot_times = []
        for f in os.listdir(output_dir):
            if (
                f.startswith("keep_info_")
                and f.endswith(".csv")
                and f != f"keep_info_{target_time}.csv"
            ):
                ts = f.replace("keep_info_", "").replace(".csv", "")
                dt = _parse_dt(ts)
                if dt is not None:
                    snapshot_times.append(dt)
        if snapshot_times:
            prev_dt = max(snapshot_times)
    if current_dt is not None and prev_dt is not None:
        delta_seconds = (current_dt - prev_dt).total_seconds()
        if delta_seconds >= 0:
            hud_decrement = max(0, int(delta_seconds // 3600))
        else:
            # Timestamps out of order (clock skew / stray files): age nothing.
            hud_decrement = 0

    if df_keep_info.empty:
        # ---- First run: initialise the maintenance table. ----
        df_keep_info = df_last_predict_will_drop.copy()
        df_keep_info["into_update_hour"] = df_keep_info['update_hour']
        df_keep_info["keep_flag"] = 1
        # Flag rows whose quote is much older than the crawl as stale (-1).
        dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce")
        dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce")
        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=12)
        if mask_abnormal_time.any():
            df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1

        df_keep_info.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
        print(f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info)})")
        # keep_info.csv itself never stores rows flagged for removal.
        df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -1].reset_index(drop=True)
        df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
        print(f"维护表已初始化: {keep_info_path} (rows={len(df_keep_info)})")

    else:
        # ---- Subsequent runs: merge the latest predictions into the table. ----
        if "keep_flag" not in df_keep_info.columns:
            df_keep_info["keep_flag"] = 0
        df_keep_info["keep_flag"] = (
            pd.to_numeric(df_keep_info["keep_flag"], errors="coerce")
            .fillna(0)
            .astype(int)
        )
        missing_cols = [c for c in key_cols if c not in df_keep_info.columns]
        if missing_cols:
            print(f"维护表缺少字段: {missing_cols}, path={keep_info_path}")
            return

        # Normalise key columns to strings so the merges match reliably.
        for c in key_cols:
            df_last_predict_will_drop[c] = df_last_predict_will_drop[c].astype(str)
            df_keep_info[c] = df_keep_info[c].astype(str)
        df_keep_info = df_keep_info.drop_duplicates(subset=key_cols, keep="last").reset_index(drop=True)

        df_last_keys = df_last_predict_will_drop[key_cols].drop_duplicates().reset_index(drop=True)
        df_keep_keys = df_keep_info[key_cols].drop_duplicates().reset_index(drop=True)

        # Case 1: predicted to drop now but not yet tracked -> add with keep_flag=1.
        df_last_with_merge = df_last_predict_will_drop.merge(
            df_keep_keys, on=key_cols, how="left", indicator=True
        )
        df_to_add = (
            df_last_with_merge.loc[df_last_with_merge["_merge"] == "left_only"]
            .drop(columns=["_merge"])
            .copy()
        )
        if not df_to_add.empty:
            df_to_add['into_update_hour'] = df_to_add['update_hour']
            df_to_add["keep_flag"] = 1

        df_keep_with_merge = df_keep_info.reset_index().merge(
            df_last_keys, on=key_cols, how="left", indicator=True
        )
        # Case 2: tracked AND still predicted to drop.
        matched_idx = df_keep_with_merge.loc[df_keep_with_merge["_merge"] == "both", "index"].tolist()
        # Case 3: tracked but absent from the latest "will drop" predictions.
        keep_only_idx = df_keep_with_merge.loc[df_keep_with_merge["_merge"] == "left_only", "index"].tolist()

        if matched_idx:
            # Refresh the tracked rows with the latest prediction values.
            # df_last_predict_will_drop was deduplicated on key_cols above, so
            # this left-merge is 1:1 and preserves matched_idx order/length.
            df_matched_keys = df_keep_info.loc[matched_idx, key_cols]
            df_latest_matched = df_matched_keys.merge(
                df_last_predict_will_drop, on=key_cols, how="left"
            )
            update_cols = [c for c in df_last_predict_will_drop.columns if c not in key_cols]
            for c in update_cols:
                if c == "keep_flag":
                    continue
                if c not in df_keep_info.columns:
                    df_keep_info[c] = pd.NA
                df_keep_info.loc[matched_idx, c] = df_latest_matched[c].values
            # Re-flag: 0/1/2 -> 0 (still tracked), anything else (-1) -> 1.
            # (The original expression `1 if x == -1 else 1` returned 1 on both
            # arms; collapsed to a single branch with identical behaviour.)
            old_flags = df_keep_info.loc[matched_idx, "keep_flag"]
            df_keep_info.loc[matched_idx, "keep_flag"] = old_flags.apply(
                lambda x: 0 if x in (0, 1, 2) else 1
            )

        if keep_only_idx:
            mask_keep_only = df_keep_info.index.isin(keep_only_idx)
            # Tracked rows (flag >= 0) that disappeared from the predictions:
            # age their hours_until_departure and mark them for removal.
            mask_need_observe = mask_keep_only & (df_keep_info["keep_flag"] >= 0)
            if mask_need_observe.any():
                if "hours_until_departure" not in df_keep_info.columns:
                    df_keep_info.loc[mask_need_observe, "keep_flag"] = -1
                else:
                    hud = pd.to_numeric(
                        df_keep_info.loc[mask_need_observe, "hours_until_departure"],
                        errors="coerce",
                    )
                    # Age by the number of hours elapsed since the last run.
                    new_hud = hud - hud_decrement
                    df_keep_info.loc[mask_need_observe, "hours_until_departure"] = new_hud
                    df_keep_info.loc[mask_need_observe, "keep_flag"] = -1  # removal flag

        # Append newly tracked flights.  (df_to_add is always bound in this
        # branch; the previous `"df_to_add" in locals()` guard was redundant.)
        add_rows = len(df_to_add)
        if add_rows:
            df_keep_info = pd.concat([df_keep_info, df_to_add], ignore_index=True)

        # Flag rows whose quote is much older than the crawl as stale (-1).
        dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce")
        dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce")
        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=12)
        if mask_abnormal_time.any():
            df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1

        # Snapshot keeps the -1 rows for auditing; keep_info.csv does not.
        df_keep_info_snapshot = df_keep_info.copy()
        df_keep_info_snapshot.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig")
        print(
            f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info_snapshot)})"
        )
        before_rm = len(df_keep_info)
        df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -1].reset_index(drop=True)
        rm_rows = before_rm - len(df_keep_info)
        df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig")
        print(
            f"维护表已更新: {keep_info_path} (rows={len(df_keep_info)} add={add_rows} rm={rm_rows})"
        )
if __name__ == "__main__":
    # Short delay before processing -- presumably to let upstream file writes
    # settle; TODO confirm the intent of these sleeps.
    time.sleep(2)
    follow_up_handle()
    time.sleep(5)
    # Kick off the downstream cabin task only after the keep table is refreshed.
    # Imported here (not at module top) so importing this module never runs it.
    from descending_cabin_task import main as descending_cabin_task_main
    descending_cabin_task_main()