# data_preprocess.py: feature engineering, price-drop targets and feature scaling for flight price snapshots.
import bisect
import gc
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays
from utils import insert_df_col
# Holiday calendar per country, built once at import time from the
# city -> country mapping in config. preprocess_data looks countries up in it
# with .get(country, ...) and tests membership of datetime.date values
# (presumably a collection of datetime.date per country -- confirm in config).
COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
  10. def preprocess_data(df_input, features, categorical_features, is_training=True, current_n_hours=28):
  11. print(">>> 开始数据预处理")
  12. # 生成 城市对
  13. df_input['city_pair'] = (
  14. df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
  15. )
  16. # 城市码映射成数字
  17. df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
  18. df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
  19. missing_from = (
  20. df_input.loc[df_input['from_city_num'].isna(), 'from_city_code']
  21. .unique()
  22. )
  23. missing_to = (
  24. df_input.loc[df_input['to_city_num'].isna(), 'to_city_code']
  25. .unique()
  26. )
  27. if missing_from:
  28. print("未映射的 from_city:", missing_from)
  29. if missing_to:
  30. print("未映射的 to_city:", missing_to)
  31. # 把 city_pair、from_city_code、from_city_num, to_city_code, to_city_num 放到前几列
  32. cols = df_input.columns.tolist()
  33. # 删除已存在的几列(保证顺序正确)
  34. for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
  35. cols.remove(c)
  36. # 这几列插入到最前面
  37. df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
  38. pass
  39. # 转格式
  40. df_input['search_dep_time'] = pd.to_datetime(
  41. df_input['search_dep_time'],
  42. format='%Y%m%d',
  43. errors='coerce'
  44. ).dt.strftime('%Y-%m-%d')
  45. # 重命名起飞日期
  46. df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
  47. # 重命名航班号
  48. df_input.rename(
  49. columns={
  50. 'seg1_flight_number': 'flight_number_1',
  51. 'seg2_flight_number': 'flight_number_2'
  52. },
  53. inplace=True
  54. )
  55. # 分开填充
  56. df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
  57. df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
  58. # 航班号转数字
  59. df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map)
  60. df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map)
  61. missing_flight_1 = (
  62. df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1']
  63. .unique()
  64. )
  65. missing_flight_2 = (
  66. df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2']
  67. .unique()
  68. )
  69. if missing_flight_1:
  70. print("未映射的 flight_1:", missing_flight_1)
  71. if missing_flight_2:
  72. print("未映射的 flight_2:", missing_flight_2)
  73. # flight_1_num 放在 seg1_dep_air_port 之前
  74. insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port')
  75. # flight_2_num 放在 seg2_dep_air_port 之前
  76. insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port')
  77. df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int) # 30--> 1 20--> 0
  78. # baggage_level 放在 flight_number_2 之前
  79. insert_df_col(df_input, 'baggage_level', 'flight_number_2')
  80. df_input['Adult_Total_Price'] = df_input['adult_total_price']
  81. # Adult_Total_Price 放在 seats_remaining 之前 保存缩放前的原始值
  82. insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining')
  83. df_input['Hours_Until_Departure'] = df_input['hours_until_departure']
  84. # Hours_Until_Departure 放在 days_to_departure 之前 保存缩放前的原始值
  85. insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure')
  86. pass
  87. # gid:基于指定字段的分组标记(整数)
  88. df_input['gid'] = (
  89. df_input
  90. .groupby(
  91. ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], # 'baggage' 先不进分组
  92. sort=False
  93. )
  94. .ngroup()
  95. )
  96. # 做一下时间段裁剪, 保留起飞前480小时之内的
  97. df_input = df_input[df_input['hours_until_departure'] < 480].reset_index(drop=True)
  98. pass
  99. # 在 gid 与 baggage 内按时间降序
  100. df_input = df_input.sort_values(
  101. by=['gid', 'baggage', 'hours_until_departure'],
  102. ascending=[True, True, False]
  103. ).reset_index(drop=True)
  104. # 价格变化掩码
  105. g = df_input.groupby(['gid', 'baggage'])
  106. diff = g['adult_total_price'].transform('diff')
  107. change_mask = diff.abs() >= 5 # 变化太小的不计入
  108. # 价格变化次数
  109. df_input['price_change_times_total'] = (
  110. change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
  111. )
  112. # 上次发生变价的小时数
  113. last_change_hour = (
  114. df_input['hours_until_departure']
  115. .where(change_mask)
  116. .groupby([df_input['gid'], df_input['baggage']])
  117. .ffill() # 前向填充
  118. )
  119. # 当前距离上一次变价过去多少小时
  120. df_input['price_last_change_hours'] = (
  121. last_change_hour - df_input['hours_until_departure']
  122. ).fillna(0)
  123. pass
  124. # 想插入到 seats_remaining 前面的新列
  125. new_cols = [
  126. 'price_change_times_total',
  127. 'price_last_change_hours'
  128. ]
  129. # 当前所有列
  130. cols = df_input.columns.tolist()
  131. # 找到 seats_remaining 的位置
  132. idx = cols.index('seats_remaining')
  133. # 重新拼列顺序
  134. new_order = cols[:idx] + new_cols + cols[idx:]
  135. # 去重(防止列已经在原位置)
  136. new_order = list(dict.fromkeys(new_order))
  137. # 重新排列 DataFrame
  138. df_input = df_input[new_order]
  139. pass
  140. # 生成第一机场对
  141. df_input['airport_pair_1'] = (
  142. df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
  143. )
  144. # 删除原始第一机场码
  145. df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
  146. # 第一机场对 放到 seg1_dep_time 列的前面
  147. insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time')
  148. # 生成第二机场对(带缺失兜底)
  149. df_input['airport_pair_2'] = np.where(
  150. df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
  151. 'NA',
  152. df_input['seg2_dep_air_port'].astype(str) + "-" +
  153. df_input['seg2_arr_air_port'].astype(str)
  154. )
  155. # 删除原始第二机场码
  156. df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
  157. # 第二机场对 放到 seg2_dep_time 列的前面
  158. insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time')
  159. # 是否转乘
  160. df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
  161. # 是否转乘 放到 flight_number_2 列的前面
  162. insert_df_col(df_input, 'is_transfer', 'flight_number_2')
  163. # 重命名起飞时刻与到达时刻
  164. df_input.rename(
  165. columns={
  166. 'seg1_dep_time': 'dep_time_1',
  167. 'seg1_arr_time': 'arr_time_1',
  168. 'seg2_dep_time': 'dep_time_2',
  169. 'seg2_arr_time': 'arr_time_2',
  170. },
  171. inplace=True
  172. )
  173. # 第一段飞行时长
  174. df_input['fly_duration_1'] = (
  175. (df_input['arr_time_1'] - df_input['dep_time_1'])
  176. .dt.total_seconds() / 3600
  177. ).round(2)
  178. # 第二段飞行时长(无转乘为 0)
  179. df_input['fly_duration_2'] = (
  180. (df_input['arr_time_2'] - df_input['dep_time_2'])
  181. .dt.total_seconds() / 3600
  182. ).fillna(0).round(2)
  183. # 总飞行时长
  184. df_input['fly_duration'] = (
  185. df_input['fly_duration_1'] + df_input['fly_duration_2']
  186. ).round(2)
  187. # 中转停留时长(无转乘为 0)
  188. df_input['stop_duration'] = (
  189. (df_input['dep_time_2'] - df_input['arr_time_1'])
  190. .dt.total_seconds() / 3600
  191. ).fillna(0).round(2)
  192. # 裁剪,防止负数
  193. # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
  194. # df_input[c] = df_input[c].clip(lower=0)
  195. # 和 is_transfer 逻辑保持一致
  196. # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
  197. # 一次性插到 is_filled 前面
  198. insert_before = 'is_filled'
  199. new_cols = [
  200. 'fly_duration_1',
  201. 'fly_duration_2',
  202. 'fly_duration',
  203. 'stop_duration'
  204. ]
  205. cols = df_input.columns.tolist()
  206. idx = cols.index(insert_before)
  207. # 删除旧位置
  208. cols = [c for c in cols if c not in new_cols]
  209. # 插入新位置(顺序保持)
  210. cols[idx:idx] = new_cols # python独有空切片插入法
  211. df_input = df_input[cols]
  212. # 一次生成多个字段
  213. dep_t1 = df_input['dep_time_1']
  214. # 几点起飞(0–23)
  215. df_input['flight_by_hour'] = dep_t1.dt.hour
  216. # 起飞日期几号(1–31)
  217. df_input['flight_by_day'] = dep_t1.dt.day
  218. # 起飞日期几月(1–12)
  219. df_input['flight_day_of_month'] = dep_t1.dt.month
  220. # 起飞日期周几(0=周一, 6=周日)
  221. df_input['flight_day_of_week'] = dep_t1.dt.weekday
  222. # 起飞日期季度(1–4)
  223. df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
  224. # 是否周末(周六 / 周日)
  225. df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
  226. # 找到对应的国家码
  227. df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
  228. df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)
  229. # 整体出发时间 就是 dep_time_1
  230. df_input['global_dep_time'] = df_input['dep_time_1']
  231. # 整体到达时间:有转乘用 arr_time_2,否则用 arr_time_1
  232. df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
  233. # 出发日期在出发国家是否节假日
  234. df_input['dep_country_is_holiday'] = df_input.apply(
  235. lambda r: r['global_dep_time'].date()
  236. in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
  237. axis=1
  238. ).astype(int)
  239. # 到达日期在到达国家是否节假日
  240. df_input['arr_country_is_holiday'] = df_input.apply(
  241. lambda r: r['global_arr_time'].date()
  242. in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
  243. axis=1
  244. ).astype(int)
  245. # 在任一侧是否节假日
  246. df_input['any_country_is_holiday'] = (
  247. df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
  248. .max(axis=1)
  249. )
  250. # 是否跨国航线
  251. df_input['is_cross_country'] = (
  252. df_input['dep_country'] != df_input['arr_country']
  253. ).astype(int)
  254. def days_to_next_holiday(country, cur_date):
  255. if pd.isna(country) or pd.isna(cur_date):
  256. return np.nan
  257. holidays = COUNTRY_HOLIDAYS.get(country)
  258. if not holidays:
  259. return np.nan
  260. # 找未来(含当天)的节假日,并排序
  261. future_holidays = sorted([d for d in holidays if d >= cur_date])
  262. if not future_holidays:
  263. return np.nan
  264. next_holiday = future_holidays[0] # 第一个未来节假日
  265. delta_days = (next_holiday - cur_date).days
  266. return delta_days
  267. df_input['days_to_holiday'] = df_input.apply(
  268. lambda r: days_to_next_holiday(
  269. r['dep_country'],
  270. r['update_hour'].date()
  271. ),
  272. axis=1
  273. )
  274. # 没有未来节假日的统一兜底
  275. # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)
  276. # days_to_holiday 插在 update_hour 前面
  277. insert_df_col(df_input, 'days_to_holiday', 'update_hour')
  278. # 制作targets
  279. print(f"\n>>> 开始处理 对应区间: n_hours = {current_n_hours}")
  280. target_lower_limit = 4
  281. target_upper_limit = current_n_hours
  282. mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
  283. df_targets = df_input.loc[mask_targets].copy()
  284. targets_amout = df_targets.shape[0]
  285. print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})")
  286. if targets_amout == 0:
  287. print(f">>> n_hours = {current_n_hours} 无有效数据,跳过")
  288. return pd.DataFrame()
  289. print(">>> 计算 price_at_n_hours")
  290. df_input_object = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['baggage'] == 30)].copy()
  291. df_last = df_input_object.groupby('gid', observed=True).last().reset_index() # 一般落在起飞前28小时
  292. # 提取并重命名 price 列
  293. df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
  294. print(">>> price_at_n_hours计算完成,示例:")
  295. print(df_last_price_at_n_hours.head(5))
  296. # 计算降价信息
  297. print(">>> 计算降价信息")
  298. df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
  299. df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
  300. df_targets['price_dropped'] = (
  301. (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) &
  302. (df_targets['price_drop_amount'] >= 5) # 降幅不能太小
  303. )
  304. df_price_drops = df_targets[df_targets['price_dropped']].copy()
  305. price_drops_len = df_price_drops.shape[0]
  306. if price_drops_len == 0:
  307. print(f">>> n_hours = {current_n_hours} 无降价信息")
  308. # 创建包含指定列的空 DataFrame
  309. df_price_drop_info = pd.DataFrame({
  310. 'gid': pd.Series(dtype='int64'),
  311. 'first_drop_hours_until_departure': pd.Series(dtype='int64'),
  312. 'price_at_first_drop_hours': pd.Series(dtype='float64')
  313. })
  314. else:
  315. df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index() # 第一次发生的降价
  316. df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={
  317. 'hours_until_departure': 'first_drop_hours_until_departure',
  318. 'adult_total_price': 'price_at_first_drop_hours'
  319. })
  320. print(">>> 降价信息计算完成,示例:")
  321. print(df_price_drop_info.head(5))
  322. # 合并信息
  323. df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
  324. df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
  325. df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
  326. df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0) # 区别
  327. df_gid_info['time_to_price_drop'] = df_gid_info['first_drop_hours_until_departure']
  328. df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0) # 区别
  329. del df_input_object
  330. del df_last
  331. del df_last_price_at_n_hours
  332. del df_targets
  333. del df_price_drops
  334. del df_price_drop_info
  335. gc.collect()
  336. # 将目标变量合并到输入数据中
  337. print(">>> 将目标变量信息合并到 df_input")
  338. df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
  339. # 使用 0 填充 NaN 值
  340. df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
  341. ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
  342. df_input = df_input.rename(columns={
  343. 'will_price_drop': 'target_will_price_drop',
  344. 'amount_of_price_drop': 'target_amount_of_drop',
  345. 'time_to_price_drop': 'target_time_to_drop'
  346. })
  347. # 计算每个 gid 分组在 df_targets 中的 adult_total_price 最小值
  348. # print(">>> 计算每个 gid 分组的 adult_total_price 最小值...")
  349. # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
  350. # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
  351. # gid_count = df_min_price_by_gid.shape[0]
  352. # print(f">>> 计算完成,共 {gid_count} 个 gid 分组")
  353. # # 将最小价格 merge 到 df_inputs 中
  354. # print(">>> 将最小价格 merge 到输入数据中...")
  355. # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')
  356. print(">>> 合并后 df_input 样例:")
  357. print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
  358. # 按顺序排列
  359. order_columns = [
  360. "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day",
  361. "seats_remaining", "baggage", "baggage_level",
  362. "price_change_times_total", "price_last_change_hours", "adult_total_price", "Adult_Total_Price", "target_will_price_drop", "target_time_to_drop",
  363. "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure", "update_hour", "gid",
  364. "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1",
  365. "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week", "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer",
  366. "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2", "fly_duration", "stop_duration",
  367. "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country",
  368. "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday",
  369. ]
  370. df_input = df_input[order_columns]
  371. return df_input
  372. def standardization(df, feature_scaler, target_scaler, is_training=True, is_test=False):
  373. print(">>> 开始标准化处理")
  374. # 准备走标准化的特征
  375. scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration']
  376. if is_training:
  377. print(">>> 特征数据标准化开始")
  378. if feature_scaler is None:
  379. feature_scaler = StandardScaler()
  380. if not is_test:
  381. feature_scaler.fit(df[scaler_features])
  382. df[scaler_features] = feature_scaler.transform(df[scaler_features])
  383. print(">>> 特征数据标准化完成")
  384. else:
  385. df[scaler_features] = feature_scaler.transform(df[scaler_features])
  386. print(">>> 预测模式下特征标准化处理完成")
  387. # 准备走归一化的特征
  388. # 事先定义好每个特征的合理范围
  389. fixed_ranges = {
  390. 'hours_until_departure': (0, 480), # 0-20天
  391. 'from_city_num': (0, 38),
  392. 'to_city_num': (0, 38),
  393. 'flight_1_num': (0, 341),
  394. 'flight_2_num': (0, 341),
  395. 'seats_remaining': (1, 5),
  396. 'price_change_times_total': (0, 30), # 假设价格变更次数不会超过30次
  397. 'price_last_change_hours': (0, 480),
  398. 'days_to_departure': (0, 30),
  399. 'days_to_holiday': (0, 120), # 最长的越南节假日间隔120天
  400. 'flight_by_hour': (0, 23),
  401. 'flight_by_day': (1, 31),
  402. 'flight_day_of_month': (1, 12),
  403. 'flight_day_of_week': (0, 6),
  404. 'flight_day_of_quarter': (1, 4),
  405. }
  406. normal_features = list(fixed_ranges.keys())
  407. print(">>> 归一化特征列: ", normal_features)
  408. print(">>> 基于固定范围的特征数据归一化开始")
  409. for col in normal_features:
  410. if col in df.columns:
  411. # 核心归一化公式: (x - min) / (max - min)
  412. col_min, col_max = fixed_ranges[col]
  413. df[col] = (df[col] - col_min) / (col_max - col_min)
  414. # 添加裁剪,将超出范围的值强制限制在[0,1]区间
  415. df[col] = df[col].clip(0, 1)
  416. print(">>> 基于固定范围的特征数据归一化完成")
  417. return df, feature_scaler, target_scaler