# data_preprocess.py

import pandas as pd
import numpy as np
import bisect
import gc
from datetime import datetime, timedelta
from sklearn.preprocessing import StandardScaler
from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays
from utils import insert_df_col

COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
def preprocess_data(df_input, features, categorical_features, is_training=True, current_n_hours=36):
    print(">>> Starting data preprocessing")
    # Build the city pair
    df_input['city_pair'] = (
        df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
    )
    # Map city codes to numeric ids
    df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
    df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
    missing_from = (
        df_input.loc[df_input['from_city_num'].isna(), 'from_city_code']
        .unique()
    )
    missing_to = (
        df_input.loc[df_input['to_city_num'].isna(), 'to_city_code']
        .unique()
    )
    if len(missing_from) > 0:
        print("Unmapped from_city:", missing_from)
    if len(missing_to) > 0:
        print("Unmapped to_city:", missing_to)
    # Move city_pair, from_city_code, from_city_num, to_city_code, to_city_num to the front
    cols = df_input.columns.tolist()
    # Remove them from their current positions (so the final order is correct)
    for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
        cols.remove(c)
    # Re-insert them at the very front
    df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
    # Convert the date format
    df_input['search_dep_time'] = pd.to_datetime(
        df_input['search_dep_time'],
        format='%Y%m%d',
        errors='coerce'
    ).dt.strftime('%Y-%m-%d')
    # Rename the departure date column
    df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
    # Rename the flight number columns
    df_input.rename(
        columns={
            'seg1_flight_number': 'flight_number_1',
            'seg2_flight_number': 'flight_number_2'
        },
        inplace=True
    )
    # Fill each segment separately
    df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
    df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
    # Map flight numbers to numeric ids
    df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map)
    df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map)
    missing_flight_1 = (
        df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1']
        .unique()
    )
    missing_flight_2 = (
        df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2']
        .unique()
    )
    if len(missing_flight_1) > 0:
        print("Unmapped flight_1:", missing_flight_1)
    if len(missing_flight_2) > 0:
        print("Unmapped flight_2:", missing_flight_2)
    # Place flight_1_num right before seg1_dep_air_port
    insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port')
    # Place flight_2_num right before seg2_dep_air_port
    insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port')
    df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int)  # 30 -> 1, 20 -> 0
    # Place baggage_level right before flight_number_2
    insert_df_col(df_input, 'baggage_level', 'flight_number_2')
    df_input['Adult_Total_Price'] = df_input['adult_total_price']
    # Place Adult_Total_Price before seats_remaining (keeps the raw value before scaling)
    insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining')
    df_input['Hours_Until_Departure'] = df_input['hours_until_departure']
    # Place Hours_Until_Departure before days_to_departure (keeps the raw value before scaling)
    insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure')
    # gid: integer group id based on the specified fields
    df_input['gid'] = (
        df_input
        .groupby(
            ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'],  # 'baggage' is kept out of the grouping for now
            sort=False
        )
        .ngroup()
    )
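    # For illustration, ngroup() assigns one integer per unique key combination, e.g.
    # ('SGN-HAN', '2024-05-01', 'VJ120', 'VJ') -> 0, ('SGN-HAN', '2024-05-02', 'VJ120', 'VJ') -> 1, ...
    # (the city/flight values here are hypothetical; only the mechanism matters).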
    # Clip the time window: keep rows within 480 hours of departure and at least 4 hours out
    df_input = df_input[(df_input['hours_until_departure'] < 480) &
                        (df_input['hours_until_departure'] >= 4)].reset_index(drop=True)
    # Sort by time descending within each (gid, baggage)
    df_input = df_input.sort_values(
        by=['gid', 'baggage', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)
    # Price change mask
    g = df_input.groupby(['gid', 'baggage'])
    diff = g['adult_total_price'].transform('diff')
    change_mask = diff.abs() >= 5  # changes that are too small do not count
    # Cumulative number of price changes
    df_input['price_change_times_total'] = (
        change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
    )
    # Hours-until-departure at the last price change
    last_change_hour = (
        df_input['hours_until_departure']
        .where(change_mask)
        .groupby([df_input['gid'], df_input['baggage']])
        .ffill()  # forward fill
    )
    # Hours elapsed since the last price change
    df_input['price_last_change_hours'] = (
        last_change_hour - df_input['hours_until_departure']
    ).fillna(0)
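    # A small worked example of the two features above (hypothetical rows of one (gid, baggage)):
    #   hours_until_departure:     48   40   32   24
    #   adult_total_price:        100  100   90   90
    #   diff:                     NaN    0  -10    0   -> change_mask: F  F  T  F
    #   price_change_times_total:   0    0    1    1
    #   last_change_hour (ffill): NaN  NaN   32   32
    #   price_last_change_hours:    0    0    0    8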
    # New columns to insert right before seats_remaining
    new_cols = [
        'price_change_times_total',
        'price_last_change_hours'
    ]
    # All current columns
    cols = df_input.columns.tolist()
    # Find the position of seats_remaining
    idx = cols.index('seats_remaining')
    # Rebuild the column order
    new_order = cols[:idx] + new_cols + cols[idx:]
    # De-duplicate (in case a column is already at its original position)
    new_order = list(dict.fromkeys(new_order))
    # Reorder the DataFrame
    df_input = df_input[new_order]
    print(">>> Computing price zone features")
    # 1. Price zone split based on the absolute price level
    # First compute price statistics for each (gid, baggage)
    # g = df_input.groupby(['gid', 'baggage'])
    price_stats = df_input.groupby(['gid', 'baggage'])['adult_total_price'].agg(
        min_price='min',
        max_price='max',
        mean_price='mean',
        std_price='std'
    ).reset_index()
    # Merge the statistics back into the original data
    df_input = df_input.merge(price_stats, on=['gid', 'baggage'], how='left')
    # 2. Price zones based on absolute price (can be removed: a finer frequency-weighted classification follows)
    # # High zone: above mean + 1 std
    # df_input['price_absolute_high'] = (df_input['adult_total_price'] >
    #                                    (df_input['mean_price'] + df_input['std_price'])).astype(int)
    # # Mid-high zone: between mean and mean + 1 std
    # df_input['price_absolute_mid_high'] = ((df_input['adult_total_price'] > df_input['mean_price']) &
    #                                        (df_input['adult_total_price'] <= (df_input['mean_price'] + df_input['std_price']))).astype(int)
    # # Mid-low zone: between mean - 1 std and mean
    # df_input['price_absolute_mid_low'] = ((df_input['adult_total_price'] > (df_input['mean_price'] - df_input['std_price'])) &
    #                                       (df_input['adult_total_price'] <= df_input['mean_price'])).astype(int)
    # # Low zone: below mean - 1 std
    # df_input['price_absolute_low'] = (df_input['adult_total_price'] <= (df_input['mean_price'] - df_input['std_price'])).astype(int)
    # 3. Frequency-weighted price percentiles (improved version)
    # Count how often each price occurs
    price_freq = df_input.groupby(['gid', 'baggage', 'adult_total_price']).size().reset_index(name='price_frequency')
    df_input = df_input.merge(price_freq, on=['gid', 'baggage', 'adult_total_price'], how='left')
    # Compute frequency-weighted percentiles
    def weighted_percentile(group):
        if len(group) == 0:
            return pd.Series([np.nan] * 4, index=['price_weighted_percentile_25',
                                                  'price_weighted_percentile_50',
                                                  'price_weighted_percentile_75',
                                                  'price_weighted_percentile_90'])
        # Sort by price and compute the cumulative frequency
        group = group.sort_values('adult_total_price')
        group['cum_freq'] = group['price_frequency'].cumsum()
        total_freq = group['price_frequency'].sum()
        # Compute the weighted percentiles
        percentiles = []
        for p in [0.25, 0.5, 0.75, 0.9]:
            threshold = total_freq * p
            # Find the first price whose cumulative frequency reaches the threshold
            mask = group['cum_freq'] >= threshold
            if mask.any():
                percentile_value = group.loc[mask.idxmax(), 'adult_total_price']
            else:
                percentile_value = group['adult_total_price'].max()
            percentiles.append(percentile_value)
        return pd.Series(percentiles, index=['price_weighted_percentile_25',
                                             'price_weighted_percentile_50',
                                             'price_weighted_percentile_75',
                                             'price_weighted_percentile_90'])
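    # A small worked example of weighted_percentile (hypothetical rows of one (gid, baggage)),
    # noting that each row carries the frequency of its own price after the merge above:
    #   adult_total_price: 100  100  100  120     price_frequency: 3  3  3  1
    #   cum_freq:            3    6    9   10     total_freq: 10
    #   p25 threshold 2.5 -> 100,  p50 threshold 5.0 -> 100,
    #   p75 threshold 7.5 -> 100,  p90 threshold 9.0 -> 100 (first cum_freq >= 9.0)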
    # Compute the weighted percentiles per (gid, baggage)
    weighted_percentiles = df_input.groupby(['gid', 'baggage']).apply(weighted_percentile).reset_index()
    df_input = df_input.merge(weighted_percentiles, on=['gid', 'baggage'], how='left')
    # 4. Combined judgement based on absolute price and frequency (improved version)
    freq_median = df_input.groupby(['gid', 'baggage'])['price_frequency'].transform('median')
    # Price relative to the 90th percentile, used to separate different levels of high price
    df_input['price_relative_to_90p'] = df_input['adult_total_price'] / df_input['price_weighted_percentile_90']
    # Add a price tolerance so that nearly identical prices do not land in different zones
    # Tolerance threshold: 1% of each percentile
    # tolerance_90p = df_input['price_weighted_percentile_90'] * 0.01
    tolerance_75p = df_input['price_weighted_percentile_75'] * 0.01
    tolerance_50p = df_input['price_weighted_percentile_50'] * 0.01
    tolerance_25p = df_input['price_weighted_percentile_25'] * 0.01
    # Redesigned price zone classification (guaranteed to be non-overlapping):
    # First define the mask for each zone
    # 4.1 Abnormally high: far above the 90th percentile (> 1.5x) with very low frequency (< 1/3 of the median)
    price_abnormal_high_mask = ((df_input['price_relative_to_90p'] > 1.5) &
                                (df_input['price_frequency'] < freq_median * 0.33))
    # 4.2 Truly high: strict condition (price > 90th percentile and frequency < median)
    price_real_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_90']) &
                            (df_input['price_frequency'] < freq_median) &
                            ~price_abnormal_high_mask)
    # 4.3 Normally high: with tolerance (price close to the 75th percentile)
    price_normal_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_75'] - tolerance_75p) &
                              ~price_real_high_mask & ~price_abnormal_high_mask)
    # 4.4 Mid-high: with tolerance (price between the 50th and 75th percentiles)
    price_mid_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_50'] - tolerance_50p) &
                           (df_input['adult_total_price'] <= df_input['price_weighted_percentile_75'] + tolerance_75p) &
                           ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
    # 4.5 Mid-low: with tolerance (price between the 25th and 50th percentiles)
    price_mid_low_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_25'] - tolerance_25p) &
                          (df_input['adult_total_price'] <= df_input['price_weighted_percentile_50'] + tolerance_50p) &
                          ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
    # 4.6 Low: strict condition (price <= 25th percentile)
    price_low_mask = ((df_input['adult_total_price'] <= df_input['price_weighted_percentile_25']) &
                      ~price_mid_low_mask & ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
    # Use np.select to guarantee mutual exclusivity
    price_zone_masks = [
        price_abnormal_high_mask,  # abnormally high (level 5)
        price_real_high_mask,      # truly high (level 4)
        price_normal_high_mask,    # normally high (level 3)
        price_mid_high_mask,       # mid-high (level 2)
        price_mid_low_mask,        # mid-low (level 1)
        price_low_mask,            # low (level 0)
    ]
    price_zone_values = [5, 4, 3, 2, 1, 0]  # 5: abnormally high, 4: truly high, 3: normally high, 2: mid-high, 1: mid-low, 0: low
    # np.select guarantees each price is assigned to exactly one zone
    price_zone_result = np.select(price_zone_masks, price_zone_values, default=2)  # default: mid-high
    # 4.8 Comprehensive price zone label
    df_input['price_zone_comprehensive'] = price_zone_result
    # 5. Price anomaly detection
    # Standardized deviation of the price from the group mean
    df_input['price_z_score'] = (df_input['adult_total_price'] - df_input['mean_price']) / df_input['std_price']
    # Anomaly score: absolute value of the z-score
    df_input['price_anomaly_score'] = np.abs(df_input['price_z_score'])
    # 6. Price stability feature
    # Coefficient of variation (std / mean)
    df_input['price_coefficient_variation'] = df_input['std_price'] / df_input['mean_price']
    # 7. Price trend feature
    # Position of the current price within the historical range
    df_input['price_relative_position'] = (df_input['adult_total_price'] - df_input['min_price']) / (df_input['max_price'] - df_input['min_price'])
    df_input['price_relative_position'] = df_input['price_relative_position'].fillna(0.5)  # fallback
    # Drop intermediate columns
    df_input.drop(columns=['price_frequency', 'price_z_score', 'price_relative_to_90p'], inplace=True, errors='ignore')
    del price_freq
    del price_stats
    del weighted_percentiles
    del freq_median
    print(">>> Improved price zone features computed")
    # Build the first airport pair
    df_input['airport_pair_1'] = (
        df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
    )
    # Drop the raw first-segment airport codes
    df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
    # Place airport_pair_1 right before seg1_dep_time
    insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time')
    # Build the second airport pair (with a fallback for missing values)
    df_input['airport_pair_2'] = np.where(
        df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
        'NA',
        df_input['seg2_dep_air_port'].astype(str) + "-" +
        df_input['seg2_arr_air_port'].astype(str)
    )
    # Drop the raw second-segment airport codes
    df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
    # Place airport_pair_2 right before seg2_dep_time
    insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time')
    # Transfer flag
    df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
    # Place is_transfer right before flight_number_2
    insert_df_col(df_input, 'is_transfer', 'flight_number_2')
    # Rename the departure/arrival time columns
    df_input.rename(
        columns={
            'seg1_dep_time': 'dep_time_1',
            'seg1_arr_time': 'arr_time_1',
            'seg2_dep_time': 'dep_time_2',
            'seg2_arr_time': 'arr_time_2',
        },
        inplace=True
    )
    # Flight duration of segment 1
    df_input['fly_duration_1'] = (
        (df_input['arr_time_1'] - df_input['dep_time_1'])
        .dt.total_seconds() / 3600
    ).round(2)
    # Flight duration of segment 2 (0 if there is no transfer)
    df_input['fly_duration_2'] = (
        (df_input['arr_time_2'] - df_input['dep_time_2'])
        .dt.total_seconds() / 3600
    ).fillna(0).round(2)
    # Total flight duration
    df_input['fly_duration'] = (
        df_input['fly_duration_1'] + df_input['fly_duration_2']
    ).round(2)
    # Layover duration (0 if there is no transfer)
    df_input['stop_duration'] = (
        (df_input['dep_time_2'] - df_input['arr_time_1'])
        .dt.total_seconds() / 3600
    ).fillna(0).round(2)
    # Clip to avoid negative values
    # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
    #     df_input[c] = df_input[c].clip(lower=0)
    # Keep consistent with the is_transfer logic
    # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
    # Insert all of these right before is_filled in one go
    insert_before = 'is_filled'
    new_cols = [
        'fly_duration_1',
        'fly_duration_2',
        'fly_duration',
        'stop_duration'
    ]
    cols = df_input.columns.tolist()
    idx = cols.index(insert_before)
    # Remove the columns from their old positions
    cols = [c for c in cols if c not in new_cols]
    # Insert at the new position (order preserved) via empty-slice assignment
    cols[idx:idx] = new_cols
    df_input = df_input[cols]
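    # For illustration, the empty-slice assignment above inserts a list in place, e.g.
    #   cols = ['a', 'b', 'c']; cols[1:1] = ['x', 'y']  ->  ['a', 'x', 'y', 'b', 'c']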
    # Generate several calendar fields at once
    dep_t1 = df_input['dep_time_1']
    # Departure hour (0-23)
    df_input['flight_by_hour'] = dep_t1.dt.hour
    # Day of month of departure (1-31)
    df_input['flight_by_day'] = dep_t1.dt.day
    # Month of departure (1-12)
    df_input['flight_day_of_month'] = dep_t1.dt.month
    # Weekday of departure (0 = Monday, 6 = Sunday)
    df_input['flight_day_of_week'] = dep_t1.dt.weekday
    # Quarter of departure (1-4)
    df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
    # Weekend flag (Saturday / Sunday)
    df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
    # Map cities to country codes
    df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
    df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)
    # Overall departure time is simply dep_time_1
    df_input['global_dep_time'] = df_input['dep_time_1']
    # Overall arrival time: arr_time_2 if there is a transfer, otherwise arr_time_1
    df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
    # Is the departure date a holiday in the departure country
    df_input['dep_country_is_holiday'] = df_input.apply(
        lambda r: r['global_dep_time'].date()
        in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
        axis=1
    ).astype(int)
    # Is the arrival date a holiday in the arrival country
    df_input['arr_country_is_holiday'] = df_input.apply(
        lambda r: r['global_arr_time'].date()
        in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
        axis=1
    ).astype(int)
    # Holiday on either side
    df_input['any_country_is_holiday'] = (
        df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
        .max(axis=1)
    )
    # Cross-country route flag
    df_input['is_cross_country'] = (
        df_input['dep_country'] != df_input['arr_country']
    ).astype(int)
    def days_to_next_holiday(country, cur_date):
        if pd.isna(country) or pd.isna(cur_date):
            return np.nan
        holidays = COUNTRY_HOLIDAYS.get(country)
        if not holidays:
            return np.nan
        # Find future holidays (including today) and sort them
        future_holidays = sorted([d for d in holidays if d >= cur_date])
        if not future_holidays:
            return np.nan
        next_holiday = future_holidays[0]  # the first upcoming holiday
        delta_days = (next_holiday - cur_date).days
        return delta_days
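    # For illustration (hypothetical dates): with holidays {2024-01-01, 2024-04-30} and
    # cur_date 2024-01-15, the next holiday is 2024-04-30, so the function returns 106.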
    df_input['days_to_holiday'] = df_input.apply(
        lambda r: days_to_next_holiday(
            r['dep_country'],
            r['update_hour'].date()
        ),
        axis=1
    )
    # Fallback when there is no upcoming holiday
    # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)
    # Insert days_to_holiday right before update_hour
    insert_df_col(df_input, 'days_to_holiday', 'update_hour')
    # Training mode
    if is_training:
        print(">>> Training mode: computing target-related columns")
        print(f"\n>>> Processing target window: n_hours = {current_n_hours}")
        target_lower_limit = 4
        target_upper_limit = current_n_hours
        mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
        df_targets = df_input.loc[mask_targets].copy()
        targets_amount = df_targets.shape[0]
        print(f"Rows in the target window: {targets_amount}, window: [{target_lower_limit}, {target_upper_limit})")
        if targets_amount == 0:
            print(f">>> n_hours = {current_n_hours}: no valid data, skipping")
            return pd.DataFrame()
        print(">>> Computing price_at_n_hours")
        df_input_object = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['baggage'] == 30)].copy()
        df_last = df_input_object.groupby('gid', observed=True).last().reset_index()  # usually lands at 36/32/30 hours before departure
        # Extract and rename the price column
        df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
        print(">>> price_at_n_hours computed, sample:")
        print(df_last_price_at_n_hours.head(5))
        # New way of computing the price drop targets
        # Sort first
        df_targets = df_targets.sort_values(
            ['gid', 'hours_until_departure'],
            ascending=[True, False]
        )
        # Compute price changes within each gid
        g = df_targets.groupby('gid', group_keys=False)
        df_targets['price_diff'] = g['adult_total_price'].diff()
        VALID_DROP_MIN = 10
        LOWER_HOUR = 4
        UPPER_HOUR = 28
        valid_drop_mask = (
            (df_targets['price_diff'] <= -VALID_DROP_MIN) &
            (df_targets['hours_until_departure'] >= LOWER_HOUR) &
            (df_targets['hours_until_departure'] <= UPPER_HOUR)
        )
        # Valid price drops
        df_valid_drops = df_targets.loc[valid_drop_mask]
        # Find the first price drop per gid
        df_first_price_drop = (
            df_valid_drops
            .groupby('gid', as_index=False)
            .first()
        )
        # Keep only the needed columns
        df_first_price_drop = df_first_price_drop[
            ['gid', 'hours_until_departure', 'adult_total_price', 'price_diff']
        ].rename(columns={
            'hours_until_departure': 'time_to_price_drop',
            'adult_total_price': 'price_at_d_hours',
            'price_diff': 'amount_of_price_drop',
        })
        # Convert the drop amount to a positive number (more intuitive)
        df_first_price_drop['amount_of_price_drop'] = (-df_first_price_drop['amount_of_price_drop']).round(2)
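        # A small worked example of the drop labels (hypothetical rows of one gid inside the window):
        #   hours_until_departure:  28   20   12    4
        #   adult_total_price:     100   85   85   80
        #   price_diff:            NaN  -15    0   -5   -> valid_drop_mask: F  T  F  F  (VALID_DROP_MIN = 10)
        #   first drop -> time_to_price_drop = 20, price_at_d_hours = 85, amount_of_price_drop = 15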
        # # Compute price drop information (previous approach, kept for reference)
        # print(">>> Computing price drop information")
        # df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
        # df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
        # df_targets['price_dropped'] = (
        #     (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) &
        #     (df_targets['price_drop_amount'] >= 5)  # the drop must not be too small
        # )
        # df_price_drops = df_targets[df_targets['price_dropped']].copy()
        # price_drops_len = df_price_drops.shape[0]
        # if price_drops_len == 0:
        #     print(f">>> n_hours = {current_n_hours}: no price drops")
        #     # Create an empty DataFrame with the expected columns
        #     df_price_drop_info = pd.DataFrame({
        #         'gid': pd.Series(dtype='int64'),
        #         'first_drop_hours_until_departure': pd.Series(dtype='int64'),
        #         'price_at_first_drop_hours': pd.Series(dtype='float64')
        #     })
        # else:
        #     df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index()  # the first drop
        #     df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={
        #         'hours_until_departure': 'first_drop_hours_until_departure',
        #         'adult_total_price': 'price_at_first_drop_hours'
        #     })
        # print(">>> Price drop information computed, sample:")
        # print(df_price_drop_info.head(5))
        # # Merge the information
        # df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
        # df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
        # df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
        # df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)  # differs from the new approach
        # df_gid_info['time_to_price_drop'] = current_n_hours - df_gid_info['first_drop_hours_until_departure']
        # df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)  # differs from the new approach
        # del df_input_object
        # del df_last
        # del df_last_price_at_n_hours
        # del df_price_drops
        # del df_price_drop_info
        df_gid_info = df_last_price_at_n_hours.merge(df_first_price_drop, on='gid', how='left')
        df_gid_info['will_price_drop'] = df_gid_info['time_to_price_drop'].notnull().astype(int)
        df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)
        df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)

        del df_input_object
        del df_last
        del df_last_price_at_n_hours
        del df_first_price_drop
        del df_valid_drops
        del df_targets
        gc.collect()
        # Merge the target variables into the input data
        print(">>> Merging target information into df_input")
        df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
        # Fill NaN values with 0
        df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
            ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
        df_input = df_input.rename(columns={
            'will_price_drop': 'target_will_price_drop',
            'amount_of_price_drop': 'target_amount_of_drop',
            'time_to_price_drop': 'target_time_to_drop'
        })
        # Compute the minimum adult_total_price per gid in df_targets (kept for reference)
        # print(">>> Computing the minimum adult_total_price per gid...")
        # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
        # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
        # gid_count = df_min_price_by_gid.shape[0]
        # print(f">>> Done, {gid_count} gid groups")
        # # Merge the minimum price into df_input
        # print(">>> Merging the minimum price into the input data...")
        # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')
        print(">>> Sample of df_input after the merge:")
        print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
    # Prediction mode
    else:
        print(">>> Prediction mode: adding the target columns (all set to 0)")
        df_input['target_will_price_drop'] = 0
        df_input['target_amount_of_drop'] = 0.0
        df_input['target_time_to_drop'] = 0
    # Arrange the columns in a fixed order
    order_columns = [
        "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day",
        "seats_remaining", "baggage", "baggage_level",
        "price_change_times_total", "price_last_change_hours", "adult_total_price", "Adult_Total_Price", "target_will_price_drop", "target_amount_of_drop", "target_time_to_drop",
        "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure", "update_hour", "crawl_date", "gid",
        "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1",
        "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week", "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer",
        "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2", "fly_duration", "stop_duration",
        "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country",
        "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday",
        "price_weighted_percentile_25", "price_weighted_percentile_50", "price_weighted_percentile_75", "price_weighted_percentile_90",
        "price_zone_comprehensive", "price_relative_position",
    ]
    df_input = df_input[order_columns]
    return df_input

def standardization(df, feature_scaler, target_scaler=None, is_training=True, is_val=False):
    print(">>> Starting standardization")
    # Features that go through standard scaling
    scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration',
                       'price_weighted_percentile_25', 'price_weighted_percentile_50',
                       'price_weighted_percentile_75', 'price_weighted_percentile_90']
    if is_training:
        print(">>> Feature standardization started")
        if feature_scaler is None:
            feature_scaler = StandardScaler()
        if not is_val:
            feature_scaler.fit(df[scaler_features])
        df[scaler_features] = feature_scaler.transform(df[scaler_features])
        print(">>> Feature standardization finished")
    else:
        df[scaler_features] = feature_scaler.transform(df[scaler_features])
        print(">>> Feature standardization finished (prediction mode)")
    # Features that go through min-max normalization
    # Predefine a reasonable range for each feature
    fixed_ranges = {
        'hours_until_departure': (0, 480),  # 0-20 days
        'from_city_num': (0, 38),
        'to_city_num': (0, 38),
        'flight_1_num': (0, 341),
        'flight_2_num': (0, 341),
        'seats_remaining': (1, 5),
        'price_change_times_total': (0, 30),  # assume the price never changes more than 30 times
        'price_last_change_hours': (0, 480),
        'price_zone_comprehensive': (0, 5),
        'days_to_departure': (0, 30),
        'days_to_holiday': (0, 120),  # the longest gap between Vietnamese holidays is 120 days
        'flight_by_hour': (0, 23),
        'flight_by_day': (1, 31),
        'flight_day_of_month': (1, 12),
        'flight_day_of_week': (0, 6),
        'flight_day_of_quarter': (1, 4),
    }
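    # For illustration: hours_until_departure = 240 maps to (240 - 0) / (480 - 0) = 0.5,
    # and any value outside its fixed range is clipped into [0, 1] below.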
    normal_features = list(fixed_ranges.keys())
    print(">>> Normalized feature columns: ", normal_features)
    print(">>> Fixed-range normalization started")
    for col in normal_features:
        if col in df.columns:
            # Core min-max formula: (x - min) / (max - min)
            col_min, col_max = fixed_ranges[col]
            df[col] = (df[col] - col_min) / (col_max - col_min)
            # Clip out-of-range values into [0, 1]
            df[col] = df[col].clip(0, 1)
    print(">>> Fixed-range normalization finished")
    return df, feature_scaler, target_scaler
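

# A minimal usage sketch. The parquet path and the empty feature lists below are
# hypothetical placeholders; the real caller is expected to supply a raw crawl frame
# containing all the columns referenced in preprocess_data (search_dep_time,
# seg1_*/seg2_* fields, baggage, adult_total_price, hours_until_departure, ...).
if __name__ == "__main__":
    raw_df = pd.read_parquet("data/raw_quotes.parquet")  # hypothetical input path
    features = []               # placeholder: the real feature list lives in the training script
    categorical_features = []   # placeholder
    processed = preprocess_data(raw_df, features, categorical_features,
                                is_training=True, current_n_hours=36)
    if not processed.empty:
        processed, feat_scaler, _ = standardization(processed, feature_scaler=None,
                                                    target_scaler=None,
                                                    is_training=True, is_val=False)
        print(processed.shape)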