import argparse import datetime import os import pandas as pd from data_loader import mongo_con_parse, validate_one_line, fill_hourly_crawl_date def _validate_predict_df(df_predict): client, db = mongo_con_parse() count = 0 for idx, row in df_predict.iterrows(): city_pair = row['city_pair'] flight_day = row['flight_day'] flight_number_1 = row['flight_number_1'] flight_number_2 = row['flight_number_2'] baggage = row['baggage'] valid_begin_hour = row['valid_begin_hour'] valid_begin_dt = pd.to_datetime(valid_begin_hour, format='%Y-%m-%d %H:%M:%S') # valid_end_hour = row['valid_end_hour'] # valid_end_dt = pd.to_datetime(valid_end_hour, format='%Y-%m-%d %H:%M:%S') update_hour = row['update_hour'] update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S') valid_begin_hour_modify = max( valid_begin_dt, update_dt ).strftime('%Y-%m-%d %H:%M:%S') df_val= validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify) # 有可能在当前验证时刻,数据库里没有在valid_begin_hour之后的数据 if not df_val.empty: df_val_f = fill_hourly_crawl_date(df_val, rear_fill=2) df_val_f = df_val_f[df_val_f['is_filled']==0] # 只要原始数据,不要补齐的 # df_val_f = df_val_f[df_val_f['update_hour'] <= valid_end_dt] if df_val_f.empty: drop_flag = 0 # first_drop_amount = pd.NA first_drop_price = pd.NA first_drop_hours_until_departure = pd.NA first_drop_update_hour = pd.NA last_hours_util = pd.NA last_update_hour = pd.NA list_change_price = [] list_change_hours = [] else: # 有效数据的最后一行 last_row = df_val_f.iloc[-1] last_hours_util = last_row['hours_until_departure'] last_update_hour = last_row['update_hour'] # 价格变化过滤 df_price_changes = df_val_f.loc[ df_val_f["adult_total_price"].shift() != df_val_f["adult_total_price"] ].copy() # 价格变化幅度 df_price_changes['change_amount'] = df_price_changes['adult_total_price'].diff().fillna(0) # 找到第一个 change_amount 小于 -5 的行 first_negative_change = df_price_changes[df_price_changes['change_amount'] < -5].head(1) # 提取所需的值 if not first_negative_change.empty: drop_flag = 1 # first_drop_amount = first_negative_change['change_amount'].iloc[0].round(2) first_drop_price = first_negative_change['adult_total_price'].iloc[0].round(2) first_drop_hours_until_departure = first_negative_change['hours_until_departure'].iloc[0] first_drop_update_hour = first_negative_change['update_hour'].iloc[0] else: drop_flag = 0 # first_drop_amount = pd.NA first_drop_price = pd.NA first_drop_hours_until_departure = pd.NA first_drop_update_hour = pd.NA list_change_price = df_price_changes['adult_total_price'].tolist() list_change_hours = df_price_changes['hours_until_departure'].tolist() else: drop_flag = 0 # first_drop_amount = pd.NA first_drop_price = pd.NA first_drop_hours_until_departure = pd.NA first_drop_update_hour = pd.NA last_hours_util = pd.NA last_update_hour = pd.NA list_change_price = [] list_change_hours = [] safe_sep = "; " df_predict.at[idx, 'change_prices'] = safe_sep.join(map(str, list_change_price)) df_predict.at[idx, 'change_hours'] = safe_sep.join(map(str, list_change_hours)) df_predict.at[idx, 'last_hours_util'] = last_hours_util df_predict.at[idx, 'last_update_hour'] = last_update_hour # df_predict.at[idx, 'first_drop_amount'] = first_drop_amount * -1 # 负数转正数 df_predict.at[idx, 'first_drop_price'] = first_drop_price df_predict.at[idx, 'first_drop_hours_until_departure'] = first_drop_hours_until_departure df_predict.at[idx, 'first_drop_update_hour'] = first_drop_update_hour df_predict.at[idx, 'drop_flag'] = drop_flag count += 1 if count % 5 == 0: print(f"cal count: {count}") print(f"计算结束") client.close() return df_predict def validate_process(node, interval_hours, pred_time_str): '''手动验证脚本''' date = pred_time_str[4:8] output_dir = f"./validate/{node}_{date}" os.makedirs(output_dir, exist_ok=True) object_dir = "./predictions_0" csv_file = f'future_predictions_{pred_time_str}.csv' csv_path = os.path.join(object_dir, csv_file) try: df_predict = pd.read_csv(csv_path) except Exception as e: print(f"read {csv_path} error: {str(e)}") df_predict = pd.DataFrame() if df_predict.empty: print(f"预测数据为空") return df_predict = _validate_predict_df(df_predict) timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") save_scv = f"result_validate_{node}_{pred_time_str}_{timestamp_str}.csv" output_path = os.path.join(output_dir, save_scv) df_predict.to_csv(output_path, index=False, encoding="utf-8-sig") print(f"保存完成: {output_path}") def validate_process_auto(node, interval_hours): '''自动验证脚本''' # 当前时间,取整时 current_time = datetime.datetime.now() current_time_str = current_time.strftime("%Y%m%d%H%M") hourly_time = current_time.replace(minute=0, second=0, microsecond=0) hourly_time_str = hourly_time.strftime("%Y%m%d%H%M") print(f"验证时间:{current_time_str}, (取整): {hourly_time_str}") output_dir = f"./validate/{node}" os.makedirs(output_dir, exist_ok=True) object_dir = "./predictions_0" # 检查目录是否存在 if not os.path.exists(object_dir): print(f"目录不存在: {object_dir}") return # 获取所有以 future_predictions_ 开头的 CSV 文件 csv_files = [] for file in os.listdir(object_dir): if file.startswith("future_predictions_") and file.endswith(".csv"): csv_files.append(file) if not csv_files: print(f"在 {object_dir} 中没有找到 future_predictions_ 开头的 CSV 文件") return # 提取时间戳并转换为 datetime 对象 file_times = [] for file in csv_files: # 提取时间戳部分:future_predictions_202601151600.csv -> 202601151600 timestamp_str = file.replace("future_predictions_", "").replace(".csv", "") try: # 将时间戳转换为 datetime 对象 file_time = datetime.datetime.strptime(timestamp_str, "%Y%m%d%H%M") file_times.append((file, file_time)) except ValueError as e: print(f"文件 {file} 的时间戳格式错误: {e}") continue if not file_times: print("没有找到有效的时间戳文件") return # 目标验证文件(当前整点减56小时: 48 + (12 - 4) = 56) target_time = hourly_time - datetime.timedelta(hours=56) target_time_str = target_time.strftime("%Y%m%d%H%M") print(f"目标验证时间: {target_time_str}") valid_files = [(f, t) for f, t in file_times if t == target_time] if not valid_files: print(f"没有找到目标对应时间 {target_time.strftime('%Y%m%d%H%M')} 的文件") return valid_file, valid_time = valid_files[0] valid_time_str = valid_time.strftime("%Y%m%d%H%M") print(f"找到符合条件的文件: {valid_file} (时间: {valid_time_str})") csv_path = os.path.join(object_dir, valid_file) # 开始验证 try: df_predict = pd.read_csv(csv_path) except Exception as e: print(f"read {csv_path} error: {str(e)}") df_predict = pd.DataFrame() if df_predict.empty: print(f"预测数据为空") return df_predict = _validate_predict_df(df_predict) timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") save_scv = f"result_validate_{node}_{valid_time_str}_{timestamp_str}.csv" output_path = os.path.join(output_dir, save_scv) df_predict.to_csv(output_path, index=False, encoding="utf-8-sig") print(f"保存完成: {output_path}") print(f"验证完成: {node} {valid_time_str}") print() def validate_process_zong(node, enable_min_batch_flag=False, min_batch_time_str=None): object_dir = "./predictions_0" output_dir = f"./validate/{node}_zong" os.makedirs(output_dir, exist_ok=True) # 检查目录是否存在 if not os.path.exists(object_dir): print(f"目录不存在: {object_dir}") return # 获取所有以 future_predictions_ 开头的 CSV 文件 csv_files = [] for file in os.listdir(object_dir): if file.startswith("future_predictions_") and file.endswith(".csv"): csv_files.append(file) if not csv_files: print(f"在 {object_dir} 中没有找到 future_predictions_ 开头的 CSV 文件") return csv_files.sort() list_df_will_drop = [] min_batch_dt = None if enable_min_batch_flag: if not min_batch_time_str: print("enable_min_batch_flag=True 但未提供 min_batch_time_str,退出") return min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M") min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0) # 从所有预测的文件中 for csv_file in csv_files: batch_time_str = ( csv_file.replace("future_predictions_", "").replace(".csv", "") ) batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M") batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0) # 跳过早于 min_batch_dt 的批次 if min_batch_dt is not None and batch_hour_dt < min_batch_dt: continue csv_path = os.path.join(object_dir, csv_file) try: df_predict = pd.read_csv(csv_path) except Exception as e: print(f"read {csv_path} error: {str(e)}") df_predict = pd.DataFrame() if df_predict.empty: print(f"预测数据为空: {csv_file}") continue if "will_price_drop" not in df_predict.columns: print(f"缺少 will_price_drop 字段,跳过: {csv_file}") continue df_predict_will_drop = df_predict[df_predict["will_price_drop"] == 1].copy() if df_predict_will_drop.empty: continue # df_predict_will_drop["batch_file"] = csv_file df_predict_will_drop["batch_time"] = batch_time_str list_df_will_drop.append(df_predict_will_drop) # 保存每个批次的 will_drop 数据 del df_predict if not list_df_will_drop: print("所有批次的 will_drop 都为空") return # === 1. 合并所有 will_drop 结果 === df_predict_will_drop_all = pd.concat(list_df_will_drop, ignore_index=True) # 释放临时列表内存(大列表时很有必要) del list_df_will_drop before_rows = len(df_predict_will_drop_all) # 定义“航班唯一标识”的分组键 group_keys = ["city_pair", "flight_number_1", "flight_number_2", "flight_day"] # === 2. batch_time 转为 datetime,用于时间间隔判断 === df_predict_will_drop_all["batch_dt"] = pd.to_datetime( df_predict_will_drop_all["batch_time"], format="%Y%m%d%H%M", errors="coerce", # 非法时间直接置为 NaT ) # === 3. 自动推断 batch_time 的“正常时间步长”(分钟) === diff_minutes = ( df_predict_will_drop_all["batch_dt"].dropna().sort_values().drop_duplicates().diff() .dt.total_seconds() .div(60) .dropna() ) # - 取出现频率最高的时间差作为“期望步长” 默认 60 分钟 expected_step_minutes = ( int(diff_minutes.value_counts().idxmax()) if not diff_minutes.empty else 60 ) # === 4. 按航班 + 批次时间排序,为后续连续性判断做准备 === df_predict_will_drop_all.sort_values( by=group_keys + ["batch_dt"], inplace=True, ignore_index=True, na_position="last", ) # === 5. 计算组内相邻 batch_dt 的时间间隔 === df_predict_will_drop_all["prev_batch_dt"] = df_predict_will_drop_all.groupby(group_keys)[ "batch_dt" ].shift(1) df_predict_will_drop_all["gap_minutes"] = ( (df_predict_will_drop_all["batch_dt"] - df_predict_will_drop_all["prev_batch_dt"]) .dt.total_seconds() .div(60) ) # === 6. 标记“是否是一个新的连续段” === # 新段的条件: # 1) prev_batch_dt 缺失(当前是组内第一条) # 2) batch_dt 缺失 (不常见) # 3) 与上一条的时间间隔 != 期望步长 df_predict_will_drop_all["is_new_segment"] = ( df_predict_will_drop_all["prev_batch_dt"].isna() | df_predict_will_drop_all["batch_dt"].isna() | (df_predict_will_drop_all["gap_minutes"] != expected_step_minutes) ) # === 7. 生成段号(segment_id)=== # 同一航班内,每遇到一个新段就 +1 df_predict_will_drop_all["segment_id"] = df_predict_will_drop_all.groupby(group_keys)[ "is_new_segment" ].cumsum() # === 8. 计算每个连续段的“段尾 hours_until_departure” === df_segment_last = df_predict_will_drop_all.groupby( group_keys + ["segment_id"], as_index=False ).agg(last_hours_until_departure=("hours_until_departure", "last")) # === 9. 每个连续段只保留“第一条记录”,并补上段尾信息 === df_predict_will_drop_filter = df_predict_will_drop_all.drop_duplicates( subset=group_keys + ["segment_id"], keep="first" ).merge( df_segment_last, on=group_keys + ["segment_id"], how="left", ) # === 10. 清理中间附加字段 === df_predict_will_drop_filter = ( df_predict_will_drop_filter.drop( columns=[ "batch_dt", "prev_batch_dt", "gap_minutes", "is_new_segment", "segment_id", ] ) .reset_index(drop=True) ) # === 11. 调整字段顺序(last_hours_until_departure 紧跟 price_change_percent)=== if "price_change_percent" in df_predict_will_drop_filter.columns: cols = df_predict_will_drop_filter.columns.tolist() if "last_hours_until_departure" in cols: cols.remove("last_hours_until_departure") cols.insert(cols.index("price_change_percent"), "last_hours_until_departure") df_predict_will_drop_filter = df_predict_will_drop_filter[cols] after_rows = len(df_predict_will_drop_filter) print( f"will_drop 连续段过滤完成(step={expected_step_minutes}min): {before_rows} -> {after_rows}" ) # 当前时间,取整时 current_time = datetime.datetime.now() current_time_str = current_time.strftime("%Y%m%d%H%M") hourly_time = current_time.replace(minute=0, second=0, microsecond=0) hourly_time_str = hourly_time.strftime("%Y%m%d%H%M") before_rows = len(df_predict_will_drop_filter) df_predict_will_drop_filter["valid_end_dt"] = pd.to_datetime( df_predict_will_drop_filter["valid_end_hour"], errors="coerce", ) df_predict_will_drop_filter_1 = df_predict_will_drop_filter[ (df_predict_will_drop_filter["valid_end_dt"] + pd.Timedelta(hours=8)) <= hourly_time ].copy() df_predict_will_drop_filter_1.drop(columns=["valid_end_dt"], inplace=True) after_rows = len(df_predict_will_drop_filter_1) print( f"valid_end_hour(+8h)过滤完成: {before_rows} -> {after_rows} (hourly_time={hourly_time_str})" ) # 开始验证 df_predict_will_drop_validate = _validate_predict_df(df_predict_will_drop_filter_1) timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") save_scv = f"result_validate_{node}_zong_{timestamp_str}.csv" output_path = os.path.join(output_dir, save_scv) df_predict_will_drop_validate.to_csv(output_path, index=False, encoding="utf-8-sig") print(f"保存完成: {output_path}") print(f"验证完成: {node} zong") print() if __name__ == "__main__": parser = argparse.ArgumentParser(description='验证脚本') parser.add_argument('--interval', type=int, choices=[1], default=0, help='间隔小时数(1,)') args = parser.parse_args() interval_hours = args.interval # 0 手动验证 if interval_hours == 0: # node, pred_time_str = "node0127", "202601301500" # validate_process(node, interval_hours, pred_time_str) # node = "node0127" # validate_process_zong(node) # 无条件汇总 node = "node0203" validate_process_zong(node, True, "202602041100") # 有条件汇总 # 1 自动验证 else: node = "node0127" validate_process_auto(node, interval_hours)