data_process.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. import pandas as pd
  2. import numpy as np
  3. import gc
  4. import os
  5. def preprocess_data_simple(df_input, is_train=False, hourly_time=None):
  6. print(">>> 开始数据预处理")
  7. # 城市码映射成数字(不用)
  8. # 更新日期是周几
  9. df_input['update_week'] = df_input['update_hour'].dt.dayofweek + 1
  10. # gid:基于指定字段的分组标记(整数)
  11. df_input['gid'] = (
  12. df_input
  13. .groupby(
  14. ['citypair', 'flight_numbers', 'from_date'], # 'baggage_weight' 先不进分组
  15. sort=False
  16. )
  17. .ngroup()
  18. )
  19. # 在 gid 与 baggage_weight 内按时间降序
  20. df_input = df_input.sort_values(
  21. by=['gid', 'baggage_weight', 'hours_until_departure'],
  22. ascending=[True, True, False]
  23. ).reset_index(drop=True)
  24. df_input = df_input[df_input['hours_until_departure'] <= 480]
  25. df_input = df_input[df_input['baggage_weight'] == 20] # 先保留20公斤行李的
  26. # 在hours_until_departure 的末尾 保留到当前时刻的数据
  27. if not is_train:
  28. df_input = df_input[df_input['update_hour'] <= hourly_time].copy()
  29. else:
  30. df_input = df_input.copy() # 训练集也 copy 一下保持一致性
  31. df_input = df_input.reset_index(drop=True)
  32. # 价格变化最小量阈值
  33. price_change_amount_threshold = 5
  34. df_input['_raw_price_diff'] = df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].diff()
  35. # 计算价格变化量
  36. df_input['price_change_amount'] = (
  37. df_input['_raw_price_diff']
  38. .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
  39. .replace(0, np.nan)
  40. .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
  41. .ffill()
  42. .fillna(0)
  43. .round(2)
  44. )
  45. # 计算价格变化百分比(相对于上一时间点的变化率)
  46. df_input['price_change_percent'] = (
  47. df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total']
  48. .pct_change()
  49. .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
  50. .replace(0, np.nan)
  51. .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
  52. .ffill()
  53. .fillna(0)
  54. .round(4)
  55. )
  56. # 第一步:标记价格变化段
  57. df_input['price_change_segment'] = (
  58. df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_amount']
  59. .apply(lambda s: (s != s.shift()).cumsum())
  60. )
  61. # 第二步:计算每个变化段内的持续时间
  62. df_input['price_duration_hours'] = (
  63. df_input.groupby(['gid', 'baggage_weight', 'price_change_segment'], group_keys=False)
  64. .cumcount()
  65. .add(1)
  66. )
  67. # 可选:删除临时列
  68. df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff'])
  69. # 训练过程
  70. if is_train:
  71. df_target = df_input[(df_input['hours_until_departure'] >= 24) & (df_input['hours_until_departure'] <= 360)].copy()
  72. df_target = df_target.sort_values(
  73. by=['gid', 'baggage_weight', 'hours_until_departure'],
  74. ascending=[True, True, False]
  75. ).reset_index(drop=True)
  76. # 每条对应的前一条记录
  77. prev_pct = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_percent'].shift(1)
  78. prev_amo = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_amount'].shift(1)
  79. prev_dur = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_duration_hours'].shift(1)
  80. prev_price = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].shift(1)
  81. # 对于先升后降(先降再降)的分析
  82. seg_start_mask = df_target['price_duration_hours'].eq(1) # 开始变价节点
  83. drop_mask = seg_start_mask & ((prev_pct > 0) | (prev_pct < 0)) & (df_target['price_change_percent'] < 0)
  84. df_drop_nodes = df_target.loc[drop_mask, ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week']].copy()
  85. df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True)
  86. df_drop_nodes.rename(columns={'days_to_departure': 'drop_days_to_departure'}, inplace=True)
  87. df_drop_nodes.rename(columns={'update_hour': 'drop_update_hour'}, inplace=True)
  88. df_drop_nodes.rename(columns={'update_week': 'drop_update_week'}, inplace=True)
  89. df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
  90. df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
  91. df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
  92. df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
  93. df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
  94. df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
  95. df_drop_nodes = df_drop_nodes.reset_index(drop=True)
  96. flight_info_cols = [
  97. 'citypair', 'flight_numbers', 'from_time', 'from_date', 'currency',
  98. ]
  99. flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
  100. df_gid_info = df_target[['gid', 'baggage_weight'] + flight_info_cols].drop_duplicates(subset=['gid', 'baggage_weight']).reset_index(drop=True)
  101. df_drop_nodes = df_drop_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
  102. drop_info_cols = [
  103. 'drop_update_hour', 'drop_update_week',
  104. 'drop_days_to_departure', 'drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
  105. 'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount',
  106. ]
  107. # 按顺序排列 去掉gid
  108. df_drop_nodes = df_drop_nodes[flight_info_cols + ['baggage_weight'] + drop_info_cols]
  109. # 对于先升再升(先降再升)的分析
  110. # seg_start_mask = df_target['price_duration_hours'].eq(1)
  111. rise_mask = seg_start_mask & ((prev_pct > 0) | (prev_pct < 0)) & (df_target['price_change_percent'] > 0)
  112. df_rise_nodes = df_target.loc[rise_mask, ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week']].copy()
  113. df_rise_nodes.rename(columns={'hours_until_departure': 'rise_hours_until_departure'}, inplace=True)
  114. df_rise_nodes.rename(columns={'days_to_departure': 'rise_days_to_departure'}, inplace=True)
  115. df_rise_nodes.rename(columns={'update_hour': 'rise_update_hour'}, inplace=True)
  116. df_rise_nodes.rename(columns={'update_week': 'rise_update_week'}, inplace=True)
  117. df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
  118. df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
  119. df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy()
  120. df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy()
  121. df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy()
  122. df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy()
  123. df_rise_nodes = df_rise_nodes.reset_index(drop=True)
  124. df_rise_nodes = df_rise_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
  125. rise_info_cols = [
  126. 'rise_update_hour', 'rise_update_week',
  127. 'rise_days_to_departure', 'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
  128. 'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount',
  129. ]
  130. df_rise_nodes = df_rise_nodes[flight_info_cols + ['baggage_weight'] + rise_info_cols]
  131. # 制作历史包络线
  132. envelope_group = ['citypair', 'flight_numbers', 'from_date', 'baggage_weight']
  133. idx_peak = df_input.groupby(envelope_group)['price_total'].idxmax()
  134. df_envelope = df_input.loc[idx_peak, envelope_group + [
  135. 'from_time', 'price_total', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week',
  136. ]].rename(columns={
  137. 'price_total': 'peak_price',
  138. 'hours_until_departure': 'peak_hours',
  139. 'days_to_departure': 'peak_days',
  140. 'update_hour': 'peak_time',
  141. 'update_week': 'peak_week',
  142. }).reset_index(drop=True)
  143. del df_gid_info
  144. del df_target
  145. return df_input, df_drop_nodes, df_rise_nodes, df_envelope
  146. return df_input, None, None, None
  147. def predict_data_simple(df_input, city_pair, output_dir, predict_dir=".", pred_time_str=""):
  148. if df_input is None or df_input.empty:
  149. return pd.DataFrame()
  150. df_sorted = df_input.sort_values(
  151. by=['gid', 'baggage_weight', 'hours_until_departure'],
  152. ascending=[True, True, False],
  153. ).reset_index(drop=True)
  154. df_sorted = df_sorted[
  155. df_sorted['hours_until_departure'].between(24, 360)
  156. ].reset_index(drop=True)
  157. # 每个 gid baggage_weight 取 hours_until_departure 最小的一条 (当前小时)
  158. df_min_hours = (
  159. df_sorted.drop_duplicates(subset=['gid', 'baggage_weight'], keep='last')
  160. .reset_index(drop=True)
  161. )
  162. # 读历史降价场景
  163. drop_info_csv_path = os.path.join(output_dir, f'{city_pair}_drop_info.csv')
  164. if os.path.exists(drop_info_csv_path):
  165. df_drop_nodes = pd.read_csv(drop_info_csv_path)
  166. else:
  167. df_drop_nodes = pd.DataFrame()
  168. # 读历史升价场景
  169. rise_info_csv_path = os.path.join(output_dir, f'{city_pair}_rise_info.csv')
  170. if os.path.exists(rise_info_csv_path):
  171. df_rise_nodes = pd.read_csv(rise_info_csv_path)
  172. else:
  173. df_rise_nodes = pd.DataFrame()
  174. # 联合价格分布
  175. # 统一初始化
  176. df_min_hours['relative_position'] = np.nan
  177. if not df_drop_nodes.empty:
  178. df_drop_nodes['relative_position'] = np.nan
  179. if not df_rise_nodes.empty:
  180. df_rise_nodes['relative_position'] = np.nan
  181. parts = []
  182. # 当前待预测
  183. if not df_min_hours.empty and 'price_total' in df_min_hours.columns:
  184. cur = df_min_hours[['price_total']].copy()
  185. cur['price'] = pd.to_numeric(cur['price_total'], errors='coerce')
  186. cur['source'] = 'min'
  187. cur['row_id'] = cur.index
  188. parts.append(cur[['price', 'source', 'row_id']])
  189. # 历史降价
  190. if not df_drop_nodes.empty and 'high_price_amount' in df_drop_nodes.columns:
  191. drop = df_drop_nodes[['high_price_amount']].copy()
  192. drop['price'] = pd.to_numeric(drop['high_price_amount'], errors='coerce')
  193. drop['source'] = 'drop'
  194. drop['row_id'] = drop.index
  195. parts.append(drop[['price', 'source', 'row_id']])
  196. # 历史升价
  197. if not df_rise_nodes.empty and 'prev_rise_amount' in df_rise_nodes.columns:
  198. rise = df_rise_nodes[['prev_rise_amount']].copy()
  199. rise['price'] = pd.to_numeric(rise['prev_rise_amount'], errors='coerce')
  200. rise['source'] = 'rise'
  201. rise['row_id'] = rise.index
  202. parts.append(rise[['price', 'source', 'row_id']])
  203. if parts:
  204. all_prices = pd.concat(parts, ignore_index=True)
  205. all_prices = all_prices.dropna(subset=['price']).reset_index(drop=True)
  206. # 计算价格百分位
  207. dense_rank = all_prices['price'].rank(method='dense')
  208. max_rank = dense_rank.max()
  209. if pd.notna(max_rank) and max_rank > 1:
  210. all_prices['relative_position'] = (dense_rank - 1) / (max_rank - 1)
  211. else:
  212. all_prices['relative_position'] = 1.0
  213. all_prices['relative_position'] = all_prices['relative_position'].round(4)
  214. # 回填到三个表
  215. m = all_prices['source'] == 'min'
  216. df_min_hours.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
  217. if not df_drop_nodes.empty:
  218. m = all_prices['source'] == 'drop'
  219. df_drop_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
  220. if not df_rise_nodes.empty:
  221. m = all_prices['source'] == 'rise'
  222. df_rise_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
  223. pass