data_preprocess.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931
  1. import pandas as pd
  2. import numpy as np
  3. import bisect
  4. import gc
  5. from datetime import datetime, timedelta
  6. from sklearn.preprocessing import StandardScaler
  7. from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays
  8. from utils import insert_df_col
# Module-level lookup built once at import time: country code -> holiday dates
# (used below via COUNTRY_HOLIDAYS.get(country, set()) with date membership,
# so values are expected to be set-like collections of datetime.date).
COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
  10. def preprocess_data_cycle(df_input, interval_hours=8, feature_length=240, target_length=24, is_training=True):
  11. # df_input_part = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['hours_until_departure'] < current_n_hours)].copy()
  12. df_input = preprocess_data_first_half(df_input)
  13. # 创建一个空列表来存储所有处理后的数据部分
  14. list_df_parts = []
  15. crop_lower_limit_list = [4] # [4, 28, 52, 76, 100]
  16. for crop_lower_limit in crop_lower_limit_list:
  17. target_n_hours = crop_lower_limit + target_length
  18. feature_n_hours = target_n_hours + interval_hours
  19. crop_upper_limit = feature_n_hours + feature_length
  20. df_input_part = preprocess_data(df_input, is_training=is_training, crop_upper_limit=crop_upper_limit, feature_n_hours=feature_n_hours,
  21. target_n_hours=target_n_hours, crop_lower_limit=crop_lower_limit)
  22. # 将处理后的部分添加到列表中
  23. list_df_parts.append(df_input_part)
  24. if not is_training:
  25. break
  26. # 合并所有处理后的数据部分
  27. if list_df_parts:
  28. df_combined = pd.concat(list_df_parts, ignore_index=True)
  29. return df_combined
  30. else:
  31. return pd.DataFrame() # 如果没有数据,返回空DataFrame
  32. def preprocess_data_first_half(df_input):
  33. '''前半部分'''
  34. print(">>> 开始数据预处理")
  35. # 生成 城市对
  36. df_input['city_pair'] = (
  37. df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
  38. )
  39. # 城市码映射成数字
  40. df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
  41. df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
  42. missing_from = (
  43. df_input.loc[df_input['from_city_num'].isna(), 'from_city_code']
  44. .unique()
  45. )
  46. missing_to = (
  47. df_input.loc[df_input['to_city_num'].isna(), 'to_city_code']
  48. .unique()
  49. )
  50. if missing_from:
  51. print("未映射的 from_city:", missing_from)
  52. if missing_to:
  53. print("未映射的 to_city:", missing_to)
  54. # 把 city_pair、from_city_code、from_city_num, to_city_code, to_city_num 放到前几列
  55. cols = df_input.columns.tolist()
  56. # 删除已存在的几列(保证顺序正确)
  57. for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
  58. cols.remove(c)
  59. # 这几列插入到最前面
  60. df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
  61. pass
  62. # 转格式
  63. df_input['search_dep_time'] = pd.to_datetime(
  64. df_input['search_dep_time'],
  65. format='%Y%m%d',
  66. errors='coerce'
  67. ).dt.strftime('%Y-%m-%d')
  68. # 重命名起飞日期
  69. df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
  70. # 重命名航班号
  71. df_input.rename(
  72. columns={
  73. 'seg1_flight_number': 'flight_number_1',
  74. 'seg2_flight_number': 'flight_number_2'
  75. },
  76. inplace=True
  77. )
  78. # 分开填充
  79. df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
  80. df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
  81. # 航班号转数字
  82. df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map)
  83. df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map)
  84. missing_flight_1 = (
  85. df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1']
  86. .unique()
  87. )
  88. missing_flight_2 = (
  89. df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2']
  90. .unique()
  91. )
  92. if missing_flight_1:
  93. print("未映射的 flight_1:", missing_flight_1)
  94. if missing_flight_2:
  95. print("未映射的 flight_2:", missing_flight_2)
  96. # flight_1_num 放在 seg1_dep_air_port 之前
  97. insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port')
  98. # flight_2_num 放在 seg2_dep_air_port 之前
  99. insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port')
  100. df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int) # 30--> 1 20--> 0
  101. # baggage_level 放在 flight_number_2 之前
  102. insert_df_col(df_input, 'baggage_level', 'flight_number_2')
  103. df_input['Adult_Total_Price'] = df_input['adult_total_price']
  104. # Adult_Total_Price 放在 seats_remaining 之前 保存缩放前的原始值
  105. insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining')
  106. df_input['Hours_Until_Departure'] = df_input['hours_until_departure']
  107. # Hours_Until_Departure 放在 days_to_departure 之前 保存缩放前的原始值
  108. insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure')
  109. pass
  110. # gid:基于指定字段的分组标记(整数)
  111. df_input['gid'] = (
  112. df_input
  113. .groupby(
  114. ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], # 'baggage' 先不进分组
  115. sort=False
  116. )
  117. .ngroup()
  118. )
  119. return df_input
