# result_validate_0.py

import argparse
import datetime
import os

import pandas as pd

from data_loader import mongo_con_parse, validate_one_line, fill_hourly_crawl_date
from config import vj_flight_route_list_hot, vj_flight_route_list_nothot, \
    CLEAN_VJ_HOT_NEAR_INFO_TAB, CLEAN_VJ_HOT_FAR_INFO_TAB, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB, CLEAN_VJ_NOTHOT_FAR_INFO_TAB


def _validate_predict_df(df_predict):
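    """Validate each predicted row against the crawled price history.

    For every row in df_predict, pulls the matching far/near clean tables
    from MongoDB, builds a 48-hour observation window anchored at the
    prediction batch, and annotates df_predict in place with drop flags,
    first-drop / first-lower / boundary prices, trigger info, and pnl.
    Returns the annotated df_predict.
    """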
    client, db = mongo_con_parse()
    count = 0
    for idx, row in df_predict.iterrows():
        city_pair = row['city_pair']
        flight_day = row['flight_day']
        flight_number_1 = row['flight_number_1']
        flight_number_2 = row['flight_number_2']
        baggage = row['baggage']
        valid_begin_hour = row['valid_begin_hour']
        valid_begin_dt = pd.to_datetime(valid_begin_hour, format='%Y-%m-%d %H:%M:%S')
        # valid_end_hour = row['valid_end_hour']
        # valid_end_dt = pd.to_datetime(valid_end_hour, format='%Y-%m-%d %H:%M:%S')
        update_hour = row['update_hour']
        update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S')
        # validation starts at the later of valid_begin_hour and update_hour
        valid_begin_hour_modify = max(
            valid_begin_dt,
            update_dt
        ).strftime('%Y-%m-%d %H:%M:%S')
        if city_pair in vj_flight_route_list_hot:
            table_name_far = CLEAN_VJ_HOT_FAR_INFO_TAB
            table_name_near = CLEAN_VJ_HOT_NEAR_INFO_TAB
        elif city_pair in vj_flight_route_list_nothot:
            table_name_far = CLEAN_VJ_NOTHOT_FAR_INFO_TAB
            table_name_near = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
        else:
            # guard: a route in neither list would otherwise leave the
            # table names undefined (NameError); skip such rows
            print(f"unknown city_pair, skipping: {city_pair}")
            continue
        df_val_far = validate_one_line(db, table_name_far, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
        df_val_near = validate_one_line(db, table_name_near, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
        # merge far and near results
        df_val = pd.concat([df_val_far, df_val_near]).reset_index(drop=True)
        entry_price = pd.to_numeric(row.get('adult_total_price'), errors='coerce')
        crawl_dt = pd.to_datetime(row.get('crawl_date'), errors='coerce')
        batch_dt = pd.to_datetime(row.get('batch_time'), format="%Y%m%d%H%M", errors='coerce')
        wait_start_dt = pd.NaT
        wait_end_dt = pd.NaT
        dep_hour_dt = pd.to_datetime(row.get('from_time'), errors='coerce')
        if pd.notna(batch_dt):
            wait_start_dt = batch_dt.floor('h')
        if pd.notna(crawl_dt):
            crawl_floor = crawl_dt.floor('h')
            if pd.isna(wait_start_dt):
                wait_start_dt = crawl_floor
            else:
                wait_start_dt = max(wait_start_dt, crawl_floor)  # pull the window start up to the prediction batch time
        if pd.notna(wait_start_dt):
            wait_end_dt = wait_start_dt + pd.Timedelta(hours=48)  # 48-hour wait window
        if pd.notna(dep_hour_dt):
            dep_hour_dt = dep_hour_dt.floor('h')
            cutoff_dt = dep_hour_dt - pd.Timedelta(hours=4)
            if pd.notna(wait_end_dt):
                wait_end_dt = min(wait_end_dt, cutoff_dt)  # the window end must not cross 4 hours before departure
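        # Illustration (hypothetical values): batch_time 202601301523 floors to
        # 2026-01-30 15:00, so the raw window is [01-30 15:00, 02-01 15:00];
        # with departure at 2026-02-01 10:00 the cutoff is 02-01 06:00, which
        # clips the window end to 06:00.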
        # at validation time the database may hold nothing after valid_begin_hour
        if not df_val.empty:
            df_val_f = fill_hourly_crawl_date(df_val, rear_fill=2)
            df_val_f = df_val_f[df_val_f['is_filled'] == 0]  # keep only original rows, drop the filled-in ones
            # df_val_f = df_val_f[df_val_f['update_hour'] <= valid_end_dt]
            if df_val_f.empty:
                drop_flag = 0
                # first_drop_amount = pd.NA
                first_drop_price = pd.NA
                first_drop_hours_until_departure = pd.NA
                first_drop_update_hour = pd.NA
                last_hours_util = pd.NA
                last_update_hour = pd.NA
                list_change_price = []
                list_change_hours = []
                drop_flag_window = 0
                first_lower_price = pd.NA
                first_lower_update_hour = pd.NA
                boundary_final_price = pd.NA
                boundary_final_update_hour = pd.NA
                trigger_type = pd.NA
                trigger_price = pd.NA
                trigger_update_hour = pd.NA
                pnl = pd.NA
                pnl_pct = pd.NA
            else:
                # last row of the valid data
                last_row = df_val_f.iloc[-1]
                last_hours_util = last_row['hours_until_departure']
                last_update_hour = last_row['update_hour']
                df_val_f['update_hour'] = pd.to_datetime(df_val_f['update_hour'], errors='coerce')
                # use the actual price aligned to batch_time as entry_price
                if pd.notna(batch_dt):
                    df_entry = df_val_f[df_val_f['update_hour'] <= batch_dt].copy()
                    if not df_entry.empty:
                        entry_price = df_entry.iloc[-1]['adult_total_price']
                df_window = df_val_f
                if pd.notna(wait_start_dt) and pd.notna(wait_end_dt):
                    df_window = df_val_f[
                        (df_val_f['update_hour'] >= wait_start_dt) &
                        (df_val_f['update_hour'] <= wait_end_dt)
                    ].copy()  # build the observation window
                else:
                    df_window = df_val_f.iloc[0:0].copy()  # empty slice
                if not df_window.empty:
                    df_window = df_window.sort_values('update_hour')
                    df_window_price_changes = df_window.loc[
                        df_window["adult_total_price"].shift() != df_window["adult_total_price"]
                    ].copy()
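                    # shift() != keeps the first row of each run of equal prices:
                    # e.g. prices [100, 100, 90, 90, 95] keep rows [100, 90, 95]
                    # (the first row always survives because NaN != anything)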
                    df_window_price_changes['change_amount'] = (
                        df_window_price_changes['adult_total_price'].diff().fillna(0)
                    )
                    df_first_window_negative = df_window_price_changes[
                        df_window_price_changes['change_amount'] < -5
                    ].head(1)
                    drop_flag_window = 1 if not df_first_window_negative.empty else 0  # did a drop occur inside the observation window
                else:
                    drop_flag_window = 0
                first_lower_price = pd.NA
                first_lower_update_hour = pd.NA
                if not df_window.empty and pd.notna(entry_price) and pd.notna(wait_start_dt):
                    df_lower = df_window[
                        (df_window['update_hour'] > wait_start_dt) &
                        (df_window['adult_total_price'] <= entry_price - 5)
                    ].head(1)
                    if not df_lower.empty:  # first price (and time) below entry_price - 5
                        first_lower_price = df_lower['adult_total_price'].iloc[0].round(2)
                        first_lower_update_hour = df_lower['update_hour'].iloc[0]
                boundary_final_price = pd.NA
                boundary_final_update_hour = pd.NA
                if not df_window.empty:  # price and time at the far boundary of the observation window
                    boundary_row = df_window.iloc[-1]
                    boundary_final_price = boundary_row['adult_total_price']
                    boundary_final_update_hour = boundary_row['update_hour']
                trigger_type = pd.NA
                trigger_price = pd.NA
                trigger_update_hour = pd.NA
                if pd.notna(first_lower_price):
                    trigger_type = "first_lower"  # a price drop occurred
                    trigger_price = first_lower_price
                    trigger_update_hour = first_lower_update_hour
                elif pd.notna(boundary_final_price):
                    trigger_type = "boundary"  # reached the window boundary
                    trigger_price = boundary_final_price
                    trigger_update_hour = boundary_final_update_hour
                else:
                    trigger_type = "no_data"
                if pd.notna(entry_price) and pd.notna(trigger_price):
                    pnl = round(float(entry_price - trigger_price), 2)  # profit (loss) amount, relative to entry_price
                    if entry_price != 0:
                        pnl_pct = round(float(pnl) / float(entry_price) * 100, 2)  # profit (loss) percentage, relative to entry_price
                    else:
                        pnl_pct = pd.NA
                else:
                    pnl = pd.NA
                    pnl_pct = pd.NA
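                # Worked example (hypothetical): entry_price 500.0 and
                # trigger_price 450.0 give pnl = 50.0 and pnl_pct = 10.0;
                # a positive pnl means the price fell after entry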
                # filter down to price changes
                df_price_changes = df_val_f.loc[
                    df_val_f["adult_total_price"].shift() != df_val_f["adult_total_price"]
                ].copy()
                # magnitude of each change
                df_price_changes['change_amount'] = df_price_changes['adult_total_price'].diff().fillna(0)
                # find the first row with change_amount below -5
                first_negative_change = df_price_changes[df_price_changes['change_amount'] < -5].head(1)
                # extract the needed values
                if not first_negative_change.empty:
                    drop_flag = 1
                    # first_drop_amount = first_negative_change['change_amount'].iloc[0].round(2)
                    first_drop_price = first_negative_change['adult_total_price'].iloc[0].round(2)
                    first_drop_hours_until_departure = first_negative_change['hours_until_departure'].iloc[0]
                    first_drop_update_hour = first_negative_change['update_hour'].iloc[0]
                else:
                    drop_flag = 0
                    # first_drop_amount = pd.NA
                    first_drop_price = pd.NA
                    first_drop_hours_until_departure = pd.NA
                    first_drop_update_hour = pd.NA
                list_change_price = df_price_changes['adult_total_price'].tolist()
                list_change_hours = df_price_changes['hours_until_departure'].tolist()
        else:
            drop_flag = 0
            # first_drop_amount = pd.NA
            first_drop_price = pd.NA
            first_drop_hours_until_departure = pd.NA
            first_drop_update_hour = pd.NA
            last_hours_util = pd.NA
            last_update_hour = pd.NA
            list_change_price = []
            list_change_hours = []
            drop_flag_window = 0
            first_lower_price = pd.NA
            first_lower_update_hour = pd.NA
            boundary_final_price = pd.NA
            boundary_final_update_hour = pd.NA
            trigger_type = pd.NA
            trigger_price = pd.NA
            trigger_update_hour = pd.NA
            pnl = pd.NA
            pnl_pct = pd.NA
        safe_sep = "; "
        df_predict.at[idx, 'change_prices'] = safe_sep.join(map(str, list_change_price))
        df_predict.at[idx, 'change_hours'] = safe_sep.join(map(str, list_change_hours))
        df_predict.at[idx, 'last_hours_util'] = last_hours_util
        df_predict.at[idx, 'last_update_hour'] = last_update_hour
        # df_predict.at[idx, 'first_drop_amount'] = first_drop_amount * -1  # negative to positive
        df_predict.at[idx, 'first_drop_price'] = first_drop_price
        df_predict.at[idx, 'first_drop_hours_until_departure'] = first_drop_hours_until_departure
        df_predict.at[idx, 'first_drop_update_hour'] = first_drop_update_hour
        df_predict.at[idx, 'drop_flag'] = drop_flag
        df_predict.at[idx, 'wait_start_hour'] = wait_start_dt
        df_predict.at[idx, 'wait_end_hour'] = wait_end_dt
        df_predict.at[idx, 'drop_flag_window'] = drop_flag_window
        df_predict.at[idx, 'first_lower_price'] = first_lower_price
        df_predict.at[idx, 'first_lower_update_hour'] = first_lower_update_hour
        df_predict.at[idx, 'boundary_final_price'] = boundary_final_price
        df_predict.at[idx, 'boundary_final_update_hour'] = boundary_final_update_hour
        df_predict.at[idx, 'trigger_type'] = trigger_type
        df_predict.at[idx, 'trigger_price'] = trigger_price
        df_predict.at[idx, 'trigger_update_hour'] = trigger_update_hour
        df_predict.at[idx, 'pnl'] = pnl
        df_predict.at[idx, 'pnl_pct'] = pnl_pct
        count += 1
        if count % 5 == 0:
            print(f"cal count: {count}")
    print("calculation finished")
    client.close()
    return df_predict


def validate_process(node, interval_hours, pred_time_str):
    '''Manual validation script.'''
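    # node: label used for the output directory and filename
    # interval_hours: accepted for a uniform entry-point signature; unused here
    # pred_time_str: prediction batch timestamp, %Y%m%d%H%M (MMDD feeds the dir name)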
    date = pred_time_str[4:8]
    output_dir = f"./validate/{node}_{date}"
    os.makedirs(output_dir, exist_ok=True)
    object_dir = "./predictions_0"
    csv_file = f'future_predictions_{pred_time_str}.csv'
    csv_path = os.path.join(object_dir, csv_file)
    try:
        df_predict = pd.read_csv(csv_path)
    except Exception as e:
        print(f"read {csv_path} error: {str(e)}")
        df_predict = pd.DataFrame()
    if df_predict.empty:
        print("prediction data is empty")
        return
    df_predict = _validate_predict_df(df_predict)
    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    save_csv = f"result_validate_{node}_{pred_time_str}_{timestamp_str}.csv"
    output_path = os.path.join(output_dir, save_csv)
    df_predict.to_csv(output_path, index=False, encoding="utf-8-sig")
    print(f"saved: {output_path}")


def validate_process_auto(node, interval_hours):
    '''Automatic validation script.'''
    # current time, floored to the hour
    current_time = datetime.datetime.now()
    current_time_str = current_time.strftime("%Y%m%d%H%M")
    hourly_time = current_time.replace(minute=0, second=0, microsecond=0)
    hourly_time_str = hourly_time.strftime("%Y%m%d%H%M")
    print(f"validation time: {current_time_str}, (floored): {hourly_time_str}")
    output_dir = f"./validate/{node}"
    os.makedirs(output_dir, exist_ok=True)
    object_dir = "./predictions_0"
    # check that the source directory exists
    if not os.path.exists(object_dir):
        print(f"directory does not exist: {object_dir}")
        return
    # collect all CSV files starting with future_predictions_
    csv_files = []
    for file in os.listdir(object_dir):
        if file.startswith("future_predictions_") and file.endswith(".csv"):
            csv_files.append(file)
    if not csv_files:
        print(f"no CSV files starting with future_predictions_ found in {object_dir}")
        return
    # extract timestamps and convert them to datetime objects
    file_times = []
    for file in csv_files:
        # extract the timestamp part: future_predictions_202601151600.csv -> 202601151600
        timestamp_str = file.replace("future_predictions_", "").replace(".csv", "")
        try:
            # parse the timestamp into a datetime object
            file_time = datetime.datetime.strptime(timestamp_str, "%Y%m%d%H%M")
            file_times.append((file, file_time))
        except ValueError as e:
            print(f"bad timestamp format in file {file}: {e}")
            continue
    if not file_times:
        print("no files with valid timestamps found")
        return
    # target validation file (current full hour minus 56 hours)
    target_time = hourly_time - datetime.timedelta(hours=56)
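    # the 56 h lookback appears to cover the 48 h observation window plus an
    # 8 h settling buffer (assumption; compare the valid_end_hour + 8 h
    # filter in validate_process_zong)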
    target_time_str = target_time.strftime("%Y%m%d%H%M")
    print(f"target validation time: {target_time_str}")
    valid_files = [(f, t) for f, t in file_times if t == target_time]
    if not valid_files:
        print(f"no file found for target time {target_time.strftime('%Y%m%d%H%M')}")
        return
    valid_file, valid_time = valid_files[0]
    valid_time_str = valid_time.strftime("%Y%m%d%H%M")
    print(f"found matching file: {valid_file} (time: {valid_time_str})")
    csv_path = os.path.join(object_dir, valid_file)
    # start validation
    try:
        df_predict = pd.read_csv(csv_path)
    except Exception as e:
        print(f"read {csv_path} error: {str(e)}")
        df_predict = pd.DataFrame()
    if df_predict.empty:
        print("prediction data is empty")
        return
    df_predict = _validate_predict_df(df_predict)
    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    save_csv = f"result_validate_{node}_{valid_time_str}_{timestamp_str}.csv"
    output_path = os.path.join(output_dir, save_csv)
    df_predict.to_csv(output_path, index=False, encoding="utf-8-sig")
    print(f"saved: {output_path}")
    print(f"validation finished: {node} {valid_time_str}")
    print()


def validate_process_zong(node, enable_min_max_batch_flag=False, min_batch_time_str=None, max_batch_time_str=None):
    '''Aggregate ("zong") validation across all prediction batches.'''
    object_dir = "./predictions_0"
    output_dir = f"./validate/{node}_zong"
    os.makedirs(output_dir, exist_ok=True)
    # check that the source directory exists
    if not os.path.exists(object_dir):
        print(f"directory does not exist: {object_dir}")
        return
    # collect all CSV files starting with future_predictions_
    csv_files = []
    for file in os.listdir(object_dir):
        if file.startswith("future_predictions_") and file.endswith(".csv"):
            csv_files.append(file)
    if not csv_files:
        print(f"no CSV files starting with future_predictions_ found in {object_dir}")
        return
    csv_files.sort()
    list_df_will_drop = []
    min_batch_dt = None
    max_batch_dt = None
    if enable_min_max_batch_flag:
        if not min_batch_time_str and not max_batch_time_str:
            print("enable_min_max_batch_flag=True but neither min_batch_time_str nor max_batch_time_str was given, exiting")
            return
        if min_batch_time_str:
            min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
            min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
        if max_batch_time_str:
            max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
            max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
        if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
            print(f"invalid time range: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}), exiting")
            return
    # walk over every prediction file
    for csv_file in csv_files:
        batch_time_str = (
            csv_file.replace("future_predictions_", "").replace(".csv", "")
        )
        batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M")
        batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
        if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
            continue
        if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
            continue
        csv_path = os.path.join(object_dir, csv_file)
        try:
            df_predict = pd.read_csv(csv_path)
        except Exception as e:
            print(f"read {csv_path} error: {str(e)}")
            df_predict = pd.DataFrame()
        if df_predict.empty:
            print(f"prediction data is empty: {csv_file}")
            continue
        if "will_price_drop" not in df_predict.columns:
            print(f"missing will_price_drop column, skipping: {csv_file}")
            continue
        df_predict_will_drop = df_predict[df_predict["will_price_drop"] == 1].copy()
        if df_predict_will_drop.empty:
            continue
        # df_predict_will_drop["batch_file"] = csv_file
        df_predict_will_drop["batch_time"] = batch_time_str
        list_df_will_drop.append(df_predict_will_drop)  # keep each batch's will_drop rows
        del df_predict
    if not list_df_will_drop:
        print("will_drop is empty for every batch")
        return
    # === 1. Concatenate all will_drop results ===
    df_predict_will_drop_all = pd.concat(list_df_will_drop, ignore_index=True)
    # free the temporary list (worthwhile when it is large)
    del list_df_will_drop
    before_rows = len(df_predict_will_drop_all)
    # group keys that uniquely identify a flight
    group_keys = ["city_pair", "flight_number_1", "flight_number_2", "flight_day"]
    # === 2. Convert batch_time to datetime for the gap checks ===
    df_predict_will_drop_all["batch_dt"] = pd.to_datetime(
        df_predict_will_drop_all["batch_time"],
        format="%Y%m%d%H%M",
        errors="coerce",  # invalid timestamps become NaT
    )
    # === 3. Infer the "normal step" of batch_time in minutes ===
    diff_minutes = (
        df_predict_will_drop_all["batch_dt"].dropna().sort_values().drop_duplicates().diff()
        .dt.total_seconds()
        .div(60)
        .dropna()
    )
    # use the most frequent gap as the expected step; fall back to 60 minutes
    expected_step_minutes = (
        int(diff_minutes.value_counts().idxmax()) if not diff_minutes.empty else 60
    )
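    # Example (hypothetical batches): unique batch times 10:00, 11:00, 12:00,
    # 14:00 give gaps [60, 60, 120] minutes, so the modal step is 60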
    # === 4. Sort by flight + batch time to prepare the continuity check ===
    df_predict_will_drop_all.sort_values(
        by=group_keys + ["batch_dt"],
        inplace=True,
        ignore_index=True,
        na_position="last",
    )
    # === 5. Compute the gap between adjacent batch_dt rows within each group ===
    df_predict_will_drop_all["prev_batch_dt"] = df_predict_will_drop_all.groupby(group_keys)[
        "batch_dt"
    ].shift(1)
    df_predict_will_drop_all["gap_minutes"] = (
        (df_predict_will_drop_all["batch_dt"] - df_predict_will_drop_all["prev_batch_dt"])
        .dt.total_seconds()
        .div(60)
    )
    # === 6. Flag rows that start a new contiguous segment ===
    # A new segment starts when:
    # 1) prev_batch_dt is missing (first row of the group)
    # 2) batch_dt is missing (uncommon)
    # 3) the gap to the previous row != the expected step
    df_predict_will_drop_all["is_new_segment"] = (
        df_predict_will_drop_all["prev_batch_dt"].isna()
        | df_predict_will_drop_all["batch_dt"].isna()
        | (df_predict_will_drop_all["gap_minutes"] != expected_step_minutes)
    )
    # === 7. Assign segment numbers (segment_id) ===
    # within a flight, each new segment increments the id by 1
    df_predict_will_drop_all["segment_id"] = df_predict_will_drop_all.groupby(group_keys)[
        "is_new_segment"
    ].cumsum()
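    # cumsum over the boolean flag turns segment starts into ids, e.g.
    # is_new_segment [True, False, True, False, False] -> segment_id [1, 1, 2, 2, 2]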
    # === 8. Compute each contiguous segment's trailing hours_until_departure ===
    df_segment_last = df_predict_will_drop_all.groupby(
        group_keys + ["segment_id"], as_index=False
    ).agg(last_hours_until_departure=("hours_until_departure", "last"))
    # === 9. Keep only the first record of each segment, then attach the tail info ===
    df_predict_will_drop_filter = df_predict_will_drop_all.drop_duplicates(
        subset=group_keys + ["segment_id"], keep="first"
    ).merge(
        df_segment_last,
        on=group_keys + ["segment_id"],
        how="left",
    )
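    # net effect: one row per contiguous run of will_drop batches, carrying the
    # run's first prediction plus how close to departure the run ended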
    # === 10. Drop the intermediate helper columns ===
    df_predict_will_drop_filter = (
        df_predict_will_drop_filter.drop(
            columns=[
                "batch_dt",
                "prev_batch_dt",
                "gap_minutes",
                "is_new_segment",
                "segment_id",
            ]
        )
        .reset_index(drop=True)
    )
    # === 11. Reorder columns (place last_hours_until_departure next to price_change_percent) ===
    if "price_change_percent" in df_predict_will_drop_filter.columns:
        cols = df_predict_will_drop_filter.columns.tolist()
        if "last_hours_until_departure" in cols:
            cols.remove("last_hours_until_departure")
            cols.insert(cols.index("price_change_percent"), "last_hours_until_departure")
        df_predict_will_drop_filter = df_predict_will_drop_filter[cols]
    after_rows = len(df_predict_will_drop_filter)
    print(
        f"will_drop contiguous-segment filtering done (step={expected_step_minutes}min): {before_rows} -> {after_rows}"
    )
    # current time, floored to the hour
    current_time = datetime.datetime.now()
    current_time_str = current_time.strftime("%Y%m%d%H%M")
    hourly_time = current_time.replace(minute=0, second=0, microsecond=0)
    hourly_time_str = hourly_time.strftime("%Y%m%d%H%M")
    before_rows = len(df_predict_will_drop_filter)
    df_predict_will_drop_filter["valid_end_dt"] = pd.to_datetime(
        df_predict_will_drop_filter["valid_end_hour"],
        errors="coerce",
    )
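    # keep only predictions whose validity window ended at least 8 hours ago,
    # presumably so the crawled price history has fully landed (assumption)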
    df_predict_will_drop_filter_1 = df_predict_will_drop_filter[
        (df_predict_will_drop_filter["valid_end_dt"] + pd.Timedelta(hours=8))
        <= hourly_time
    ].copy()
    df_predict_will_drop_filter_1.drop(columns=["valid_end_dt"], inplace=True)
    after_rows = len(df_predict_will_drop_filter_1)
    print(
        f"valid_end_hour(+8h) filtering done: {before_rows} -> {after_rows} (hourly_time={hourly_time_str})"
    )
    # start validation
    df_predict_will_drop_validate = _validate_predict_df(df_predict_will_drop_filter_1)
    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    save_csv = f"result_validate_{node}_zong_{timestamp_str}.csv"
    output_path = os.path.join(output_dir, save_csv)
    df_predict_will_drop_validate.to_csv(output_path, index=False, encoding="utf-8-sig")
    print(f"saved: {output_path}")
    print(f"validation finished: {node} zong")
    print()
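

# Entry points. A sketch of intended usage (paths and node labels follow
# this repo's conventions):
#   python result_validate_0.py               # manual/aggregate validation (default)
#   python result_validate_0.py --interval 1  # hourly automatic validation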
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='validation script')
    parser.add_argument('--interval', type=int, choices=[0, 1],
                        default=0, help='interval in hours: 0 = manual validation, 1 = auto validation')
    args = parser.parse_args()
    interval_hours = args.interval
    # 0: manual validation
    if interval_hours == 0:
        # node, pred_time_str = "node0127", "202601301500"
        # validate_process(node, interval_hours, pred_time_str)
        # node = "node0127"
        # validate_process_zong(node)  # unconditional aggregation
        # node = "node0127"
        # validate_process_zong(node, True, None, "202602051400")  # conditional aggregation
        # node = "node0203"
        # validate_process_zong(node, True, "202602041100", "202602051400")  # conditional aggregation
        # node = "node0205"
        # validate_process_zong(node, True, "202602061000", "202602091000")  # conditional aggregation
        # node = "node0211"
        # validate_process_zong(node, True, "202602111100", None)  # 202602111100 -> 202602161000 202602161100 -> 202602211000
        # node = "node0224"
        # validate_process_zong(node, True, "202602241600", None)  # 202602241600 -> 202602271400 202602271500 202602281700
        node = "node0311"
        validate_process_zong(node, True, "202603121800", None)
    # 1: automatic validation
    else:
        node = "node0127"
        validate_process_auto(node, interval_hours)