result_validate_0.py 8.6 KB


  1. import argparse
  2. import datetime
  3. import os
  4. import pandas as pd
  5. from data_loader import mongo_con_parse, validate_one_line, fill_hourly_crawl_date
  6. def _validate_predict_df(df_predict):
  7. client, db = mongo_con_parse()
  8. count = 0
  9. for idx, row in df_predict.iterrows():
  10. city_pair = row['city_pair']
  11. flight_day = row['flight_day']
  12. flight_number_1 = row['flight_number_1']
  13. flight_number_2 = row['flight_number_2']
  14. baggage = row['baggage']
  15. valid_begin_hour = row['valid_begin_hour']
  16. valid_begin_dt = pd.to_datetime(valid_begin_hour, format='%Y-%m-%d %H:%M:%S')
  17. # valid_end_hour = row['valid_end_hour']
  18. # valid_end_dt = pd.to_datetime(valid_end_hour, format='%Y-%m-%d %H:%M:%S')
  19. update_hour = row['update_hour']
  20. update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S')
  21. valid_begin_hour_modify = max(
  22. valid_begin_dt,
  23. update_dt
  24. ).strftime('%Y-%m-%d %H:%M:%S')
  25. df_val= validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
  26. # 有可能在当前验证时刻,数据库里没有在valid_begin_hour之后的数据
  27. if not df_val.empty:
  28. df_val_f = fill_hourly_crawl_date(df_val, rear_fill=2)
  29. df_val_f = df_val_f[df_val_f['is_filled']==0] # 只要原始数据,不要补齐的
  30. # df_val_f = df_val_f[df_val_f['update_hour'] <= valid_end_dt]
  31. if df_val_f.empty:
  32. drop_flag = 0
  33. first_drop_amount = pd.NA
  34. first_drop_hours = pd.NA
  35. last_hours_util = pd.NA
  36. last_update_hour = pd.NA
  37. list_change_price = []
  38. list_change_hours = []
  39. else:
  40. # 有效数据的最后一行
  41. last_row = df_val_f.iloc[-1]
  42. last_hours_util = last_row['hours_until_departure']
  43. last_update_hour = last_row['update_hour']
  44. # 价格变化过滤
  45. df_price_changes = df_val_f.loc[
  46. df_val_f["adult_total_price"].shift() != df_val_f["adult_total_price"]
  47. ].copy()
  48. # 价格变化幅度
  49. df_price_changes['change_amount'] = df_price_changes['adult_total_price'].diff().fillna(0)
  50. # 找到第一个 change_amount 小于 -5 的行
  51. first_negative_change = df_price_changes[df_price_changes['change_amount'] < -5].head(1)
  52. # 提取所需的值
  53. if not first_negative_change.empty:
  54. drop_flag = 1
  55. first_drop_amount = first_negative_change['change_amount'].iloc[0].round(2)
  56. first_drop_hours = first_negative_change['hours_until_departure'].iloc[0]
  57. else:
  58. drop_flag = 0
  59. first_drop_amount = pd.NA
  60. first_drop_hours = pd.NA
  61. list_change_price = df_price_changes['adult_total_price'].tolist()
  62. list_change_hours = df_price_changes['hours_until_departure'].tolist()
  63. else:
  64. drop_flag = 0
  65. first_drop_amount = pd.NA
  66. first_drop_hours = pd.NA
  67. last_hours_util = pd.NA
  68. last_update_hour = pd.NA
  69. list_change_price = []
  70. list_change_hours = []
  71. safe_sep = "; "
  72. df_predict.at[idx, 'change_prices'] = safe_sep.join(map(str, list_change_price))
  73. df_predict.at[idx, 'change_hours'] = safe_sep.join(map(str, list_change_hours))
  74. df_predict.at[idx, 'last_hours_util'] = last_hours_util
  75. df_predict.at[idx, 'last_update_hour'] = last_update_hour
  76. df_predict.at[idx, 'first_drop_amount'] = first_drop_amount * -1 # 负数转正数
  77. df_predict.at[idx, 'first_drop_hours'] = first_drop_hours
  78. df_predict.at[idx, 'drop_flag'] = drop_flag
  79. count += 1
  80. if count % 5 == 0:
  81. print(f"cal count: {count}")
  82. print(f"计算结束")
  83. client.close()
  84. return df_predict
  85. def validate_process(node, interval_hours, pred_time_str):
  86. '''手动验证脚本'''
  87. date = pred_time_str[4:8]
  88. output_dir = f"./validate/{node}_{date}"
  89. os.makedirs(output_dir, exist_ok=True)
  90. object_dir = "./predictions_0"
  91. csv_file = f'future_predictions_{pred_time_str}.csv'
  92. csv_path = os.path.join(object_dir, csv_file)
  93. try:
  94. df_predict = pd.read_csv(csv_path)
  95. except Exception as e:
  96. print(f"read {csv_path} error: {str(e)}")
  97. df_predict = pd.DataFrame()
  98. if df_predict.empty:
  99. print(f"预测数据为空")
  100. return
  101. df_predict = _validate_predict_df(df_predict)
  102. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  103. save_scv = f"result_validate_{node}_{pred_time_str}_{timestamp_str}.csv"
  104. output_path = os.path.join(output_dir, save_scv)
  105. df_predict.to_csv(output_path, index=False, encoding="utf-8-sig")
  106. print(f"保存完成: {output_path}")
  107. def validate_process_auto(node, interval_hours):
  108. '''自动验证脚本'''
  109. # 当前时间,取整时
  110. current_time = datetime.datetime.now()
  111. current_time_str = current_time.strftime("%Y%m%d%H%M")
  112. hourly_time = current_time.replace(minute=0, second=0, microsecond=0)
  113. hourly_time_str = hourly_time.strftime("%Y%m%d%H%M")
  114. print(f"验证时间:{current_time_str}, (取整): {hourly_time_str}")
  115. output_dir = f"./validate/{node}"
  116. os.makedirs(output_dir, exist_ok=True)
  117. object_dir = "./predictions_0"
  118. # 检查目录是否存在
  119. if not os.path.exists(object_dir):
  120. print(f"目录不存在: {object_dir}")
  121. return
  122. # 获取所有以 future_predictions_ 开头的 CSV 文件
  123. csv_files = []
  124. for file in os.listdir(object_dir):
  125. if file.startswith("future_predictions_") and file.endswith(".csv"):
  126. csv_files.append(file)
  127. if not csv_files:
  128. print(f"在 {object_dir} 中没有找到 future_predictions_ 开头的 CSV 文件")
  129. return
  130. # 提取时间戳并转换为 datetime 对象
  131. file_times = []
  132. for file in csv_files:
  133. # 提取时间戳部分:future_predictions_202601151600.csv -> 202601151600
  134. timestamp_str = file.replace("future_predictions_", "").replace(".csv", "")
  135. try:
  136. # 将时间戳转换为 datetime 对象
  137. file_time = datetime.datetime.strptime(timestamp_str, "%Y%m%d%H%M")
  138. file_times.append((file, file_time))
  139. except ValueError as e:
  140. print(f"文件 {file} 的时间戳格式错误: {e}")
  141. continue
  142. if not file_times:
  143. print("没有找到有效的时间戳文件")
  144. return
  145. # 目标验证文件(当前整点减50小时)
  146. target_time = hourly_time - datetime.timedelta(hours=50)
  147. target_time_str = target_time.strftime("%Y%m%d%H%M")
  148. print(f"目标验证时间: {target_time_str}")
  149. valid_files = [(f, t) for f, t in file_times if t == target_time]
  150. if not valid_files:
  151. print(f"没有找到目标对应时间 {target_time.strftime('%Y%m%d%H%M')} 的文件")
  152. return
  153. valid_file, valid_time = valid_files[0]
  154. valid_time_str = valid_time.strftime("%Y%m%d%H%M")
  155. print(f"找到符合条件的文件: {valid_file} (时间: {valid_time_str})")
  156. csv_path = os.path.join(object_dir, valid_file)
  157. # 开始验证
  158. try:
  159. df_predict = pd.read_csv(csv_path)
  160. except Exception as e:
  161. print(f"read {csv_path} error: {str(e)}")
  162. df_predict = pd.DataFrame()
  163. if df_predict.empty:
  164. print(f"预测数据为空")
  165. return
  166. df_predict = _validate_predict_df(df_predict)
  167. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  168. save_scv = f"result_validate_{node}_{valid_time_str}_{timestamp_str}.csv"
  169. output_path = os.path.join(output_dir, save_scv)
  170. df_predict.to_csv(output_path, index=False, encoding="utf-8-sig")
  171. print(f"保存完成: {output_path}")
  172. print(f"验证完成: {node} {valid_time_str}")
  173. print()
  174. if __name__ == "__main__":
  175. parser = argparse.ArgumentParser(description='验证脚本')
  176. parser.add_argument('--interval', type=int, choices=[1],
  177. default=0, help='间隔小时数(1,)')
  178. args = parser.parse_args()
  179. interval_hours = args.interval
  180. # 0 手动验证
  181. if interval_hours == 0:
  182. node, pred_time_str = "node0127", "202601281700"
  183. validate_process(node, interval_hours, pred_time_str)
  184. # 1 自动验证
  185. else:
  186. node = "node0122"
  187. validate_process_auto(node, interval_hours)