import os import datetime import time import pandas as pd from config import mongodb_config def follow_up_handle(): '''后续处理''' object_dir = "./predictions_0" output_dir = "./keep_0" # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 检查目录是否存在 if not os.path.exists(object_dir): print(f"目录不存在: {object_dir}") return # 获取所有以 future_predictions_ 开头的 CSV 文件 csv_files = [] for file in os.listdir(object_dir): if file.startswith("future_predictions_") and file.endswith(".csv"): csv_files.append(file) if not csv_files: print(f"在 {object_dir} 中没有找到 future_predictions_ 开头的 CSV 文件") return csv_files.sort() # 调试分支 # target_time = "202603131400" # matching_files = [f for f in csv_files if target_time in f] # if matching_files: # last_csv_file = matching_files[0] # print(f"指定时间的文件: {last_csv_file}") # else: # print(f"未找到时间 {target_time} 的预测文件") # return # 正式分支 last_csv_file = csv_files[-1] # 只看最新预测的文件 print(f"最新预测文件: {last_csv_file}") if last_csv_file.startswith("future_predictions_") and last_csv_file.endswith(".csv"): target_time = last_csv_file.replace("future_predictions_", "").replace(".csv", "") else: target_time = datetime.datetime.now().strftime("%Y%m%d%H%M") # 读取最新预测文件 last_csv_path = os.path.join(object_dir, last_csv_file) df_last_predict = pd.read_csv(last_csv_path) df_last_predict_will_drop = df_last_predict[df_last_predict["will_price_drop"] == 1].reset_index(drop=True) df_last_predict_not_drop = df_last_predict[df_last_predict["will_price_drop"] == 0].reset_index(drop=True) print(f"最新预测文件中,预测降价的航班有 {len(df_last_predict_will_drop)} 条,预测不降价的航班有 {len(df_last_predict_not_drop)} 条") # 建一张 维护表 keep_info.csv 附加一个维护表快照 keep_info_{target_time}.csv keep_info_path = os.path.join(output_dir, "keep_info.csv") keep_info_snapshot_path = os.path.join(output_dir, f"keep_info_{target_time}.csv") key_cols = ["city_pair", "flight_day", "flight_number_1", "flight_number_2"] df_last_predict_will_drop = df_last_predict_will_drop.drop_duplicates( subset=key_cols, keep="last" ).reset_index(drop=True) # df_last_predict_not_drop = df_last_predict_not_drop.drop_duplicates( # subset=key_cols, keep="last" # ).reset_index(drop=True) # 读取维护表 if os.path.exists(keep_info_path): try: df_keep_info = pd.read_csv(keep_info_path) except Exception as e: print(f"读取维护表失败: {keep_info_path}, error: {str(e)}") df_keep_info = pd.DataFrame() else: df_keep_info = pd.DataFrame() def _parse_dt(yyyymmddhhmm): try: return datetime.datetime.strptime(str(yyyymmddhhmm), "%Y%m%d%H%M") except Exception: return None current_dt = _parse_dt(target_time) prev_dt = None hud_decrement = 1 # if not df_keep_info.empty and "last_predict_time" in df_keep_info.columns: # prev_candidates = ( # df_keep_info["last_predict_time"].dropna().astype(str).tolist() # ) # if prev_candidates: # prev_dt = _parse_dt(max(prev_candidates)) if prev_dt is None: snapshot_times = [] for f in os.listdir(output_dir): if ( f.startswith("keep_info_") and f.endswith(".csv") and f != f"keep_info_{target_time}.csv" ): ts = f.replace("keep_info_", "").replace(".csv", "") dt = _parse_dt(ts) if dt is not None: snapshot_times.append(dt) if snapshot_times: prev_dt = max(snapshot_times) if current_dt is not None and prev_dt is not None: delta_seconds = (current_dt - prev_dt).total_seconds() if delta_seconds >= 0: hud_decrement = max(0, int(delta_seconds // 3600)) else: hud_decrement = 0 # 初始化维护表 if df_keep_info.empty: df_keep_info = df_last_predict_will_drop.copy() df_keep_info["into_update_hour"] = df_keep_info['update_hour'] # df_keep_info["into_price"] = df_keep_info['adult_total_price'] df_keep_info["keep_flag"] = 1 # df_keep_info["last_predict_time"] = target_time # 将长时间没更新的航班标记为-1 dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce") dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce") mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=8) if mask_abnormal_time.any(): df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1 df_keep_info.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig") print(f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info)})") # 移除 keep_flag 为 -1 的行 # before_rm = len(df_keep_info) df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -1].reset_index(drop=True) # rm_rows = before_rm - len(df_keep_info) df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig") print(f"维护表已初始化: {keep_info_path} (rows={len(df_keep_info)})") # 已存在维护表 else: if "keep_flag" not in df_keep_info.columns: df_keep_info["keep_flag"] = 0 df_keep_info["keep_flag"] = ( pd.to_numeric(df_keep_info["keep_flag"], errors="coerce") .fillna(0) .astype(int) ) missing_cols = [c for c in key_cols if c not in df_keep_info.columns] if missing_cols: print(f"维护表缺少字段: {missing_cols}, path={keep_info_path}") return for c in key_cols: df_last_predict_will_drop[c] = df_last_predict_will_drop[c].astype(str) # df_last_predict_not_drop[c] = df_last_predict_not_drop[c].astype(str) df_keep_info[c] = df_keep_info[c].astype(str) df_keep_info = df_keep_info.drop_duplicates(subset=key_cols, keep="last").reset_index(drop=True) # 提取两者的标志位 df_last_keys = df_last_predict_will_drop[key_cols].drop_duplicates().reset_index(drop=True) df_keep_keys = df_keep_info[key_cols].drop_duplicates().reset_index(drop=True) df_last_with_merge = df_last_predict_will_drop.merge( df_keep_keys, on=key_cols, how="left", indicator=True ) # 场景一: 如果某一行数据在 df_last_predict_will_drop 出现,没有在 df_keep_info 里 df_to_add = ( df_last_with_merge.loc[df_last_with_merge["_merge"] == "left_only"] .drop(columns=["_merge"]) .copy() ) # keep_flag 设为 1 if not df_to_add.empty: df_to_add['into_update_hour'] = df_to_add['update_hour'] # df_to_add['into_price'] = df_to_add['adult_total_price'] df_to_add["keep_flag"] = 1 df_keep_with_merge = df_keep_info.reset_index().merge( df_last_keys, on=key_cols, how="left", indicator=True ) # 场景二: 如果某一行数据在 df_last_predict_will_drop 和 df_keep_info 里都出现 matched_idx = df_keep_with_merge.loc[df_keep_with_merge["_merge"] == "both", "index"].tolist() # 场景三: 如果某一行数据在 df_last_predict_will_drop 没有出现,却在 df_keep_info 里都出现 keep_only_idx = df_keep_with_merge.loc[df_keep_with_merge["_merge"] == "left_only", "index"].tolist() # 符合场景二的索引 (在 df_keep_with_merge 中) if matched_idx: df_matched_keys = df_keep_info.loc[matched_idx, key_cols] df_latest_matched = df_matched_keys.merge( df_last_predict_will_drop, on=key_cols, how="left" ) # 将 df_keep_info 的 df_matched_keys 的内容更新为 df_last_predict_will_drop 里对应的内容 update_cols = [c for c in df_last_predict_will_drop.columns if c not in key_cols] for c in update_cols: if c == "keep_flag": continue if c not in df_keep_info.columns: df_keep_info[c] = pd.NA df_keep_info.loc[matched_idx, c] = df_latest_matched[c].values # 重新标记 原来是1 -> 0 原来是0 -> 0 原来是2 -> 0, 原来是-1 -> 1 old_flags = df_keep_info.loc[matched_idx, "keep_flag"] df_keep_info.loc[matched_idx, "keep_flag"] = old_flags.apply( lambda x: 0 if x in (0, 1, 2) else (1 if x == -1 else 1) ) # 符合场景三的索引 (在 df_keep_with_merge 中) if keep_only_idx: mask_keep_only = df_keep_info.index.isin(keep_only_idx) # 布尔索引序列 # 如果 df_keep_info 的 keep_flag 为-1,此时标记为-2 # mask_to_remove = mask_keep_only & (df_keep_info["keep_flag"] == -1) # df_keep_info.loc[mask_to_remove, "keep_flag"] = -2 # 如果 df_keep_info 的 keep_flag 大于等于0 mask_need_observe = mask_keep_only & (df_keep_info["keep_flag"] >= 0) # 布尔索引序列 if mask_need_observe.any(): if "hours_until_departure" not in df_keep_info.columns: df_keep_info.loc[mask_need_observe, "keep_flag"] = -1 else: hud = pd.to_numeric( df_keep_info.loc[mask_need_observe, "hours_until_departure"], errors="coerce", ) # hours_until_departure自动减1 # new_hud = hud - 1 new_hud = hud - hud_decrement df_keep_info.loc[mask_need_observe, "hours_until_departure"] = new_hud df_keep_info.loc[mask_need_observe, "keep_flag"] = -1 # 删除标志 # df_keep_only_keys = df_keep_info.loc[mask_keep_only, key_cols].copy() # df_keep_only_keys["_row_idx"] = df_keep_only_keys.index # # 检查 df_keep_only_keys 是否在 df_last_predict_not_drop 中 # df_keep_only_keys = df_keep_only_keys.merge( # df_last_predict_not_drop[key_cols].drop_duplicates(), # on=key_cols, # how="left", # indicator=True, # ) # idx_in_not_drop = df_keep_only_keys.loc[ # df_keep_only_keys["_merge"] == "both", "_row_idx" # ].tolist() # mask_in_not_drop = df_keep_info.index.isin(idx_in_not_drop) # 在 df_last_predict_not_drop 中出现 只是will_price_drop为0 未达边界 # mask_not_drop_observe = mask_need_observe & mask_in_not_drop # 判断为不降价的布尔索引数组 # mask_boundary_observe = mask_need_observe & ~mask_in_not_drop # 判断为到达边界的布尔索引数组 # df_keep_info.loc[mask_not_drop_observe, "keep_flag"] = -1 # 删除标志 # if mask_boundary_observe.any(): # new_hud_full = pd.to_numeric( # df_keep_info["hours_until_departure"], errors="coerce" # ) # df_keep_info.loc[mask_boundary_observe, "keep_flag"] = -1 # 默认删除标志 # df_keep_info.loc[ # mask_boundary_observe & new_hud_full.gt(4), "keep_flag" # 如果达到边界且hours_until_departure大于4 则给保留标志 # ] = 2 pass # 对于这些边界保持状态(keep_flag为2) 检查其是否在最新一次验价后的文件里存在, 如果不存在 则标记为-1 # df_temp_2 = df_keep_info.loc[df_keep_info["keep_flag"] == 2, key_cols].copy() # if not df_temp_2.empty: # end_dir = "/home/node04/descending_cabin_files" # end_candidates = [] # if os.path.isdir(end_dir): # for f in os.listdir(end_dir): # if f.startswith("keep_info_end_") and f.endswith(".csv"): # ts = f.replace("keep_info_end_", "").replace(".csv", "") # if ts.isdigit(): # end_candidates.append((ts, f)) #(时间戳,文件名) # if end_candidates: # end_candidates.sort(key=lambda x: x[0]) # end_last_path = os.path.join(end_dir, end_candidates[-1][1]) # 最新一次验价后的文件 # try: # df_end_last = pd.read_csv(end_last_path) # except Exception: # df_end_last = pd.DataFrame() # if not df_end_last.empty and all(c in df_end_last.columns for c in key_cols): # key_cols作为比对条件 # df_temp_2["_row_idx"] = df_temp_2.index # df_end_keys = df_end_last[key_cols].drop_duplicates().copy() # for c in key_cols: # df_temp_2[c] = df_temp_2[c].astype(str) # df_end_keys[c] = df_end_keys[c].astype(str) # df_temp_2_with_merge = df_temp_2.merge( # df_end_keys, on=key_cols, how="left", indicator=True # ) # # 对于只在 df_temp_2 出现,而不在 df_end_keys 出现的索引,在 df_keep_info 中标记为-1 # idx_to_rm_2 = df_temp_2_with_merge.loc[ # df_temp_2_with_merge["_merge"] == "left_only", "_row_idx" # ].tolist() # if idx_to_rm_2: # df_keep_info.loc[idx_to_rm_2, "keep_flag"] = -1 # 将 df_to_add 添加到 df_keep_info 之后 add_rows = len(df_to_add) if "df_to_add" in locals() else 0 if add_rows: df_keep_info = pd.concat([df_keep_info, df_to_add], ignore_index=True) # 将长时间没更新的航班标记为-1 dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce") dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce") mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=8) if mask_abnormal_time.any(): df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1 df_keep_info_snapshot = df_keep_info.copy() df_keep_info_snapshot.to_csv(keep_info_snapshot_path, index=False, encoding="utf-8-sig") print( f"维护表快照已保存: {keep_info_snapshot_path} (rows={len(df_keep_info_snapshot)})" ) # 移除 keep_flag 为 -1 的行 before_rm = len(df_keep_info) df_keep_info = df_keep_info.loc[df_keep_info["keep_flag"] != -1].reset_index(drop=True) rm_rows = before_rm - len(df_keep_info) # 保存更新后的 df_keep_info 到维护表csv文件 df_keep_info.to_csv(keep_info_path, index=False, encoding="utf-8-sig") print( f"维护表已更新: {keep_info_path} (rows={len(df_keep_info)} add={add_rows} rm={rm_rows})" ) # ================================================================ # for idx, row in df_last_predict_will_drop.iterrows(): # city_pair = row['city_pair'] # flight_day = row['flight_day'] # flight_number_1 = row['flight_number_1'] # flight_number_2 = row['flight_number_2'] # baggage = row['baggage'] # from_city_code = city_pair.split('-')[0] # to_city_code = city_pair.split('-')[1] # from_day = datetime.datetime.strptime(flight_day, '%Y-%m-%d').strftime('%Y%m%d') # baggage_str = f"1-{baggage}" # pass # adult_total_price = row['adult_total_price'] # hours_until_departure = row['hours_until_departure'] pass if __name__ == "__main__": time.sleep(5) follow_up_handle() time.sleep(5) from descending_cabin_task import main as descending_cabin_task_main descending_cabin_task_main()