import os
import datetime

import pandas as pd

from data_loader import mongo_con_parse, validate_keep_one_line
from config import mongo_config, mongo_table_uo


def _floor_to_hour(dt):
    """Return *dt* truncated to the start of its hour."""
    return dt.replace(minute=0, second=0, microsecond=0)


def _new_output_path():
    """Create ``./validate/keep`` if needed and return a timestamped CSV path."""
    output_dir = "./validate/keep"
    os.makedirs(output_dir, exist_ok=True)
    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    return os.path.join(output_dir, f"result_keep_verify_{timestamp_str}.csv")


def _list_csv_files(object_dir, prefix):
    """Return the sorted ``.csv`` file names in *object_dir* starting with *prefix*."""
    return sorted(
        name
        for name in os.listdir(object_dir)
        if name.startswith(prefix) and name.endswith(".csv")
    )


def _parse_batch_range(min_batch_time_str, max_batch_time_str):
    """Parse the ``%Y%m%d%H%M`` bounds and floor both to the hour.

    Returns ``(min_batch_dt, max_batch_dt)``, or ``None`` (after printing a
    message) when the lower bound is later than the upper bound.
    Raises ``ValueError`` when either string does not match the format.
    """
    min_batch_dt = _floor_to_hour(
        datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
    )
    max_batch_dt = _floor_to_hour(
        datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
    )
    if min_batch_dt > max_batch_dt:
        print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
        return None
    return min_batch_dt, max_batch_dt


def _parse_batch_hour(csv_file, prefix, time_format):
    """Extract the batch timestamp embedded in *csv_file*.

    Returns ``(batch_time_str, batch_hour_dt)``. ``batch_hour_dt`` is ``None``
    when the text between *prefix* and ``.csv`` does not match *time_format*,
    so that one oddly named file cannot abort the whole run.
    """
    batch_time_str = csv_file[len(prefix):-len(".csv")]
    try:
        batch_dt = datetime.datetime.strptime(batch_time_str, time_format)
    except ValueError:
        return batch_time_str, None
    return batch_time_str, _floor_to_hour(batch_dt)


def _first_price_drop(df_query, ref_price):
    """Return the chronologically first queried row priced below *ref_price*.

    The query result is copied, ``price_total`` is coerced to numeric,
    ``create_time`` is parsed into a ``create_dt`` column, rows where either
    fails to parse are dropped, and the rest is ordered by ``create_dt``
    (stable sort). Returns ``None`` when the result is empty, lacks one of the
    two columns, *ref_price* is missing, or no row is cheaper.
    """
    if df_query is None or df_query.empty or pd.isna(ref_price):
        return None
    if "price_total" not in df_query.columns or "create_time" not in df_query.columns:
        return None

    quotes = df_query.copy()
    quotes["price_total"] = pd.to_numeric(quotes["price_total"], errors="coerce")
    quotes["create_dt"] = pd.to_datetime(quotes["create_time"], errors="coerce")
    quotes = quotes.dropna(subset=["price_total", "create_dt"]).sort_values(
        "create_dt", kind="mergesort"
    )
    cheaper = quotes[quotes["price_total"] < ref_price]
    if cheaper.empty:
        return None
    return cheaper.iloc[0]


