# data_preprocess.py


  1. import pandas as pd
  2. import numpy as np
  3. import bisect
  4. import gc
  5. from datetime import datetime, timedelta
  6. from sklearn.preprocessing import StandardScaler
  7. from config import city_to_country, vj_city_code_map, build_country_holidays
# Holiday lookup built once at import time from config's city_to_country mapping.
# preprocess_data keys it by the country values that city_to_country produces
# (via .get(country, set())) and tests datetime.date values against each entry
# with `in` and `>=`, so entries are presumably collections of datetime.date —
# TODO confirm against config.build_country_holidays.
COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
  9. def preprocess_data(df_input, features, categorical_features, is_training=True, current_n_hours=28):
  10. print(">>> 开始数据预处理")
  11. # 生成 城市对
  12. df_input['city_pair'] = (
  13. df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
  14. )
  15. df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
  16. df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
  17. # 把 city_pair、from_city_code、from_city_num, to_city_code, to_city_num 放到前几列
  18. cols = df_input.columns.tolist()
  19. # 删除已存在的几列(保证顺序正确)
  20. for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
  21. cols.remove(c)
  22. # 这几列插入到最前面
  23. df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
  24. # 转格式
  25. df_input['search_dep_time'] = pd.to_datetime(
  26. df_input['search_dep_time'],
  27. format='%Y%m%d',
  28. errors='coerce'
  29. ).dt.strftime('%Y-%m-%d')
  30. # 重命名起飞日期
  31. df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
  32. # 重命名航班号
  33. df_input.rename(
  34. columns={
  35. 'seg1_flight_number': 'flight_number_1',
  36. 'seg2_flight_number': 'flight_number_2'
  37. },
  38. inplace=True
  39. )
  40. # 分开填充
  41. df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
  42. df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
  43. # gid:基于指定字段的分组标记(整数)
  44. df_input['gid'] = (
  45. df_input
  46. .groupby(
  47. ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], # 'baggage' 先不进分组
  48. sort=False
  49. )
  50. .ngroup()
  51. )
  52. # 做一下时间段裁剪, 保留起飞前480小时之内的
  53. df_input = df_input[df_input['hours_until_departure'] < 480].reset_index(drop=True)
  54. pass
  55. # 在 gid 与 baggage 内按时间降序
  56. df_input = df_input.sort_values(
  57. by=['gid', 'baggage', 'hours_until_departure'],
  58. ascending=[True, True, False]
  59. ).reset_index(drop=True)
  60. # 价格变化掩码
  61. g = df_input.groupby(['gid', 'baggage'])
  62. diff = g['adult_total_price'].transform('diff')
  63. change_mask = diff.abs() >= 5 # 变化太小的不计入
  64. # 价格变化次数
  65. df_input['price_change_times_total'] = (
  66. change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
  67. )
  68. # 上次发生变价的小时数
  69. last_change_hour = (
  70. df_input['hours_until_departure']
  71. .where(change_mask)
  72. .groupby([df_input['gid'], df_input['baggage']])
  73. .ffill() # 前向填充
  74. )
  75. # 当前距离上一次变价过去多少小时
  76. df_input['price_last_change_hours'] = (
  77. last_change_hour - df_input['hours_until_departure']
  78. ).fillna(0)
  79. pass
  80. # 想插入到 seats_remaining 前面的新列
  81. new_cols = [
  82. 'price_change_times_total',
  83. 'price_last_change_hours'
  84. ]
  85. # 当前所有列
  86. cols = df_input.columns.tolist()
  87. # 找到 seats_remaining 的位置
  88. idx = cols.index('seats_remaining')
  89. # 重新拼列顺序
  90. new_order = cols[:idx] + new_cols + cols[idx:]
  91. # 去重(防止列已经在原位置)
  92. new_order = list(dict.fromkeys(new_order))
  93. # 重新排列 DataFrame
  94. df_input = df_input[new_order]
  95. pass
  96. # 生成第一机场对
  97. df_input['airport_pair_1'] = (
  98. df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
  99. )
  100. # 删除原始第一机场码
  101. df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
  102. # 第一机场对 放到 seg1_dep_time 列的前面
  103. insert_idx = df_input.columns.get_loc('seg1_dep_time')
  104. airport_pair_1 = df_input.pop('airport_pair_1')
  105. df_input.insert(insert_idx, 'airport_pair_1', airport_pair_1)
  106. # 生成第二机场对(带缺失兜底)
  107. df_input['airport_pair_2'] = np.where(
  108. df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
  109. 'NA',
  110. df_input['seg2_dep_air_port'].astype(str) + "-" +
  111. df_input['seg2_arr_air_port'].astype(str)
  112. )
  113. # 删除原始第二机场码
  114. df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
  115. # 第二机场对 放到 seg2_dep_time 列的前面
  116. insert_idx = df_input.columns.get_loc('seg2_dep_time')
  117. airport_pair_2 = df_input.pop('airport_pair_2')
  118. df_input.insert(insert_idx, 'airport_pair_2', airport_pair_2)
  119. # 是否转乘
  120. df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
  121. insert_idx = df_input.columns.get_loc('flight_number_2')
  122. is_transfer = df_input.pop('is_transfer')
  123. df_input.insert(insert_idx, 'is_transfer', is_transfer)
  124. # 重命名起飞时刻与到达时刻
  125. df_input.rename(
  126. columns={
  127. 'seg1_dep_time': 'dep_time_1',
  128. 'seg1_arr_time': 'arr_time_1',
  129. 'seg2_dep_time': 'dep_time_2',
  130. 'seg2_arr_time': 'arr_time_2',
  131. },
  132. inplace=True
  133. )
  134. # 第一段飞行时长
  135. df_input['fly_duration_1'] = (
  136. (df_input['arr_time_1'] - df_input['dep_time_1'])
  137. .dt.total_seconds() / 3600
  138. ).round(2)
  139. # 第二段飞行时长(无转乘为 0)
  140. df_input['fly_duration_2'] = (
  141. (df_input['arr_time_2'] - df_input['dep_time_2'])
  142. .dt.total_seconds() / 3600
  143. ).fillna(0).round(2)
  144. # 总飞行时长
  145. df_input['fly_duration'] = (
  146. df_input['fly_duration_1'] + df_input['fly_duration_2']
  147. ).round(2)
  148. # 中转停留时长(无转乘为 0)
  149. df_input['stop_duration'] = (
  150. (df_input['dep_time_2'] - df_input['arr_time_1'])
  151. .dt.total_seconds() / 3600
  152. ).fillna(0).round(2)
  153. # 裁剪,防止负数
  154. # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
  155. # df_input[c] = df_input[c].clip(lower=0)
  156. # 和 is_transfer 逻辑保持一致
  157. # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
  158. # 一次性插到 is_filled 前面
  159. insert_before = 'is_filled'
  160. new_cols = [
  161. 'fly_duration_1',
  162. 'fly_duration_2',
  163. 'fly_duration',
  164. 'stop_duration'
  165. ]
  166. cols = df_input.columns.tolist()
  167. idx = cols.index(insert_before)
  168. # 删除旧位置
  169. cols = [c for c in cols if c not in new_cols]
  170. # 插入新位置(顺序保持)
  171. cols[idx:idx] = new_cols # python独有空切片插入法
  172. df_input = df_input[cols]
  173. # 一次生成多个字段
  174. dep_t1 = df_input['dep_time_1']
  175. # 几点起飞(0–23)
  176. df_input['flight_by_hour'] = dep_t1.dt.hour
  177. # 起飞日期几号(1–31)
  178. df_input['flight_by_day'] = dep_t1.dt.day
  179. # 起飞日期几月(1–12)
  180. df_input['flight_day_of_month'] = dep_t1.dt.month
  181. # 起飞日期周几(0=周一, 6=周日)
  182. df_input['flight_day_of_week'] = dep_t1.dt.weekday
  183. # 起飞日期季度(1–4)
  184. df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
  185. # 是否周末(周六 / 周日)
  186. df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
  187. # 找到对应的国家码
  188. df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
  189. df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)
  190. # 整体出发时间 就是 dep_time_1
  191. df_input['global_dep_time'] = df_input['dep_time_1']
  192. # 整体到达时间:有转乘用 arr_time_2,否则用 arr_time_1
  193. df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
  194. # 出发日期在出发国家是否节假日
  195. df_input['dep_country_is_holiday'] = df_input.apply(
  196. lambda r: r['global_dep_time'].date()
  197. in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
  198. axis=1
  199. ).astype(int)
  200. # 到达日期在到达国家是否节假日
  201. df_input['arr_country_is_holiday'] = df_input.apply(
  202. lambda r: r['global_arr_time'].date()
  203. in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
  204. axis=1
  205. ).astype(int)
  206. # 在任一侧是否节假日
  207. df_input['flight_day_is_holiday'] = (
  208. df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
  209. .max(axis=1)
  210. )
  211. # 是否跨国航线
  212. df_input['is_cross_country'] = (
  213. df_input['dep_country'] != df_input['arr_country']
  214. ).astype(int)
  215. def days_to_next_holiday(country, cur_date):
  216. if pd.isna(country) or pd.isna(cur_date):
  217. return np.nan
  218. holidays = COUNTRY_HOLIDAYS.get(country)
  219. if not holidays:
  220. return np.nan
  221. # 找未来(含当天)的节假日,并排序
  222. future_holidays = sorted([d for d in holidays if d >= cur_date])
  223. if not future_holidays:
  224. return np.nan
  225. next_holiday = future_holidays[0] # 第一个未来节假日
  226. delta_days = (next_holiday - cur_date).days
  227. return delta_days
  228. df_input['days_to_holiday'] = df_input.apply(
  229. lambda r: days_to_next_holiday(
  230. r['dep_country'],
  231. r['update_hour'].date()
  232. ),
  233. axis=1
  234. )
  235. # 没有未来节假日的统一兜底
  236. # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)
  237. # days_to_holiday 插在 update_hour 前面
  238. insert_idx = df_input.columns.get_loc('update_hour')
  239. days_to_holiday = df_input.pop('days_to_holiday')
  240. df_input.insert(insert_idx, 'days_to_holiday', days_to_holiday)
  241. # 制作targets
  242. print(f"\n>>> 开始处理 对应区间: n_hours = {current_n_hours}")
  243. target_lower_limit = 4
  244. target_upper_limit = current_n_hours
  245. mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
  246. df_targets = df_input.loc[mask_targets].copy()
  247. targets_amout = df_targets.shape[0]
  248. print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})")
  249. if targets_amout == 0:
  250. print(f">>> n_hours = {current_n_hours} 无有效数据,跳过")
  251. return pd.DataFrame()
  252. print(">>> 计算 price_at_n_hours")
  253. df_input_object = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['baggage'] == 30)].copy()
  254. df_last = df_input_object.groupby('gid', observed=True).last().reset_index() # 一般落在起飞前28小时
  255. # 提取并重命名 price 列
  256. df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
  257. print(">>> price_at_n_hours计算完成,示例:")
  258. print(df_last_price_at_n_hours.head(5))
  259. # 计算降价信息
  260. print(">>> 计算降价信息")
  261. df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
  262. df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
  263. df_targets['price_dropped'] = (
  264. (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) &
  265. (df_targets['price_drop_amount'] >= 5) # 降幅不能太小
  266. )
  267. df_price_drops = df_targets[df_targets['price_dropped']].copy()
  268. price_drops_len = df_price_drops.shape[0]
  269. if price_drops_len == 0:
  270. print(f">>> n_hours = {current_n_hours} 无降价信息")
  271. # 创建包含指定列的空 DataFrame
  272. df_price_drop_info = pd.DataFrame({
  273. 'gid': pd.Series(dtype='int64'),
  274. 'first_drop_hours_until_departure': pd.Series(dtype='int64'),
  275. 'price_at_first_drop_hours': pd.Series(dtype='float64')
  276. })
  277. else:
  278. df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index() # 第一次发生的降价
  279. df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={
  280. 'hours_until_departure': 'first_drop_hours_until_departure',
  281. 'adult_total_price': 'price_at_first_drop_hours'
  282. })
  283. print(">>> 降价信息计算完成,示例:")
  284. print(df_price_drop_info.head(5))
  285. # 合并信息
  286. df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
  287. df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
  288. df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
  289. df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0) # 区别
  290. df_gid_info['time_to_price_drop'] = df_gid_info['first_drop_hours_until_departure']
  291. df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0) # 区别
  292. del df_input_object
  293. del df_last
  294. del df_last_price_at_n_hours
  295. del df_targets
  296. del df_price_drops
  297. del df_price_drop_info
  298. gc.collect()
  299. # 将目标变量合并到输入数据中
  300. print(">>> 将目标变量信息合并到 df_input")
  301. df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
  302. # 使用 0 填充 NaN 值
  303. df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
  304. ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
  305. df_input = df_input.rename(columns={
  306. 'will_price_drop': 'target_will_price_drop',
  307. 'amount_of_price_drop': 'target_amount_of_drop',
  308. 'time_to_price_drop': 'target_time_to_drop'
  309. })
  310. # 计算每个 gid 分组在 df_targets 中的 adult_total_price 最小值
  311. # print(">>> 计算每个 gid 分组的 adult_total_price 最小值...")
  312. # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
  313. # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
  314. # gid_count = df_min_price_by_gid.shape[0]
  315. # print(f">>> 计算完成,共 {gid_count} 个 gid 分组")
  316. # # 将最小价格 merge 到 df_inputs 中
  317. # print(">>> 将最小价格 merge 到输入数据中...")
  318. # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')
  319. print(">>> 合并后 df_input 样例:")
  320. print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
  321. return df_input