def preprocess_data(df_input: pd.DataFrame, is_training: bool = True, crop_upper_limit: int = 480, feature_n_hours: int = 36, target_n_hours: int = 28, crop_lower_limit: int = 4) -> pd.DataFrame:
    """Second (window-dependent) half of the preprocessing pipeline.

    Crops rows to one observation window, derives price-event and price-zone
    features, itinerary time/duration features, holiday features, and -- in
    training mode -- the ``target_*`` label columns.

    Parameters
    ----------
    df_input : pd.DataFrame
        Output of ``preprocess_data_first_half`` (must contain ``gid``).
    is_training : bool
        True: compute targets from the target window; False: fill targets with 0.
    crop_upper_limit, crop_lower_limit : int
        Keep rows with crop_lower_limit <= hours_until_departure < crop_upper_limit.
    feature_n_hours : int
        Reference point ("now"): the price closest to this hour is the level
        the drop targets are measured against.
    target_n_hours : int
        Exclusive upper bound of the target window.

    Returns
    -------
    pd.DataFrame
        Frame restricted/ordered to ``order_columns``; empty frame when the
        training target window holds no rows.
    """
    print(f"裁剪范围: [{crop_lower_limit}, {crop_upper_limit}], 间隔窗口: [{target_n_hours}, {feature_n_hours}]")
    # Crop to the observation window [crop_lower_limit, crop_upper_limit).
    df_input = df_input[(df_input['hours_until_departure'] < crop_upper_limit) &
                        (df_input['hours_until_departure'] >= crop_lower_limit)].reset_index(drop=True)
    # Chronological order inside each (gid, baggage):
    # hours_until_departure descending == earliest observation first.
    df_input = df_input.sort_values(
        by=['gid', 'baggage', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)
    # Minimum absolute price delta (same unit as adult_total_price) for a
    # change to count as an event; smaller moves are treated as noise.
    VALID_DROP_MIN = 5
    # Per-(gid, baggage) first difference: current price minus the previous
    # observation in time.
    g = df_input.groupby(['gid', 'baggage'])
    diff = g['adult_total_price'].transform('diff')
    decrease_mask = diff <= -VALID_DROP_MIN  # significant drop events
    increase_mask = diff >= VALID_DROP_MIN   # significant rise events
    # Event direction per row: +1 rise, -1 drop, 0 no significant change.
    df_input['_price_event_dir'] = np.where(increase_mask, 1, np.where(decrease_mask, -1, 0))
    # Consecutive rise/drop streak lengths within one (gid, baggage) group.
    def _calc_price_streaks(df_group):
        """Return per-row consecutive rise/drop counts for one group.

        Rows with no event (dir == 0) are left NaN and later inherit the
        streak of the most recent event row via ffill; rows before the first
        event become 0.
        """
        dirs = df_group['_price_event_dir'].to_numpy()
        n = len(dirs)
        inc = np.full(n, np.nan)
        dec = np.full(n, np.nan)
        last_dir = 0
        inc_cnt = 0
        dec_cnt = 0
        for i, d in enumerate(dirs):
            if d == 1:
                # Extend the rise streak only if the previous event was a rise.
                inc_cnt = inc_cnt + 1 if last_dir == 1 else 1
                dec_cnt = 0
                last_dir = 1
                inc[i] = inc_cnt
                dec[i] = dec_cnt
            elif d == -1:
                # Extend the drop streak only if the previous event was a drop.
                dec_cnt = dec_cnt + 1 if last_dir == -1 else 1
                inc_cnt = 0
                last_dir = -1
                inc[i] = inc_cnt
                dec[i] = dec_cnt
        inc_s = pd.Series(inc, index=df_group.index).ffill().fillna(0).astype(int)
        dec_s = pd.Series(dec, index=df_group.index).ffill().fillna(0).astype(int)
        return pd.DataFrame(
            {
                'price_increase_times_consecutive': inc_s,
                'price_decrease_times_consecutive': dec_s,
            },
            index=df_group.index,
        )
    streak_df = df_input.groupby(['gid', 'baggage'], sort=False, group_keys=False).apply(_calc_price_streaks)
    df_input = df_input.join(streak_df)
    df_input.drop(columns=['_price_event_dir'], inplace=True)
    # (A combined price_change_times_total / price_last_change_hours variant
    # existed here but was retired in favor of the split drop/rise features.)
    # Cumulative count of significant drops so far within (gid, baggage).
    df_input['price_decrease_times_total'] = (
        decrease_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
    )
    # Cumulative count of significant rises so far within (gid, baggage).
    df_input['price_increase_times_total'] = (
        increase_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
    )
    # hours_until_departure at the most recent drop, carried forward in time.
    last_decrease_hour = (
        df_input['hours_until_departure']
        .where(decrease_mask)
        .groupby([df_input['gid'], df_input['baggage']])
        .ffill()
    )
    # hours_until_departure at the most recent rise, carried forward in time.
    last_increase_hour = (
        df_input['hours_until_departure']
        .where(increase_mask)
        .groupby([df_input['gid'], df_input['baggage']])
        .ffill()
    )
    # Hours elapsed since the last drop (0 when no drop happened yet;
    # hours_until_departure shrinks over time, so the difference is positive).
    df_input['price_last_decrease_hours'] = (
        last_decrease_hour - df_input['hours_until_departure']
    ).fillna(0)
    # Hours elapsed since the last rise (0 when none yet).
    df_input['price_last_increase_hours'] = (
        last_increase_hour - df_input['hours_until_departure']
    ).fillna(0)
    pass
    # Move the new price-event columns just before seats_remaining.
    new_cols = [
        'price_decrease_times_total',
        'price_decrease_times_consecutive',
        'price_last_decrease_hours',
        'price_increase_times_total',
        'price_increase_times_consecutive',
        'price_last_increase_hours',
    ]
    cols = df_input.columns.tolist()
    idx = cols.index('seats_remaining')
    new_order = cols[:idx] + new_cols + cols[idx:]
    # dict.fromkeys de-duplicates while preserving order (columns may already
    # sit at their original positions in `cols`).
    new_order = list(dict.fromkeys(new_order))
    df_input = df_input[new_order]
    pass
    print(">>> 计算价格区间特征")
    # Per-(gid, baggage) price statistics, merged back onto every row.
    price_stats = df_input.groupby(['gid', 'baggage'])['adult_total_price'].agg(
        min_price='min',
        max_price='max',
        mean_price='mean',
        std_price='std'
    ).reset_index()
    df_input = df_input.merge(price_stats, on=['gid', 'baggage'], how='left')
    # (An absolute mean±std zone classification was removed here; the
    # frequency-weighted classification below supersedes it.)
    # Observation count per distinct price within (gid, baggage).
    price_freq = df_input.groupby(['gid', 'baggage', 'adult_total_price']).size().reset_index(name='price_frequency')
    df_input = df_input.merge(price_freq, on=['gid', 'baggage', 'adult_total_price'], how='left')
    # Frequency-weighted percentiles of the price within one group.
    def weighted_percentile(group):
        """Return the 25/50/75/90% frequency-weighted price percentiles.

        Percentile = first price whose cumulative observation frequency
        reaches p * total frequency (prices sorted ascending).
        """
        if len(group) == 0:
            return pd.Series([np.nan] * 4, index=['price_weighted_percentile_25',
                                                  'price_weighted_percentile_50',
                                                  'price_weighted_percentile_75',
                                                  'price_weighted_percentile_90'])
        # Sort by price, then accumulate frequencies.
        group = group.sort_values('adult_total_price')
        group['cum_freq'] = group['price_frequency'].cumsum()
        total_freq = group['price_frequency'].sum()
        percentiles = []
        for p in [0.25, 0.5, 0.75, 0.9]:
            threshold = total_freq * p
            # First price whose cumulative frequency crosses the threshold.
            mask = group['cum_freq'] >= threshold
            if mask.any():
                percentile_value = group.loc[mask.idxmax(), 'adult_total_price']
            else:
                percentile_value = group['adult_total_price'].max()
            percentiles.append(percentile_value)
        return pd.Series(percentiles, index=['price_weighted_percentile_25',
                                             'price_weighted_percentile_50',
                                             'price_weighted_percentile_75',
                                             'price_weighted_percentile_90'])
    weighted_percentiles = df_input.groupby(['gid', 'baggage']).apply(weighted_percentile).reset_index()
    df_input = df_input.merge(weighted_percentiles, on=['gid', 'baggage'], how='left')
    # Median observation frequency per group: the "how common is this price"
    # baseline used by the zone rules below.
    freq_median = df_input.groupby(['gid', 'baggage'])['price_frequency'].transform('median')
    # Price as a multiple of the 90% percentile, to grade levels of "high".
    df_input['price_relative_to_90p'] = df_input['adult_total_price'] / df_input['price_weighted_percentile_90']
    # 1% tolerance bands so near-identical prices don't straddle zone borders.
    tolerance_75p = df_input['price_weighted_percentile_75'] * 0.01
    tolerance_50p = df_input['price_weighted_percentile_50'] * 0.01
    tolerance_25p = df_input['price_weighted_percentile_25'] * 0.01
    # Zone masks, defined most-extreme first; each later mask excludes all
    # earlier ones so the masks are mutually exclusive by construction.
    # Zone 5 -- abnormal high: > 1.5x the 90% percentile AND very rare
    # (frequency below a third of the group's median frequency).
    price_abnormal_high_mask = ((df_input['price_relative_to_90p'] > 1.5) &
                                (df_input['price_frequency'] < freq_median * 0.33))
    # Zone 4 -- genuinely high: above the 90% percentile and rarer than median.
    price_real_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_90']) &
                            (df_input['price_frequency'] < freq_median) &
                            ~price_abnormal_high_mask)
    # Zone 3 -- normal high: within tolerance of / above the 75% percentile.
    price_normal_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_75'] - tolerance_75p) &
                              ~price_real_high_mask & ~price_abnormal_high_mask)
    # Zone 2 -- mid-high: between the 50% and 75% percentiles (with tolerance).
    price_mid_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_50'] - tolerance_50p) &
                           (df_input['adult_total_price'] <= df_input['price_weighted_percentile_75'] + tolerance_75p) &
                           ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
    # Zone 1 -- mid-low: between the 25% and 50% percentiles (with tolerance).
    price_mid_low_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_25'] - tolerance_25p) &
                          (df_input['adult_total_price'] <= df_input['price_weighted_percentile_50'] + tolerance_50p) &
                          ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
    # Zone 0 -- low: at or below the 25% percentile (no tolerance).
    price_low_mask = ((df_input['adult_total_price'] <= df_input['price_weighted_percentile_25']) &
                      ~price_mid_low_mask & ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
    # np.select takes the first matching mask per row, guaranteeing exclusivity.
    price_zone_masks = [
        price_abnormal_high_mask,  # zone 5
        price_real_high_mask,      # zone 4
        price_normal_high_mask,    # zone 3
        price_mid_high_mask,       # zone 2
        price_mid_low_mask,        # zone 1
        price_low_mask,            # zone 0
    ]
    price_zone_values = [5, 4, 3, 2, 1, 0]  # 5 abnormal-high ... 0 low
    price_zone_result = np.select(price_zone_masks, price_zone_values, default=2)  # unmatched rows default to mid-high
    # Final combined zone label.
    df_input['price_zone_comprehensive'] = price_zone_result
    # Price anomaly: |z-score| of the price against the group mean/std.
    df_input['price_z_score'] = (df_input['adult_total_price'] - df_input['mean_price']) / df_input['std_price']
    df_input['price_anomaly_score'] = np.abs(df_input['price_z_score'])
    # Price stability: coefficient of variation (std / mean) per group.
    df_input['price_coefficient_variation'] = df_input['std_price'] / df_input['mean_price']
    # Relative position of the current price inside the group's [min, max]
    # range; 0.5 fallback when max == min (division yields NaN).
    df_input['price_relative_position'] = (df_input['adult_total_price'] - df_input['min_price']) / (df_input['max_price'] - df_input['min_price'])
    df_input['price_relative_position'] = df_input['price_relative_position'].fillna(0.5)
    # Drop intermediate columns and release intermediates.
    df_input.drop(columns=['price_frequency', 'price_z_score', 'price_relative_to_90p'], inplace=True, errors='ignore')
    del price_freq
    del price_stats
    del weighted_percentiles
    del freq_median
    print(">>> 改进版价格区间特征计算完成")
    # First-leg airport pair, e.g. "SGN-HAN".
    df_input['airport_pair_1'] = (
        df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
    )
    # Drop the raw first-leg airport codes.
    df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
    # Place airport_pair_1 before seg1_dep_time.
    insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time')
    # Second-leg airport pair; 'NA' when either side is missing (no transfer).
    df_input['airport_pair_2'] = np.where(
        df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
        'NA',
        df_input['seg2_dep_air_port'].astype(str) + "-" +
        df_input['seg2_arr_air_port'].astype(str)
    )
    # Drop the raw second-leg airport codes.
    df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
    # Place airport_pair_2 before seg2_dep_time.
    insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time')
    # Transfer flag: 'VJ' is the missing-segment placeholder set in
    # preprocess_data_first_half, so a real second flight number => transfer.
    df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
    insert_df_col(df_input, 'is_transfer', 'flight_number_2')
    # Shorter names for departure/arrival timestamps.
    df_input.rename(
        columns={
            'seg1_dep_time': 'dep_time_1',
            'seg1_arr_time': 'arr_time_1',
            'seg2_dep_time': 'dep_time_2',
            'seg2_arr_time': 'arr_time_2',
        },
        inplace=True
    )
    # First-leg flight duration in hours.
    df_input['fly_duration_1'] = (
        (df_input['arr_time_1'] - df_input['dep_time_1'])
        .dt.total_seconds() / 3600
    ).round(2)
    # Second-leg flight duration (0 when there is no transfer leg).
    df_input['fly_duration_2'] = (
        (df_input['arr_time_2'] - df_input['dep_time_2'])
        .dt.total_seconds() / 3600
    ).fillna(0).round(2)
    # Total flight duration.
    df_input['fly_duration'] = (
        df_input['fly_duration_1'] + df_input['fly_duration_2']
    ).round(2)
    # Layover duration between the legs (0 when there is no transfer).
    df_input['stop_duration'] = (
        (df_input['dep_time_2'] - df_input['arr_time_1'])
        .dt.total_seconds() / 3600
    ).fillna(0).round(2)
    # NOTE(review): negative durations are not clipped here (a clip-to-0 and
    # an is_transfer consistency pass existed previously but were disabled).
    # Insert the duration columns before is_filled in one pass.
    insert_before = 'is_filled'
    new_cols = [
        'fly_duration_1',
        'fly_duration_2',
        'fly_duration',
        'stop_duration'
    ]
    cols = df_input.columns.tolist()
    idx = cols.index(insert_before)
    # Remove the columns from their old positions...
    cols = [c for c in cols if c not in new_cols]
    # ...then splice them in at the target index (empty-slice insertion).
    cols[idx:idx] = new_cols
    df_input = df_input[cols]
    # Calendar features from the first-leg departure time.
    dep_t1 = df_input['dep_time_1']
    # Departure hour of day (0-23).
    df_input['flight_by_hour'] = dep_t1.dt.hour
    # Day of month (1-31).
    df_input['flight_by_day'] = dep_t1.dt.day
    # Month (1-12).
    df_input['flight_day_of_month'] = dep_t1.dt.month
    # Weekday (0=Monday .. 6=Sunday).
    df_input['flight_day_of_week'] = dep_t1.dt.weekday
    # Quarter (1-4).
    df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
    # Weekend flag (Saturday/Sunday).
    df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
    # Country codes for both endpoints.
    df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
    df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)
    # Overall departure time is dep_time_1; overall arrival is arr_time_2
    # when a transfer exists, else arr_time_1.
    df_input['global_dep_time'] = df_input['dep_time_1']
    df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
    # Is the departure date a public holiday in the departure country?
    df_input['dep_country_is_holiday'] = df_input.apply(
        lambda r: r['global_dep_time'].date()
        in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
        axis=1
    ).astype(int)
    # Is the arrival date a public holiday in the arrival country?
    df_input['arr_country_is_holiday'] = df_input.apply(
        lambda r: r['global_arr_time'].date()
        in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
        axis=1
    ).astype(int)
    # Holiday on either side.
    df_input['any_country_is_holiday'] = (
        df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
        .max(axis=1)
    )
    # International route flag.
    df_input['is_cross_country'] = (
        df_input['dep_country'] != df_input['arr_country']
    ).astype(int)
    def days_to_next_holiday(country, cur_date):
        """Days from cur_date to the next holiday (inclusive of today) in
        `country`; NaN when country/date is missing or no future holiday."""
        if pd.isna(country) or pd.isna(cur_date):
            return np.nan
        holidays = COUNTRY_HOLIDAYS.get(country)
        if not holidays:
            return np.nan
        # Future holidays (today included), earliest first.
        future_holidays = sorted([d for d in holidays if d >= cur_date])
        if not future_holidays:
            return np.nan
        next_holiday = future_holidays[0]
        delta_days = (next_holiday - cur_date).days
        return delta_days
    # Days to the next holiday in the departure country, measured from the
    # crawl timestamp (update_hour), not from the flight date.
    df_input['days_to_holiday'] = df_input.apply(
        lambda r: days_to_next_holiday(
            r['dep_country'],
            r['update_hour'].date()
        ),
        axis=1
    )
    # (A fillna(999) fallback for "no future holiday" was considered but is
    # disabled; NaN is kept as-is.)
    # Place days_to_holiday before update_hour.
    insert_df_col(df_input, 'days_to_holiday', 'update_hour')
    # --- Training mode: derive the target columns -------------------------
    if is_training:
        print(">>> 训练模式:计算 target 相关列")
        print(f"\n>>> 开始处理 对应区间: n_hours = {target_n_hours}")
        # Target window: [crop_lower_limit, target_n_hours), 30kg rows only.
        target_lower_limit = crop_lower_limit
        target_upper_limit = target_n_hours
        mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
        df_targets = df_input.loc[mask_targets].copy()
        targets_amout = df_targets.shape[0]
        print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})")
        if targets_amout == 0:
            print(f">>> n_hours = {target_n_hours} 无有效数据,跳过")
            return pd.DataFrame()
        print(">>> 计算 price_at_n_hours")
        # Reference price: among 30kg rows at >= feature_n_hours before
        # departure, take the last row per gid -- because the frame is sorted
        # by hours_until_departure descending, that is the observation closest
        # to feature_n_hours.
        df_input_object = df_input[(df_input['hours_until_departure'] >= feature_n_hours) & (df_input['baggage'] == 30)].copy()
        df_last = df_input_object.groupby('gid', observed=True).last().reset_index()
        df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
        print(">>> price_at_n_hours计算完成,示例:")
        print(df_last_price_at_n_hours.head(5))
        # New drop-detection scheme: look for the first significant
        # consecutive-observation drop inside the target window.
        df_targets = df_targets.sort_values(
            ['gid', 'hours_until_departure'],
            ascending=[True, False]
        )
        # Per-gid price change between consecutive observations.
        g = df_targets.groupby('gid', group_keys=False)
        df_targets['price_diff'] = g['adult_total_price'].diff()
        # Reuses the VALID_DROP_MIN threshold defined above; an additional
        # hour-range restriction was considered but is disabled.
        valid_drop_mask = (
            (df_targets['price_diff'] <= -VALID_DROP_MIN)
        )
        # Rows where a qualifying drop occurred.
        df_valid_drops = df_targets.loc[valid_drop_mask]
        # First qualifying drop per gid (earliest, since rows are time-ordered).
        df_first_price_drop = (
            df_valid_drops
            .groupby('gid', as_index=False)
            .first()
        )
        # Keep/rename the relevant columns. Note: time_to_price_drop is the
        # hours_until_departure AT the drop, not a delta from "now".
        df_first_price_drop = df_first_price_drop[
            ['gid', 'hours_until_departure', 'adult_total_price', 'price_diff']
        ].rename(columns={
            'hours_until_departure': 'time_to_price_drop',
            'adult_total_price': 'price_at_d_hours',
            'price_diff': 'amount_of_price_drop',
        })
        # Express the drop amount as a positive number.
        df_first_price_drop['amount_of_price_drop'] = (-df_first_price_drop['amount_of_price_drop']).round(2)
        pass
        # (A legacy target computation based on comparing every target-window
        # price against price_at_n_hours lived here; superseded by the
        # first-consecutive-drop scheme above.)
        # One row per gid: reference price + first-drop info (NaN if no drop).
        df_gid_info = df_last_price_at_n_hours.merge(df_first_price_drop, on='gid', how='left')
        df_gid_info['will_price_drop'] = df_gid_info['time_to_price_drop'].notnull().astype(int)
        df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)
        df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)
        pass
        # Free intermediates before the big merge.
        del df_input_object
        del df_last
        del df_last_price_at_n_hours
        del df_first_price_drop
        del df_valid_drops
        del df_targets
        gc.collect()
        # Merge the targets back onto every row of the gid.
        print(">>> 将目标变量信息合并到 df_input")
        df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
        # Rows whose gid had no reference price get 0 targets.
        df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
            ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
        df_input = df_input.rename(columns={
            'will_price_drop': 'target_will_price_drop',
            'amount_of_price_drop': 'target_amount_of_drop',
            'time_to_price_drop': 'target_time_to_drop'
        })
        # (A per-gid min-price feature over the target window was prototyped
        # here but is disabled.)
        print(">>> 合并后 df_input 样例:")
        print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
    # --- Prediction mode: zero-filled target columns ----------------------
    else:
        print(">>> 预测模式:补齐 target 相关列(全部置 0)")
        df_input['target_will_price_drop'] = 0
        df_input['target_amount_of_drop'] = 0.0
        df_input['target_time_to_drop'] = 0
    # Final fixed column order. NOTE(review): columns not listed here (e.g.
    # price_anomaly_score, price_coefficient_variation, min/max/mean/std_price)
    # are dropped by this selection.
    order_columns = [
        "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day",
        "seats_remaining", "baggage", "baggage_level",
        "price_decrease_times_total", "price_decrease_times_consecutive", "price_last_decrease_hours",
        "price_increase_times_total", "price_increase_times_consecutive", "price_last_increase_hours",
        "adult_total_price", "Adult_Total_Price", "target_will_price_drop", "target_amount_of_drop", "target_time_to_drop",
        "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure", "update_hour", "crawl_date", "gid",
        "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1",
        "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week", "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer",
        "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2", "fly_duration", "stop_duration",
        "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country",
        "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday",
        "price_weighted_percentile_25", "price_weighted_percentile_50", "price_weighted_percentile_75", "price_weighted_percentile_90",
        "price_zone_comprehensive", "price_relative_position",
    ]
    df_input = df_input[order_columns]
    return df_input
  654. def standardization(df, feature_scaler, target_scaler=None, is_training=True, is_val=False, feature_length=240):
  655. print(">>> 开始标准化处理")
  656. # 准备走标准化的特征
  657. scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration',
  658. 'price_weighted_percentile_25', 'price_weighted_percentile_50',
  659. 'price_weighted_percentile_75', 'price_weighted_percentile_90']
  660. if is_training:
  661. print(">>> 特征数据标准化开始")
  662. if feature_scaler is None:
  663. feature_scaler = StandardScaler()
  664. if not is_val:
  665. feature_scaler.fit(df[scaler_features])
  666. df[scaler_features] = feature_scaler.transform(df[scaler_features])
  667. print(">>> 特征数据标准化完成")
  668. else:
  669. df[scaler_features] = feature_scaler.transform(df[scaler_features])
  670. print(">>> 预测模式下特征标准化处理完成")
  671. # 准备走归一化的特征
  672. # 事先定义好每个特征的合理范围
  673. fixed_ranges = {
  674. 'hours_until_departure': (0, 480), # 0-20天
  675. 'from_city_num': (0, 38),
  676. 'to_city_num': (0, 38),
  677. 'flight_1_num': (0, 341),
  678. 'flight_2_num': (0, 341),
  679. 'seats_remaining': (1, 5),
  680. # 'price_change_times_total': (0, 30), # 假设价格变更次数不会超过30次
  681. # 'price_last_change_hours': (0, 480),
  682. 'price_decrease_times_total': (0, 20), # 假设价格下降次数不会超过20次
  683. 'price_decrease_times_consecutive': (0, 10), # 假设价格连续下降次数不会超过10次
  684. 'price_last_decrease_hours': (0, feature_length), #(0-240小时)
  685. 'price_increase_times_total': (0, 20), # 假设价格上升次数不会超过20次
  686. 'price_increase_times_consecutive': (0, 10), # 假设价格连续上升次数不会超过10次
  687. 'price_last_increase_hours': (0, feature_length), #(0-240小时)
  688. 'price_zone_comprehensive': (0, 5),
  689. 'days_to_departure': (0, 30),
  690. 'days_to_holiday': (0, 120), # 最长的越南节假日间隔120天
  691. 'flight_by_hour': (0, 23),
  692. 'flight_by_day': (1, 31),
  693. 'flight_day_of_month': (1, 12),
  694. 'flight_day_of_week': (0, 6),
  695. 'flight_day_of_quarter': (1, 4),
  696. }
  697. normal_features = list(fixed_ranges.keys())
  698. print(">>> 归一化特征列: ", normal_features)
  699. print(">>> 基于固定范围的特征数据归一化开始")
  700. for col in normal_features:
  701. if col in df.columns:
  702. # 核心归一化公式: (x - min) / (max - min)
  703. col_min, col_max = fixed_ranges[col]
  704. df[col] = (df[col] - col_min) / (col_max - col_min)
  705. # 添加裁剪,将超出范围的值强制限制在[0,1]区间
  706. df[col] = df[col].clip(0, 1)
  707. print(">>> 基于固定范围的特征数据归一化完成")
  708. return df, feature_scaler, target_scaler