def _validate_keep_info_df(df_keep_info_part):
    """Fill ``price_diff`` / ``time_diff_hours`` for every row of the frame.

    For each row, ``validate_keep_one_line`` is queried and the first
    (chronological) result cheaper than the row's ``price_total`` is looked
    up. When one exists, ``price_diff`` is the price reduction and
    ``time_diff_hours`` is the number of hours between ``into_update_hour``
    and that result's ``create_time``; otherwise both stay ``0.0``.

    The frame is modified in place and also returned. Expected columns:
    ``citypair``, ``flight_numbers``, ``baggage_weight``, ``from_date``,
    ``into_update_hour``, ``del_batch_time_str`` and ``price_total``.
    """
    # Both columns are (re)initialised as float so that the rounded float
    # results below never have to be written into an integer column.
    df_keep_info_part["price_diff"] = 0.0
    df_keep_info_part["time_diff_hours"] = 0.0

    client, db = mongo_con_parse(mongo_config)
    try:
        for count, (idx, row) in enumerate(df_keep_info_part.iterrows(), start=1):
            into_update_hour = row['into_update_hour']
            into_update_dt = pd.to_datetime(into_update_hour, format='%Y-%m-%d %H:%M:%S')
            del_batch_dt = pd.to_datetime(row['del_batch_time_str'], format='%Y%m%d%H%M')
            del_batch_std_str = del_batch_dt.strftime('%Y-%m-%d %H:%M:%S')
            entry_price = pd.to_numeric(row.get('price_total'), errors='coerce')

            df_query = validate_keep_one_line(
                db, mongo_table_uo, row['citypair'], row['flight_numbers'],
                row['baggage_weight'], row['from_date'], entry_price,
                into_update_hour, del_batch_std_str,
            )

            first_row = _first_price_drop(df_query, entry_price)
            if first_row is not None:
                price_diff = entry_price - first_row["price_total"]
                time_diff_hours = (first_row["create_dt"] - into_update_dt) / pd.Timedelta(hours=1)
                df_keep_info_part.at[idx, "price_diff"] = round(float(price_diff), 2)
                df_keep_info_part.at[idx, "time_diff_hours"] = round(float(time_diff_hours), 2)

            if count % 5 == 0:
                print(f"cal count: {count}")
        print("计算结束")
    finally:
        # Release the connection even when a query or a parse step fails.
        client.close()
    return df_keep_info_part


