data_process.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. import pandas as pd
  2. import numpy as np
  3. import gc
  4. import os
def preprocess_data_simple(df_input, is_train=False, hourly_time=None):
    """Preprocess fare-snapshot rows into per-group price-change features.

    Adds a group id (``gid``) and, within each (gid, baggage_weight) group,
    the price-change amount, percent, and segment duration for every
    snapshot.  In training mode it additionally mines three tables from the
    history: drop events (positive examples), rise events (negative
    examples), and a per-flight-day peak-price envelope.

    Parameters
    ----------
    df_input : pandas.DataFrame
        Fare snapshots.  Uses columns 'citypair', 'flight_numbers',
        'from_date', 'baggage_weight', 'hours_until_departure',
        'update_hour' (datetime-like), 'price_total', 'cabins',
        'days_to_departure', etc.  NOTE(review): schema inferred from usage
        here — confirm against the caller.
    is_train : bool, optional
        When True, also build and return the training-side tables.
    hourly_time : datetime-like, optional
        Prediction cutoff; required (and only used) when ``is_train`` is
        False — snapshots newer than this are discarded.

    Returns
    -------
    tuple
        ``(df_input, df_drop_nodes, df_rise_nodes, df_envelope)`` in
        training mode, otherwise ``(df_input, None, None, None)``.
    """
    print(">>> 开始数据预处理")
    # (City-code -> numeric mapping: intentionally not used.)
    # Day of week of the snapshot timestamp (Mon=1 .. Sun=7).
    df_input['update_week'] = df_input['update_hour'].dt.dayofweek + 1
    # gid: integer label for each (citypair, flight, departure-date) group.
    df_input['gid'] = (
        df_input
        .groupby(
            ['citypair', 'flight_numbers', 'from_date'],  # 'baggage_weight' deliberately left out of the key for now
            sort=False
        )
        .ngroup()
    )
    # Within each gid and baggage_weight, sort snapshots by descending
    # hours-until-departure (far-out first, closest to departure last).
    df_input = df_input.sort_values(
        by=['gid', 'baggage_weight', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)
    df_input = df_input[df_input['hours_until_departure'] <= 480]
    df_input = df_input[df_input['baggage_weight'] == 0]  # keep only the 0 kg baggage fares for now
    # At the tail of hours_until_departure, keep only data up to "now".
    if not is_train:
        df_input = df_input[df_input['update_hour'] <= hourly_time].copy()
    else:
        df_input = df_input.copy()  # copy in training mode too, for consistency
    df_input = df_input.reset_index(drop=True)
    # Minimum absolute price move that counts as a real price change.
    price_change_amount_threshold = 1
    df_input['_raw_price_diff'] = df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].diff()
    # Price-change amount: zero out sub-threshold noise, then forward-fill the
    # last real change within each group so every row carries the latest move.
    df_input['price_change_amount'] = (
        df_input['_raw_price_diff']
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(2)
    )
    # Price-change percent (rate vs. the previous snapshot), filtered and
    # forward-filled the same way as the amount above.
    df_input['price_change_percent'] = (
        df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total']
        .pct_change()
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(4)
    )
    # Step 1: label price segments, cut at each "new real price-change event".
    # Even two consecutive identical moves (e.g. -50, -50) start separate segments.
    _price_change_event = df_input['_raw_price_diff'].abs().ge(price_change_amount_threshold)
    df_input['price_change_segment'] = (
        _price_change_event
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .cumsum()
    )
    # Step 2: duration inside the current price segment (1-based row count;
    # hours, assuming one snapshot per hour — TODO confirm cadence).
    df_input['price_duration_hours'] = (
        df_input.groupby(['gid', 'baggage_weight', 'price_change_segment'], group_keys=False)
        .cumcount()
        .add(1)
    )
    # Optional: drop the temporary helper columns.
    df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff'])
    # Training-only processing.
    if is_train:
        df_target = df_input[(df_input['hours_until_departure'] >= 48) & (df_input['hours_until_departure'] <= 384)].copy()
        df_target = df_target.sort_values(
            by=['gid', 'baggage_weight', 'hours_until_departure'],
            ascending=[True, True, False]
        ).reset_index(drop=True)
        # Previous-row values for each record (i.e. the prior price segment's state).
        prev_pct = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_percent'].shift(1)
        prev_amo = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_amount'].shift(1)
        prev_dur = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_duration_hours'].shift(1)
        prev_price = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].shift(1)
        prev_cabin = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['cabins'].shift(1)
        # Analysis of rise-then-drop (or drop-then-drop) transitions.
        seg_start_mask = df_target['price_duration_hours'].eq(1)  # first row of a new price segment
        # Positive-example library: drop events.
        prev_pct_num = pd.to_numeric(prev_pct, errors='coerce')
        drop_mask = (
            seg_start_mask
            & prev_pct_num.notna()
            & (df_target['price_change_percent'] < 0)
            # & prev_dur.le(24)  # optionally keep only drops whose previous segment lasted <= 24h
        )
        df_drop_nodes = df_target.loc[drop_mask, ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week', 'cabins']].copy()
        df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True)
        df_drop_nodes.rename(columns={'days_to_departure': 'drop_days_to_departure'}, inplace=True)
        df_drop_nodes.rename(columns={'update_hour': 'drop_update_hour'}, inplace=True)
        df_drop_nodes.rename(columns={'update_week': 'drop_update_week'}, inplace=True)
        df_drop_nodes.rename(columns={'cabins': 'drop_cabins'}, inplace=True)
        # "drop_*" columns describe the drop row itself; "high_*" columns
        # describe the (higher-priced) segment immediately before it.
        df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
        df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
        df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_cabins'] = prev_cabin.loc[drop_mask].astype(str)
        df_drop_nodes = df_drop_nodes.reset_index(drop=True)
        flight_info_cols = [
            'gid', 'baggage_weight', 'citypair', 'flight_numbers', 'from_time', 'from_date', 'currency',
        ]
        flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
        df_gid_info = df_target[flight_info_cols].drop_duplicates(subset=['gid', 'baggage_weight']).reset_index(drop=True)
        df_drop_nodes = df_drop_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
        drop_info_cols = [
            'drop_update_hour', 'drop_update_week', 'drop_cabins',
            'drop_days_to_departure', 'drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
            'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount', 'high_price_cabins',
        ]
        # Order the columns; keep gid.
        df_drop_nodes = df_drop_nodes[flight_info_cols + drop_info_cols]
        # Hour at which the pre-drop (high-price) segment started.
        df_drop_nodes['start_hours_until_departure'] = (df_drop_nodes['drop_hours_until_departure'] + df_drop_nodes['high_price_duration_hours']).round().astype('Int64')
        df_drop_nodes = df_drop_nodes[df_drop_nodes['drop_hours_until_departure'] <= 360]
        df_drop_nodes = df_drop_nodes[df_drop_nodes['start_hours_until_departure'] >= 72]
        df_drop_nodes = df_drop_nodes[df_drop_nodes['high_price_duration_hours'] > 2.0]  # ignore segments that held for too short a time
        df_drop_nodes = df_drop_nodes[df_drop_nodes['drop_price_change_amount'].abs() > 1]  # ignore drops within 1 yuan
        # Negative-example library: all rise events.
        # seg_start_mask = df_target['price_duration_hours'].eq(1)
        # rise_mask = seg_start_mask & ((prev_pct > 0) | (prev_pct < 0)) & (df_target['price_change_percent'] > 0)
        prev_pct_num = pd.to_numeric(prev_pct, errors='coerce')
        valid_mask = seg_start_mask & prev_pct_num.notna()
        curr_pct = pd.to_numeric(df_target['price_change_percent'], errors='coerce')
        # prev_dur_num = pd.to_numeric(prev_dur, errors='coerce')
        pos_case_mask = curr_pct.ge(0)
        # neg_case_mask = curr_pct.lt(0) & prev_dur_num.gt(24)
        rise_mask = valid_mask & pos_case_mask  # (pos_case_mask | neg_case_mask)
        df_rise_nodes = df_target.loc[rise_mask, ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week', 'cabins']].copy()
        df_rise_nodes.rename(columns={'hours_until_departure': 'rise_hours_until_departure'}, inplace=True)
        df_rise_nodes.rename(columns={'days_to_departure': 'rise_days_to_departure'}, inplace=True)
        df_rise_nodes.rename(columns={'update_hour': 'rise_update_hour'}, inplace=True)
        df_rise_nodes.rename(columns={'update_week': 'rise_update_week'}, inplace=True)
        df_rise_nodes.rename(columns={'cabins': 'rise_cabins'}, inplace=True)
        # "rise_*" columns describe the rise row; "prev_rise_*" the segment before it.
        df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy()
        df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy()
        df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_cabins'] = prev_cabin.loc[rise_mask].astype(str)
        df_rise_nodes = df_rise_nodes.reset_index(drop=True)
        df_rise_nodes = df_rise_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
        rise_info_cols = [
            'rise_update_hour', 'rise_update_week', 'rise_cabins',
            'rise_days_to_departure', 'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
            'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount', 'prev_rise_cabins'
        ]
        df_rise_nodes = df_rise_nodes[flight_info_cols + rise_info_cols]
        df_rise_nodes['start_hours_until_departure'] = (df_rise_nodes['rise_hours_until_departure'] + df_rise_nodes['prev_rise_duration_hours']).round().astype('Int64')
        df_rise_nodes = df_rise_nodes[df_rise_nodes['rise_hours_until_departure'] <= 360]
        df_rise_nodes = df_rise_nodes[df_rise_nodes['start_hours_until_departure'] >= 72]
        df_rise_nodes = df_rise_nodes[df_rise_nodes['prev_rise_duration_hours'] > 2.0]  # ignore segments that held for too short a time
        df_rise_nodes = df_rise_nodes[df_rise_nodes['rise_price_change_amount'].abs() > 1]  # ignore changes within 1 yuan
        # Build the historical peak-price envelope: one row per flight-day at
        # the snapshot where price_total peaked.
        envelope_group = ['citypair', 'flight_numbers', 'from_date', 'baggage_weight']
        idx_peak = df_target.groupby(envelope_group)['price_total'].idxmax()
        df_envelope = df_target.loc[idx_peak, envelope_group + [
            'from_time', 'price_total', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week',
        ]].rename(columns={
            'price_total': 'peak_price',
            'hours_until_departure': 'peak_hours',
            'days_to_departure': 'peak_days',
            'update_hour': 'peak_time',
            'update_week': 'peak_week',
        }).reset_index(drop=True)
        del df_gid_info
        del df_target
        return df_input, df_drop_nodes, df_rise_nodes, df_envelope
    return df_input, None, None, None
def predict_data_simple(df_input, city_pair, object_dir, predict_dir=".", pred_time_str=""):
    """Predict, per flight/baggage group, whether the current fare will drop.

    Takes the latest snapshot of every (gid, baggage_weight) group, scores a
    relative price position against the historical drop/rise event tables
    saved at training time (``{city_pair}_drop_info.csv`` /
    ``{city_pair}_rise_info.csv`` under ``object_dir``), matches the current
    price-change state against those historical scenarios, and appends the
    resulting prediction rows to ``future_predictions_{pred_time_str}.csv``
    in ``predict_dir``.

    Parameters
    ----------
    df_input : pandas.DataFrame or None
        Preprocessed snapshots (output of ``preprocess_data_simple``).
    city_pair : str
        Route key used to locate the history CSVs.  NOTE(review): this name
        is re-bound per row inside the main loop below; only the pre-loop
        CSV-path uses the parameter value.
    object_dir : str
        Directory holding the training-side CSV artifacts.
    predict_dir : str, optional
        Output directory for the predictions CSV.
    pred_time_str : str, optional
        Timestamp suffix for the output CSV file name.

    Returns
    -------
    pandas.DataFrame
        The prediction rows written out (empty frame when input is empty).
    """
    if df_input is None or df_input.empty:
        return pd.DataFrame()
    df_sorted = df_input.sort_values(
        by=['gid', 'baggage_weight', 'hours_until_departure'],
        ascending=[True, True, False],
    ).reset_index(drop=True)
    df_sorted = df_sorted[
        df_sorted['hours_until_departure'].between(72, 360)
    ].reset_index(drop=True)
    # Per (gid, baggage_weight) keep the row with the smallest
    # hours_until_departure, i.e. the current hour (last after the sort).
    df_min_hours = (
        df_sorted.drop_duplicates(subset=['gid', 'baggage_weight'], keep='last')
        .reset_index(drop=True)
    )
    # Require at least a couple of remaining tickets.
    df_min_hours = df_min_hours[(df_min_hours['ticket_amount'] >= 2)].reset_index(drop=True)
    # Load historical drop scenarios.
    drop_info_csv_path = os.path.join(object_dir, f'{city_pair}_drop_info.csv')
    if os.path.exists(drop_info_csv_path):
        df_drop_nodes = pd.read_csv(drop_info_csv_path)
    else:
        df_drop_nodes = pd.DataFrame()
    # Load historical rise scenarios.
    rise_info_csv_path = os.path.join(object_dir, f'{city_pair}_rise_info.csv')
    if os.path.exists(rise_info_csv_path):
        df_rise_nodes = pd.read_csv(rise_info_csv_path)
    else:
        df_rise_nodes = pd.DataFrame()
    # Joint price distribution (computed per flight_key group) ==========================================================
    flight_key = ['citypair', 'flight_numbers', 'baggage_weight']
    group_cols = [
        c for c in flight_key
        if (
            c in df_min_hours.columns
            or (not df_drop_nodes.empty and c in df_drop_nodes.columns)
            or (not df_rise_nodes.empty and c in df_rise_nodes.columns)
        )
    ]
    # Uniform initialisation of the relative-position column.
    df_min_hours['relative_position'] = np.nan
    if not df_drop_nodes.empty:
        df_drop_nodes['relative_position'] = np.nan
    if not df_rise_nodes.empty:
        df_rise_nodes['relative_position'] = np.nan
    parts = []
    # Current rows awaiting prediction.
    if not df_min_hours.empty and 'price_total' in df_min_hours.columns:
        min_group_cols = [c for c in group_cols if c in df_min_hours.columns]
        cur = df_min_hours[min_group_cols + ['from_date', 'price_total']].copy()
        for c in group_cols:
            if c not in cur.columns:
                cur[c] = np.nan
        cur['price'] = pd.to_numeric(cur['price_total'], errors='coerce')
        cur['source'] = 'min'
        cur['row_id'] = cur.index  # original index, used to write positions back later
        parts.append(cur[group_cols + ['from_date', 'price', 'source', 'row_id']])
    # Historical drops (use the pre-drop high price).
    if not df_drop_nodes.empty and 'high_price_amount' in df_drop_nodes.columns:
        drop_group_cols = [c for c in group_cols if c in df_drop_nodes.columns]
        drop = df_drop_nodes[drop_group_cols + ['from_date', 'high_price_amount']].copy()
        for c in group_cols:
            if c not in drop.columns:
                drop[c] = np.nan
        drop['price'] = pd.to_numeric(drop['high_price_amount'], errors='coerce')
        drop['source'] = 'drop'
        drop['row_id'] = drop.index
        parts.append(drop[group_cols + ['from_date', 'price', 'source', 'row_id']])
    # Historical rises (use the pre-rise price).
    if not df_rise_nodes.empty and 'prev_rise_amount' in df_rise_nodes.columns:
        rise_group_cols = [c for c in group_cols if c in df_rise_nodes.columns]
        rise = df_rise_nodes[rise_group_cols + ['from_date', 'prev_rise_amount']].copy()
        for c in group_cols:
            if c not in rise.columns:
                rise[c] = np.nan
        rise['price'] = pd.to_numeric(rise['prev_rise_amount'], errors='coerce')
        rise['source'] = 'rise'
        rise['row_id'] = rise.index
        parts.append(rise[group_cols + ['from_date', 'price', 'source', 'row_id']])
    if parts:
        all_prices = pd.concat(parts, ignore_index=True)
        all_prices = all_prices.dropna(subset=['price']).reset_index(drop=True)
        # Price percentile: per group when possible, global fallback otherwise.
        # Dense rank maps equal prices to equal positions in [0, 1].
        if group_cols:
            all_prices['dense_rank'] = all_prices.groupby(group_cols, dropna=False)['price'].rank(method='dense')
            all_prices['max_rank'] = all_prices.groupby(group_cols, dropna=False)['dense_rank'].transform('max')
            all_prices['relative_position'] = np.where(
                all_prices['max_rank'] > 1,
                (all_prices['dense_rank'] - 1) / (all_prices['max_rank'] - 1),
                1.0,
            )
            all_prices = all_prices.drop(columns=['dense_rank', 'max_rank'])
        else:
            dense_rank = all_prices['price'].rank(method='dense')
            max_rank = dense_rank.max()
            if pd.notna(max_rank) and max_rank > 1:
                all_prices['relative_position'] = (dense_rank - 1) / (max_rank - 1)
            else:
                all_prices['relative_position'] = 1.0
        all_prices['relative_position'] = all_prices['relative_position'].round(4)
        # Write positions back into the three tables via the saved row ids.
        m = all_prices['source'] == 'min'
        df_min_hours.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
        if not df_drop_nodes.empty:
            m = all_prices['source'] == 'drop'
            df_drop_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
        if not df_rise_nodes.empty:
            m = all_prices['source'] == 'rise'
            df_rise_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
    # ====================================================================================================
    # print(">>> 构建跨航班日价格包络线")
    # flight_key = ['citypair', 'flight_numbers', 'baggage_weight']
    # day_key = flight_key + ['from_date']
    # # 1. 历史侧:加载训练阶段的峰值数据
    # envelope_csv_path = os.path.join(object_dir, f'{city_pair}_envelope_info.csv')
    # if os.path.exists(envelope_csv_path):
    #     df_hist = pd.read_csv(envelope_csv_path)
    #     df_hist = df_hist[day_key + ['peak_price', 'peak_hours']]
    #     df_hist['source'] = 'hist'
    # else:
    #     df_hist = pd.DataFrame()
    # # 2. 未来侧:当前在售价格
    # # df_future = df_min_hours[day_key + ['price_total', 'hours_until_departure']].copy().rename(
    # #     columns={'price_total': 'peak_price', 'hours_until_departure': 'peak_hours'}
    # # )
    # # df_future['source'] = 'future'
    # df_future = pd.DataFrame()
    # # 3. 合并包络线数据点
    # df_envelope_all = pd.concat(
    #     [x for x in [df_hist, df_future] if not x.empty], ignore_index=True
    # ).drop_duplicates(subset=day_key, keep='last')
    # # 4. 包络线统计 + 找高点起飞日
    # df_envelope_agg = df_envelope_all.groupby(flight_key).agg(
    #     envelope_max=('peak_price', 'max'),
    #     envelope_min=('peak_price', 'min'),
    #     envelope_mean=('peak_price', 'mean'),
    #     envelope_count=('peak_price', 'count'),
    #     envelope_avg_peak_hours=('peak_hours', 'mean'),
    # ).reset_index()
    # # 对数值列保留两位小数
    # df_envelope_agg[['envelope_mean', 'envelope_avg_peak_hours']] = df_envelope_agg[['envelope_mean', 'envelope_avg_peak_hours']].round(2)
    # idx_top = df_envelope_all.groupby(flight_key)['peak_price'].idxmax()
    # df_top = df_envelope_all.loc[idx_top, flight_key + ['from_date', 'peak_price', 'peak_hours']].rename(
    #     columns={'from_date': 'target_flight_day', 'peak_price': 'target_price', 'peak_hours': 'target_peak_hours'}
    # )
    # df_envelope_agg = df_envelope_agg.merge(df_top, on=flight_key, how='left')
    # # 5. 合并到 df_min_hours
    # df_min_hours = df_min_hours.merge(df_envelope_agg, on=flight_key, how='left')
    # price_range = (df_min_hours['envelope_max'] - df_min_hours['envelope_min']).replace(0, 1)
    # df_min_hours['envelope_position'] = (
    #     (df_min_hours['price_total'] - df_min_hours['envelope_min']) / price_range
    # ).clip(0, 1).round(4)
    # # df_min_hours['is_envelope_peak'] = (df_min_hours['envelope_position'] >= 0.75).astype(int)  # 0.95 -> 0.75
    # df_min_hours['is_target_day'] = (df_min_hours['from_date'] == df_min_hours['target_flight_day']).astype(int)
    # Overall score threshold: anything above it is considered worth acting on.
    relative_position_threshold = 0.4
    df_min_hours['is_good_target'] = (df_min_hours['relative_position'] >= relative_position_threshold).astype(int)
    total_cnt_before = len(df_min_hours)  # row count before filtering
    df_min_hours = df_min_hours[(df_min_hours['is_good_target'] == 1)].reset_index(drop=True)  # keep only the worthwhile targets
    total_cnt_after = len(df_min_hours)  # row count after filtering
    # =====================================================================
    # Default prediction columns; the loop below overwrites them per row.
    df_min_hours['simple_will_price_drop'] = 0
    # df_min_hours['simple_drop_in_hours'] = 0
    df_min_hours['simple_drop_in_hours_prob'] = 0.0
    df_min_hours['simple_drop_in_hours_dist'] = ''  # empty string means unknown
    df_min_hours['flag_dist'] = ''
    df_min_hours['drop_price_change_upper'] = 0.0
    df_min_hours['drop_price_change_lower'] = 0.0
    df_min_hours['drop_price_sample_size'] = 0
    df_min_hours['rise_price_change_upper'] = 0.0
    df_min_hours['rise_price_change_lower'] = 0.0
    df_min_hours['rise_price_sample_size'] = 0
    # What should this threshold be?  (max allowed gap in change-percent)
    pct_threshold = 0.2
    pct_threshold_1 = 0.2
    for idx, row in df_min_hours.iterrows():
        city_pair = row['citypair']  # NOTE(review): shadows the function parameter
        flight_numbers = row['flight_numbers']
        baggage_weight = row['baggage_weight']
        from_date = row['from_date']
        if flight_numbers == "UO235" and from_date == "2026-04-25":  # breakpoint hook for debugging
            pass
        days_to_departure = row['days_to_departure']
        hours_until_departure = row['hours_until_departure']
        price_change_percent = row['price_change_percent']
        price_change_amount = row['price_change_amount']
        price_duration_hours = row['price_duration_hours']
        price_amount = row['price_total']
        cabins = row['cabins']
        length_drop = 0
        length_rise = 0
        # Match against historical price-DROP events.
        if not df_drop_nodes.empty:
            # Align route, flight number and baggage allowance.
            df_drop_nodes_part = df_drop_nodes[
                (df_drop_nodes['citypair'] == city_pair) &
                (df_drop_nodes['flight_numbers'] == flight_numbers) &
                (df_drop_nodes['baggage_weight'] == baggage_weight)
            ]
            # Match the pre-drop change magnitude and current price level.
            if not df_drop_nodes_part.empty and pd.notna(price_change_percent):
                pct_base = float(price_change_percent)
                pct_vals = pd.to_numeric(df_drop_nodes_part['high_price_change_percent'], errors='coerce')
                df_drop_gap = df_drop_nodes_part.loc[
                    pct_vals.notna(),
                    ['from_date',
                     'drop_days_to_departure', 'drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
                     'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount', 'relative_position',
                     'high_price_cabins', 'start_hours_until_departure',
                     ]
                ].copy()
                df_drop_gap['pct_gap'] = (pct_vals.loc[pct_vals.notna()] - pct_base)
                df_drop_gap['pct_abs_gap'] = df_drop_gap['pct_gap'].abs()
                price_base = pd.to_numeric(price_amount, errors='coerce')
                high_price_vals = pd.to_numeric(df_drop_gap['high_price_amount'], errors='coerce')
                df_drop_gap['price_gap'] = high_price_vals - price_base
                df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
                df_drop_gap = df_drop_gap.sort_values(['price_abs_gap', 'pct_abs_gap'], ascending=[True, True])
                # The historical pre-drop change must have the same sign as
                # the current change.
                same_sign_mask = (
                    np.sign(pd.to_numeric(df_drop_gap['high_price_change_percent'], errors='coerce'))
                    == np.sign(pct_base)
                )
                df_match = df_drop_gap[
                    (df_drop_gap['pct_abs_gap'] <= pct_threshold)
                    & (df_drop_gap['price_abs_gap'] <= 10.0)
                    & same_sign_mask
                    & (df_drop_gap['high_price_cabins'] == cabins)
                    & (df_drop_gap['high_price_duration_hours'] <= 48)
                ].copy()
                # Historical drop scenarios that followed a near-identical
                # rise (or fall) magnitude.
                if not df_match.empty:
                    dur_base = pd.to_numeric(price_duration_hours, errors='coerce')
                    hud_base = pd.to_numeric(hours_until_departure, errors='coerce')
                    # dtd_base = pd.to_numeric(days_to_departure, errors='coerce')
                    if pd.notna(dur_base) and pd.notna(hud_base):
                        df_match_chk = df_match.copy()
                        # drop_dtd_vals = pd.to_numeric(df_match_chk['drop_days_to_departure'], errors='coerce')
                        # df_match_chk = df_match_chk.loc[drop_dtd_vals.notna()].copy()
                        # df_match_chk = df_match_chk.loc[(drop_dtd_vals.loc[drop_dtd_vals.notna()] - float(dtd_base)).abs() <= 3].copy()
                        # Tighten positives: historical drop must sit between
                        # the segment start and now (hours until departure).
                        drop_hud_vals = pd.to_numeric(df_match_chk['drop_hours_until_departure'], errors='coerce')
                        df_match_chk = df_match_chk.loc[drop_hud_vals.notna()].copy()
                        df_match_chk = df_match_chk.loc[(float(hud_base) - drop_hud_vals.loc[drop_hud_vals.notna()]) >= 0].copy()
                        start_hud_vals = pd.to_numeric(df_match_chk['start_hours_until_departure'], errors='coerce')
                        df_match_chk = df_match_chk.loc[start_hud_vals.notna()].copy()
                        df_match_chk = df_match_chk.loc[(float(hud_base) - start_hud_vals.loc[start_hud_vals.notna()]) <= 0].copy()
                        # Tighten positives: segment-duration delta vs. now.
                        dur_num_chk = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce')
                        dur_delta = dur_num_chk - float(dur_base)
                        df_match_chk = df_match_chk.assign(dur_delta=dur_delta)
                        df_match_chk = df_match_chk.loc[df_match_chk['dur_delta'].notna()].copy()
                        # df_match_chk = df_match_chk.loc[df_match_chk['dur_delta'].abs() <= 72].copy()
                        # Every condition matched: record a positive prediction.
                        if not df_match_chk.empty:
                            length_drop = df_match_chk.shape[0]
                            df_min_hours.loc[idx, 'drop_price_sample_size'] = length_drop
                            drop_price_change_upper = df_match_chk['drop_price_change_amount'].max()  # drop upper bound
                            drop_price_change_lower = df_match_chk['drop_price_change_amount'].min()  # drop lower bound
                            df_min_hours.loc[idx, 'drop_price_change_upper'] = round(drop_price_change_upper, 2)
                            df_min_hours.loc[idx, 'drop_price_change_lower'] = round(drop_price_change_lower, 2)
                            # remaining_hours = (
                            #     pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce') - float(dur_base)
                            # ).clip(lower=0)
                            # remaining_hours = remaining_hours.round().astype(int)
                            # counts = remaining_hours.value_counts().sort_index()
                            # probs = (counts / counts.sum()).round(4)
                            # top_hours = int(probs.idxmax())
                            # top_prob = float(probs.max())
                            # dist_items = list(zip(probs.index.tolist(), probs.tolist()))
                            # dist_items = dist_items[:10]
                            # dist_str = ' '.join([f"{int(h)}h->{float(p)}" for h, p in dist_items])
                            # Leading apostrophe keeps spreadsheet apps from
                            # parsing the list as a formula/number.
                            dur_delta_list = df_match_chk['dur_delta'].tolist()
                            dist_str = "'" + ' '.join([f"{ddl:g}" for ddl in dur_delta_list])
                            df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
                            # df_min_hours.loc[idx, 'simple_drop_in_hours'] = top_hours
                            df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1
                            df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = dist_str
                            df_min_hours.loc[idx, 'flag_dist'] = 'd0'
                            pass
            pass
        # Match against historical price-RISE events.
        if not df_rise_nodes.empty:
            # Align route, flight number and baggage allowance.
            df_rise_nodes_part = df_rise_nodes[
                (df_rise_nodes['citypair'] == city_pair) &
                (df_rise_nodes['flight_numbers'] == flight_numbers) &
                (df_rise_nodes['baggage_weight'] == baggage_weight)
            ]
            # Match the pre-rise change magnitude and current price level.
            if not df_rise_nodes_part.empty and pd.notna(price_change_percent):
                pct_base_1 = float(price_change_percent)
                pct_vals_1 = pd.to_numeric(df_rise_nodes_part['prev_rise_change_percent'], errors='coerce')
                df_rise_gap_1 = df_rise_nodes_part.loc[
                    pct_vals_1.notna(),
                    ['from_date',
                     'rise_days_to_departure', 'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
                     'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount', 'relative_position',
                     'prev_rise_cabins', 'start_hours_until_departure',
                     ]
                ].copy()
                df_rise_gap_1['pct_gap'] = (pct_vals_1.loc[pct_vals_1.notna()] - pct_base_1)
                df_rise_gap_1['pct_abs_gap'] = df_rise_gap_1['pct_gap'].abs()
                price_base_1 = pd.to_numeric(price_amount, errors='coerce')
                rise_price_vals_1 = pd.to_numeric(df_rise_gap_1['prev_rise_amount'], errors='coerce')
                df_rise_gap_1['price_gap'] = rise_price_vals_1 - price_base_1
                df_rise_gap_1['price_abs_gap'] = df_rise_gap_1['price_gap'].abs()
                df_rise_gap_1 = df_rise_gap_1.sort_values(['price_abs_gap', 'pct_abs_gap'], ascending=[True, True])
                same_sign_mask_1 = (
                    np.sign(pd.to_numeric(df_rise_gap_1['prev_rise_change_percent'], errors='coerce'))
                    == np.sign(pct_base_1)
                )
                df_match_1 = df_rise_gap_1.loc[
                    (df_rise_gap_1['pct_abs_gap'] <= pct_threshold_1)
                    & (df_rise_gap_1['price_abs_gap'] <= 10.0)
                    & same_sign_mask_1
                    & (df_rise_gap_1['prev_rise_cabins'] == cabins)
                ].copy()
                # Historical rise scenarios that followed a near-identical
                # rise (or fall) magnitude.
                if not df_match_1.empty:
                    dur_base_1 = pd.to_numeric(price_duration_hours, errors='coerce')
                    hud_base_1 = pd.to_numeric(hours_until_departure, errors='coerce')
                    # dtd_base_1 = pd.to_numeric(days_to_departure, errors='coerce')
                    if pd.notna(dur_base_1) and pd.notna(hud_base_1):
                        df_match_chk_1 = df_match_1.copy()
                        # drop_dtd_vals_1 = pd.to_numeric(df_match_chk_1['rise_days_to_departure'], errors='coerce')
                        # df_match_chk_1 = df_match_chk_1.loc[drop_dtd_vals_1.notna()].copy()
                        # df_match_chk_1 = df_match_chk_1.loc[(drop_dtd_vals_1.loc[drop_dtd_vals_1.notna()] - float(dtd_base_1)).abs() <= 3].copy()
                        # Tighten negatives: historical rise must sit between
                        # the segment start and now (hours until departure).
                        rise_hud_vals_1 = pd.to_numeric(df_match_chk_1['rise_hours_until_departure'], errors='coerce')
                        df_match_chk_1 = df_match_chk_1.loc[rise_hud_vals_1.notna()].copy()
                        df_match_chk_1 = df_match_chk_1.loc[(float(hud_base_1) - rise_hud_vals_1.loc[rise_hud_vals_1.notna()]) >= 0].copy()
                        start_hud_vals_1 = pd.to_numeric(df_match_chk_1['start_hours_until_departure'], errors='coerce')
                        df_match_chk_1 = df_match_chk_1.loc[start_hud_vals_1.notna()].copy()
                        df_match_chk_1 = df_match_chk_1.loc[(float(hud_base_1) - start_hud_vals_1.loc[start_hud_vals_1.notna()]) <= 0].copy()
                        # Tighten negatives (disabled): drops within 48h would
                        # not count as significant counter-examples.
                        # _rise_pct_chk = pd.to_numeric(df_match_chk_1['rise_price_change_percent'], errors='coerce')
                        # _prev_dur_chk = pd.to_numeric(df_match_chk_1['prev_rise_duration_hours'], errors='coerce')
                        # _exclude_mask = _rise_pct_chk.lt(0) & _prev_dur_chk.lt(48)
                        # df_match_chk_1 = df_match_chk_1.loc[~_exclude_mask.fillna(False)].copy()
                        # Every condition matched: weigh against the drop side.
                        if not df_match_chk_1.empty:
                            length_rise = df_match_chk_1.shape[0]
                            df_min_hours.loc[idx, 'rise_price_sample_size'] = length_rise
                            rise_price_change_upper = df_match_chk_1['rise_price_change_amount'].max()  # rise upper bound
                            rise_price_change_lower = df_match_chk_1['rise_price_change_amount'].min()  # rise lower bound
                            df_min_hours.loc[idx, 'rise_price_change_upper'] = round(rise_price_change_upper, 2)
                            df_min_hours.loc[idx, 'rise_price_change_lower'] = round(rise_price_change_lower, 2)
                            # No drop evidence at all: confidently judge "no drop".
                            if length_drop == 0:
                                df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
                                # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
                                df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
                                # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r0'
                                df_min_hours.loc[idx, 'flag_dist'] = 'r0'
                            # Conflicting evidence: vote by sample counts.
                            else:
                                drop_prob = round(length_drop / (length_rise + length_drop), 2)
                                # Keep the earlier drop verdict; adjust the probability.
                                if drop_prob > 0.5:
                                    df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
                                    # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'd1'
                                    df_min_hours.loc[idx, 'flag_dist'] = 'd1'
                                # Overturn to "no drop"; adjust the probability.
                                else:
                                    df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
                                    # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r1'
                                    df_min_hours.loc[idx, 'flag_dist'] = 'r1'
                                df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob
    print("判定循环结束")
    # Validity window: [departure - 360h, departure - 72h], floored to the hour.
    _dep_hour = pd.to_datetime(df_min_hours["from_time"], errors="coerce").dt.floor("h")
    df_min_hours["valid_begin_hour"] = (_dep_hour - pd.to_timedelta(360, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
    df_min_hours["valid_end_hour"] = (_dep_hour - pd.to_timedelta(72, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
    # Columns to expose in the prediction table.
    order_cols = [
        "citypair", "flight_numbers", "baggage_weight", "from_date", "from_time",
        "cabins", "ticket_amount", "currency", "price_base", "price_tax",
        "price_total", 'relative_position', 'is_good_target', 'days_to_departure', 'hours_until_departure',
        'price_change_amount', 'price_change_percent', 'price_duration_hours',
        "update_hour", "create_time",
        'valid_begin_hour', 'valid_end_hour',
        'simple_will_price_drop', 'simple_drop_in_hours_prob', 'simple_drop_in_hours_dist',
        'flag_dist',
        'drop_price_change_upper', 'drop_price_change_lower', 'drop_price_sample_size',
        'rise_price_change_upper', 'rise_price_change_lower', 'rise_price_sample_size',
    ]
    df_predict = df_min_hours[order_cols]
    df_predict = df_predict.rename(columns={
        'simple_will_price_drop': 'will_price_drop',
        'simple_drop_in_hours_prob': 'drop_in_hours_prob',
        'simple_drop_in_hours_dist': 'drop_in_hours_dist',
    }
    )
    # Stable sort for a reproducible output order.
    df_predict = df_predict.sort_values(
        by=['citypair', 'flight_numbers', 'baggage_weight', 'from_date'],
        kind='mergesort',
        na_position='last',
    ).reset_index(drop=True)
    total_cnt = len(df_predict)
    if "will_price_drop" in df_predict.columns:
        _wpd = pd.to_numeric(df_predict["will_price_drop"], errors="coerce")
        drop_1_cnt = int((_wpd == 1).sum())
        drop_0_cnt = int((_wpd == 0).sum())
    else:
        drop_1_cnt = 0
        drop_0_cnt = 0
    print(f"will_price_drop 分类数量统计: 1(会降)={drop_1_cnt}, 0(不降)={drop_0_cnt}, 总数={total_cnt}, 过滤前总数={total_cnt_before}")
    # Append to the predictions CSV; header only when creating the file.
    csv_path1 = os.path.join(predict_dir, f'future_predictions_{pred_time_str}.csv')
    df_predict.to_csv(csv_path1, mode='a', index=False, header=not os.path.exists(csv_path1), encoding='utf-8-sig')
    print("预测结果已追加")
    return df_predict