def preprocess_data_simple(df_input, is_train=False, output_dir='.'):
    """Run the lightweight preprocessing pipeline on crawled fare snapshots.

    Filters the input to the 30kg-baggage rows within 480 hours of departure,
    derives per-(gid, baggage) price-change features, and — in training mode —
    extracts a table of "price drop" events: rows whose price change is
    negative immediately after a positive change, in the 18–54h window before
    departure.

    Parameters
    ----------
    df_input : DataFrame with at least gid, baggage, hours_until_departure,
        adult_total_price, Adult_Total_Price, Hours_Until_Departure, is_filled
        and the flight-info columns listed in ``flight_info_cols`` below.
    is_train : when True, back-filled rows are kept and the drop-node table is
        built; when False, back-filled rows are removed and the second return
        value is None.
    output_dir : currently unused in this function — kept for interface
        compatibility with the rest of the pipeline (TODO confirm).

    Returns
    -------
    (df_input, df_drop_nodes) — the feature-augmented frame and, in training
    mode only, the per-drop-event table (without gid, columns reordered).
    """
    df_input = preprocess_data_first_half(df_input)
    # Sort by descending hours-until-departure within each (gid, baggage):
    # descending hours = chronological order approaching departure, so the
    # diff()/pct_change() below compare against the previous crawl snapshot.
    df_input = df_input.sort_values(
        by=['gid', 'baggage', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)
    df_input = df_input[df_input['hours_until_departure'] <= 480]
    # Only the 30kg-baggage fare class is modeled here.
    df_input = df_input[df_input['baggage'] == 30]
    # Keep only real (crawled) rows, not the back-filled ones.
    if not is_train:
        df_input = df_input[df_input['is_filled'] == 0]
    # Price change amount vs the previous snapshot; zero changes are replaced
    # with NaN and forward-filled, so each row carries the last nonzero change.
    df_input['price_change_amount'] = (
        df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price']
        .apply(lambda s: s.diff().replace(0, np.nan).ffill().fillna(0)).round(2)
    )
    # Price change as a fraction of the previous snapshot, same fill scheme.
    df_input['price_change_percent'] = (
        df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price']
        .apply(lambda s: s.pct_change().replace(0, np.nan).ffill().fillna(0)).round(4)
    )
    # Step 1: label runs of identical price_change_amount as segments.
    df_input['price_change_segment'] = (
        df_input.groupby(['gid', 'baggage'], group_keys=False)['price_change_amount']
        .apply(lambda s: (s != s.shift()).cumsum())
    )
    # Step 2: 1-based position within each segment = hours the current price
    # level has lasted (assumes one row per hour after back-filling — confirm).
    df_input['price_duration_hours'] = (
        df_input.groupby(['gid', 'baggage', 'price_change_segment'], group_keys=False)
        .cumcount()
        .add(1)
    )
    # Drop the temporary segment label.
    df_input = df_input.drop(columns=['price_change_segment'])
    # Move the capitalized (unscaled copy — presumably; verify against the
    # column-building code upstream) price/hours columns to the end.
    adult_price = df_input.pop('Adult_Total_Price')
    hours_until = df_input.pop('Hours_Until_Departure')
    df_input['Adult_Total_Price'] = adult_price
    df_input['Hours_Until_Departure'] = hours_until
    df_input['Baggage'] = df_input['baggage']
    if is_train:
        # Restrict drop-event mining to the 18-54h pre-departure window.
        df_target = df_input[(df_input['hours_until_departure'] >= 18) & (df_input['hours_until_departure'] <= 54)].copy()
        df_target = df_target.sort_values(
            by=['gid', 'hours_until_departure'],
            ascending=[True, False]
        ).reset_index(drop=True)
        # Previous-row (earlier snapshot) change stats within each gid.
        prev_pct = df_target.groupby('gid', group_keys=False)['price_change_percent'].shift(1)
        prev_amo = df_target.groupby('gid', group_keys=False)['price_change_amount'].shift(1)
        prev_dur = df_target.groupby('gid', group_keys=False)['price_duration_hours'].shift(1)
        # A "drop node": negative change directly following a positive change.
        drop_mask = (prev_pct > 0) & (df_target['price_change_percent'] < 0)
        df_drop_nodes = df_target.loc[drop_mask, ['gid', 'hours_until_departure']].copy()
        df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True)
        # Stats of the drop itself plus the preceding high-price stretch;
        # .to_numpy() discards the df_target index for positional assignment.
        df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
        df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
        df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes = df_drop_nodes.reset_index(drop=True)
        # Static flight descriptors attached to each drop event.
        flight_info_cols = [
            'city_pair',
            'flight_number_1', 'seg1_dep_air_port', 'seg1_dep_time', 'seg1_arr_air_port', 'seg1_arr_time',
            'flight_number_2', 'seg2_dep_air_port', 'seg2_dep_time', 'seg2_arr_air_port', 'seg2_arr_time',
            'currency', 'baggage', 'flight_day',
        ]
        # One info row per gid (first occurrence wins).
        df_gid_info = df_target[['gid'] + flight_info_cols].drop_duplicates(subset=['gid']).reset_index(drop=True)
        df_drop_nodes = df_drop_nodes.merge(df_gid_info, on='gid', how='left')
        drop_info_cols = ['drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
                          'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount'
                          ]
        # Reorder columns; gid is intentionally dropped from the output.
        order_columns = flight_info_cols + drop_info_cols
        df_drop_nodes = df_drop_nodes[order_columns]
        # Release the large intermediates before returning.
        del df_gid_info
        del df_target
    else:
        df_drop_nodes = None
    return df_input, df_drop_nodes