def verify_process(min_batch_time_str, max_batch_time_str):
    """Validate rows removed from ``./keep/keep_info_*.csv`` batches.

    Only files whose hour-floored batch time lies within
    ``[min_batch_time_str, max_batch_time_str]`` (both ``%Y%m%d%H%M``) are
    used. Rows with ``keep_flag == -1`` are validated against MongoDB, given a
    ``status_flag`` and appended to a timestamped CSV under
    ``./validate/keep``:

    * ``1`` - a price drop was found (``price_diff > 0``);
    * ``2`` - no price change and the removal batch time is at or after
      ``valid_end_hour``;
    * ``0`` - anything else.
    """
    object_dir = "./keep"
    prefix = "keep_info_"
    output_path = _new_output_path()

    # Make sure the input directory exists.
    if not os.path.exists(object_dir):
        print(f"目录不存在: {object_dir}")
        return

    # Collect every CSV file whose name starts with keep_info_.
    csv_files = _list_csv_files(object_dir, prefix)
    if not csv_files:
        print(f"在 {object_dir} 中没有找到 keep_info_ 开头的 CSV 文件")
        return

    batch_range = _parse_batch_range(min_batch_time_str, max_batch_time_str)
    if batch_range is None:
        return
    min_batch_dt, max_batch_dt = batch_range

    for csv_file in csv_files:
        batch_time_str, batch_hour_dt = _parse_batch_hour(csv_file, prefix, "%Y%m%d%H%M")
        if batch_hour_dt is None:
            # e.g. keep_info_end_*.csv also matches the keep_info_ prefix.
            print(f"文件名时间解析失败,跳过: {csv_file}")
            continue
        if not (min_batch_dt <= batch_hour_dt <= max_batch_dt):
            continue

        # Read the CSV file; an unreadable file is treated as empty.
        csv_path = os.path.join(object_dir, csv_file)
        try:
            df_keep_info = pd.read_csv(csv_path)
        except Exception as e:
            print(f"read {csv_path} error: {str(e)}")
            df_keep_info = pd.DataFrame()

        if df_keep_info.empty:
            print(f"keep_info数据为空: {csv_file}")
            continue

        df_keep_info_del = df_keep_info[df_keep_info['keep_flag'] == -1].reset_index(drop=True)
        df_keep_info_del['del_batch_time_str'] = batch_time_str
        df_keep_info_del = _validate_keep_info_df(df_keep_info_del)

        # Derive status_flag from the price change and from how the removal
        # time compares with the end of the validation window.
        price_diff_num = pd.to_numeric(df_keep_info_del.get("price_diff"), errors="coerce").fillna(0)
        del_batch_dt = pd.to_datetime(
            df_keep_info_del.get("del_batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
        )
        valid_end_dt = pd.to_datetime(
            df_keep_info_del.get("valid_end_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
        )

        status_flag = pd.Series(0, index=df_keep_info_del.index, dtype="int64")  # other cases
        status_flag.loc[price_diff_num > 0] = 1  # price dropped

        mask_zero = price_diff_num == 0
        mask_time_ok = mask_zero & del_batch_dt.notna() & valid_end_dt.notna() & (del_batch_dt >= valid_end_dt)
        status_flag.loc[mask_time_ok] = 2  # timed out

        df_keep_info_del["status_flag"] = status_flag

        # All batches share one output file; only the first write adds a header.
        write_header = not os.path.exists(output_path)
        df_keep_info_del.to_csv(output_path, mode="a", header=write_header, index=False, encoding="utf-8-sig")

        print(f"批次:{batch_time_str} 检验结束")

    print("检验结束")
    print()


def _evaluate_group(db, gid, df_gid, max_batch_dt):
    """Build the result row for one tracked group, or ``None`` to skip it.

    *df_gid* holds all rows of one group in file (batch) order. The returned
    Series is the first row that carries the group's final price, extended
    with ``end_batch_dt``, ``drop_create_time``, ``drop_price``,
    ``price_diff``, ``time_diff_hours``, ``time_diff_hours_2`` and ``flag``.

    ``flag`` values: ``0`` waiting, ``1`` price drop found, ``2`` the last
    batch is at or after ``valid_end_hour``, ``3`` the last batch is earlier
    than *max_batch_dt* (popped out of the sequence).
    """
    city_pair = df_gid["citypair"].iloc[0]
    flight_numbers = df_gid["flight_numbers"].iloc[0]
    baggage_weight = int(df_gid["baggage_weight"].iloc[0])
    from_date = df_gid["from_date"].iloc[0]
    valid_end_hour = df_gid["valid_end_hour"].iloc[0]

    # Hour at which the group entered the sequence.
    into_update_dt = pd.to_datetime(
        df_gid.get("into_update_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
    ).min()
    # Hour at which the group left the sequence (its latest batch).
    batch_dt_series = pd.to_datetime(
        df_gid.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
    )
    batch_dt = batch_dt_series.max()

    if pd.isna(into_update_dt) or pd.isna(batch_dt):
        print(f"gid={gid} 时间字段解析失败,跳过")
        return None

    # Price at the moment the group left the sequence.
    entry_price = pd.to_numeric(
        df_gid.loc[batch_dt_series.idxmax()].get("price_total"), errors="coerce"
    )

    # Per the original note, valid_end_hour is the point 72 hours before departure.
    valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")
    if batch_dt >= valid_end_dt:
        flag = 2  # timed out
    elif batch_dt < max_batch_dt:
        flag = 3  # popped out
    else:
        flag = 0  # still waiting

    # Two hours after the group left the sequence.
    create_time_end = (batch_dt + pd.Timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
    # NOTE(review): only this single time bound is passed here, while
    # _validate_keep_info_df passes two time arguments in the same position -
    # confirm against validate_keep_one_line's signature.
    df_query = validate_keep_one_line(
        db, mongo_table_uo, city_pair, flight_numbers, baggage_weight,
        from_date, entry_price, create_time_end,
    )

    df_g1 = df_gid.assign(_batch_dt=batch_dt_series)
    last_price = float(df_g1.iloc[-1]["price_total"])
    df_last_price = df_g1[df_g1["price_total"] == last_price]
    if df_last_price.empty:
        # A NaN final price never equals itself, so there is no base row.
        print(f"gid={gid} 最后价格缺失,跳过")
        return None

    base_row = df_last_price.iloc[0]
    base_dt = base_row["_batch_dt"]  # first batch showing the final price
    base_price = float(base_row["price_total"])  # the final price

    drop_create_time = pd.NA
    drop_price = pd.NA
    price_diff = 0.0
    time_diff_hours = 0.0

    drop_row = _first_price_drop(df_query, base_price)
    if drop_row is not None:
        drop_create_time = drop_row.get("create_time")
        drop_price = float(drop_row["price_total"])
        price_diff = round(base_price - drop_price, 2)
        time_diff_hours = round(
            float((drop_row["create_dt"] - base_dt) / pd.Timedelta(hours=1)),
            2,
        )
        flag = 1  # a price drop happened

    base_row_cp = base_row.copy()
    base_row_cp["end_batch_dt"] = batch_dt
    base_row_cp["drop_create_time"] = drop_create_time
    base_row_cp["drop_price"] = drop_price
    base_row_cp["price_diff"] = price_diff
    base_row_cp["time_diff_hours"] = time_diff_hours
    if pd.notna(base_row_cp.get("end_batch_dt")) and pd.notna(base_row_cp.get("_batch_dt")):
        # Hours between the first batch at the final price and the last batch.
        base_row_cp["time_diff_hours_2"] = round(
            float((base_row_cp["end_batch_dt"] - base_row_cp["_batch_dt"]) / pd.Timedelta(hours=1)),
            2,
        )
    else:
        base_row_cp["time_diff_hours_2"] = pd.NA
    base_row_cp["flag"] = flag
    return base_row_cp


def verify_process_2(min_batch_time_str, max_batch_time_str):
    """Validate groups tracked across ``keep_info_end_*.csv`` batches.

    Files whose hour-floored batch time lies within
    ``[min_batch_time_str, max_batch_time_str]`` (both ``%Y%m%d%H%M``) are
    concatenated and grouped by ``citypair``, ``flight_numbers``,
    ``baggage_weight``, ``from_date`` and ``into_update_hour``. Each group
    yields one result row (see ``_evaluate_group``); all rows are written to a
    timestamped CSV under ``./validate/keep``.
    """
    object_dir = "/home/node04/descending_cabin_files_uo"
    prefix = "keep_info_end_"
    output_path = _new_output_path()

    # Make sure the input directory exists.
    if not os.path.exists(object_dir):
        print(f"目录不存在: {object_dir}")
        return

    # Collect every CSV file whose name starts with keep_info_end_.
    csv_files = _list_csv_files(object_dir, prefix)
    if not csv_files:
        print(f"在 {object_dir} 中没有找到 keep_info_end_ 开头的 CSV 文件")
        return

    batch_range = _parse_batch_range(min_batch_time_str, max_batch_time_str)
    if batch_range is None:
        return
    min_batch_dt, max_batch_dt = batch_range

    list_df = []
    for csv_file in csv_files:
        _, batch_hour_dt = _parse_batch_hour(csv_file, prefix, "%Y%m%d%H%M%S")
        if batch_hour_dt is None:
            print(f"文件名时间解析失败,跳过: {csv_file}")
            continue
        if not (min_batch_dt <= batch_hour_dt <= max_batch_dt):
            continue

        # Read the CSV file.
        csv_path = os.path.join(object_dir, csv_file)
        try:
            df_keep_info = pd.read_csv(csv_path)
        except Exception as e:
            print(f"read {csv_path} error: {str(e)}")
            continue

        if df_keep_info.empty:
            print(f"keep_info数据为空: {csv_file}")
            continue

        df_keep_info["batch_time_str"] = batch_hour_dt.strftime("%Y%m%d%H%M")
        list_df.append(df_keep_info)

    if not list_df:
        print("时间范围内没有可用 keep_info_end_ 数据")
        return

    df_keep_all = pd.concat(list_df, ignore_index=True)
    del list_df

    sort_cols = ["citypair", "flight_numbers", "baggage_weight", "from_date", "into_update_hour"]
    # Stable sort: rows of one group keep their file (batch) order.
    df_keep_all = df_keep_all.sort_values(sort_cols, kind="mergesort").reset_index(drop=True)
    df_keep_all["gid"] = df_keep_all.groupby(sort_cols, sort=False).ngroup().astype("int64") + 1

    list_base_row = []
    client, db = mongo_con_parse(mongo_config)
    try:
        for gid, df_gid in df_keep_all.groupby("gid", sort=False):
            base_row = _evaluate_group(db, gid, df_gid, max_batch_dt)
            if base_row is not None:
                list_base_row.append(base_row)
    finally:
        # Release the connection even when a group fails.
        client.close()

    df_base = pd.DataFrame(list_base_row)
    df_base.to_csv(output_path, header=True, index=False, encoding="utf-8-sig")
    print(f"输出: {output_path}")
    return


if __name__ == "__main__":
    # verify_process("202604151100", "202604161400")
    verify_process_2("202605071300", "202605080900")