data_preprocess.py 84 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687
  1. import pandas as pd
  2. import numpy as np
  3. import bisect
  4. import gc
  5. import os
  6. from datetime import datetime, timedelta
  7. from sklearn.preprocessing import StandardScaler
  8. from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays
  9. from utils import insert_df_col
  10. COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
  11. def preprocess_data_cycle(df_input, interval_hours=8, feature_length=240, target_length=24, is_training=True):
  12. # df_input_part = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['hours_until_departure'] < current_n_hours)].copy()
  13. df_input = preprocess_data_first_half(df_input)
  14. # 创建一个空列表来存储所有处理后的数据部分
  15. list_df_parts = []
  16. crop_lower_limit_list = [4] # [4, 28, 52, 76, 100]
  17. for crop_lower_limit in crop_lower_limit_list:
  18. target_n_hours = crop_lower_limit + target_length
  19. feature_n_hours = target_n_hours + interval_hours
  20. crop_upper_limit = feature_n_hours + feature_length
  21. df_input_part = preprocess_data(df_input, is_training=is_training, crop_upper_limit=crop_upper_limit, feature_n_hours=feature_n_hours,
  22. target_n_hours=target_n_hours, crop_lower_limit=crop_lower_limit)
  23. # 将处理后的部分添加到列表中
  24. list_df_parts.append(df_input_part)
  25. if not is_training:
  26. break
  27. # 合并所有处理后的数据部分
  28. if list_df_parts:
  29. df_combined = pd.concat(list_df_parts, ignore_index=True)
  30. return df_combined
  31. else:
  32. return pd.DataFrame() # 如果没有数据,返回空DataFrame
  33. def preprocess_data_first_half(df_input):
  34. '''前半部分'''
  35. print(">>> 开始数据预处理")
  36. # 生成 城市对
  37. df_input['city_pair'] = (
  38. df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
  39. )
  40. # 城市码映射成数字
  41. df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
  42. df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
  43. missing_from = (
  44. df_input.loc[df_input['from_city_num'].isna(), 'from_city_code']
  45. .unique()
  46. )
  47. missing_to = (
  48. df_input.loc[df_input['to_city_num'].isna(), 'to_city_code']
  49. .unique()
  50. )
  51. if missing_from:
  52. print("未映射的 from_city:", missing_from)
  53. if missing_to:
  54. print("未映射的 to_city:", missing_to)
  55. # 把 city_pair、from_city_code、from_city_num, to_city_code, to_city_num 放到前几列
  56. cols = df_input.columns.tolist()
  57. # 删除已存在的几列(保证顺序正确)
  58. for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
  59. cols.remove(c)
  60. # 这几列插入到最前面
  61. df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
  62. pass
  63. # 转格式
  64. df_input['search_dep_time'] = pd.to_datetime(
  65. df_input['search_dep_time'],
  66. format='%Y%m%d',
  67. errors='coerce'
  68. ).dt.strftime('%Y-%m-%d')
  69. # 重命名起飞日期
  70. df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
  71. # 重命名航班号
  72. df_input.rename(
  73. columns={
  74. 'seg1_flight_number': 'flight_number_1',
  75. 'seg2_flight_number': 'flight_number_2'
  76. },
  77. inplace=True
  78. )
  79. # 分开填充
  80. df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
  81. df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
  82. # 航班号转数字
  83. df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map)
  84. df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map)
  85. missing_flight_1 = (
  86. df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1']
  87. .unique()
  88. )
  89. missing_flight_2 = (
  90. df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2']
  91. .unique()
  92. )
  93. if missing_flight_1:
  94. print("未映射的 flight_1:", missing_flight_1)
  95. if missing_flight_2:
  96. print("未映射的 flight_2:", missing_flight_2)
  97. # flight_1_num 放在 seg1_dep_air_port 之前
  98. insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port')
  99. # flight_2_num 放在 seg2_dep_air_port 之前
  100. insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port')
  101. df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int) # 30--> 1 20--> 0
  102. # baggage_level 放在 flight_number_2 之前
  103. insert_df_col(df_input, 'baggage_level', 'flight_number_2')
  104. df_input['Adult_Total_Price'] = df_input['adult_total_price']
  105. # Adult_Total_Price 放在 seats_remaining 之前 保存缩放前的原始值
  106. insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining')
  107. df_input['Hours_Until_Departure'] = df_input['hours_until_departure']
  108. # Hours_Until_Departure 放在 days_to_departure 之前 保存缩放前的原始值
  109. insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure')
  110. pass
  111. # gid:基于指定字段的分组标记(整数)
  112. df_input['gid'] = (
  113. df_input
  114. .groupby(
  115. ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], # 'baggage' 先不进分组
  116. sort=False
  117. )
  118. .ngroup()
  119. )
  120. return df_input
  121. def preprocess_data(df_input, is_training=True, crop_upper_limit=480, feature_n_hours=36, target_n_hours=28, crop_lower_limit=4):
  122. print(f"裁剪范围: [{crop_lower_limit}, {crop_upper_limit}], 间隔窗口: [{target_n_hours}, {feature_n_hours}]")
  123. # 做一下时间段裁剪, 保留起飞前480小时之内且大于等于4小时的
  124. df_input = df_input[(df_input['hours_until_departure'] < crop_upper_limit) &
  125. (df_input['hours_until_departure'] >= crop_lower_limit)].reset_index(drop=True)
  126. # 在 gid 与 baggage 内按时间降序
  127. df_input = df_input.sort_values(
  128. by=['gid', 'baggage', 'hours_until_departure'],
  129. ascending=[True, True, False]
  130. ).reset_index(drop=True)
  131. # 价格幅度阈值
  132. VALID_DROP_MIN = 5
  133. # 价格变化掩码
  134. g = df_input.groupby(['gid', 'baggage'])
  135. diff = g['adult_total_price'].transform('diff')
  136. # change_mask = diff.abs() >= VALID_DROP_MIN # 变化太小的不计入
  137. decrease_mask = diff <= -VALID_DROP_MIN # 降价(变化太小的不计入)
  138. increase_mask = diff >= VALID_DROP_MIN # 升价(变化太小的不计入)
  139. df_input['_price_event_dir'] = np.where(increase_mask, 1, np.where(decrease_mask, -1, 0))
  140. # 计算连续升价/降价次数
  141. def _calc_price_streaks(df_group):
  142. dirs = df_group['_price_event_dir'].to_numpy()
  143. n = len(dirs)
  144. inc = np.full(n, np.nan)
  145. dec = np.full(n, np.nan)
  146. last_dir = 0
  147. inc_cnt = 0
  148. dec_cnt = 0
  149. for i, d in enumerate(dirs):
  150. if d == 1:
  151. inc_cnt = inc_cnt + 1 if last_dir == 1 else 1
  152. dec_cnt = 0
  153. last_dir = 1
  154. inc[i] = inc_cnt
  155. dec[i] = dec_cnt
  156. elif d == -1:
  157. dec_cnt = dec_cnt + 1 if last_dir == -1 else 1
  158. inc_cnt = 0
  159. last_dir = -1
  160. inc[i] = inc_cnt
  161. dec[i] = dec_cnt
  162. inc_s = pd.Series(inc, index=df_group.index).ffill().fillna(0).astype(int)
  163. dec_s = pd.Series(dec, index=df_group.index).ffill().fillna(0).astype(int)
  164. return pd.DataFrame(
  165. {
  166. 'price_increase_times_consecutive': inc_s,
  167. 'price_decrease_times_consecutive': dec_s,
  168. },
  169. index=df_group.index,
  170. )
  171. streak_df = df_input.groupby(['gid', 'baggage'], sort=False, group_keys=False).apply(_calc_price_streaks)
  172. df_input = df_input.join(streak_df)
  173. df_input.drop(columns=['_price_event_dir'], inplace=True)
  174. # 价格变化次数
  175. # df_input['price_change_times_total'] = (
  176. # change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
  177. # )
  178. # 价格下降次数
  179. df_input['price_decrease_times_total'] = (
  180. decrease_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
  181. )
  182. # 价格上升次数
  183. df_input['price_increase_times_total'] = (
  184. increase_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
  185. )
  186. # 上次发生变价的小时数
  187. # last_change_hour = (
  188. # df_input['hours_until_departure']
  189. # .where(change_mask)
  190. # .groupby([df_input['gid'], df_input['baggage']])
  191. # .ffill() # 前向填充
  192. # )
  193. # 上次发生降价的小时数
  194. last_decrease_hour = (
  195. df_input['hours_until_departure']
  196. .where(decrease_mask)
  197. .groupby([df_input['gid'], df_input['baggage']])
  198. .ffill() # 前向填充
  199. )
  200. # 上次发生升价的小时数
  201. last_increase_hour = (
  202. df_input['hours_until_departure']
  203. .where(increase_mask)
  204. .groupby([df_input['gid'], df_input['baggage']])
  205. .ffill() # 前向填充
  206. )
  207. # 当前距离上一次变价过去多少小时
  208. # df_input['price_last_change_hours'] = (
  209. # last_change_hour - df_input['hours_until_departure']
  210. # ).fillna(0)
  211. # 当前距离上一次降价过去多少小时
  212. df_input['price_last_decrease_hours'] = (
  213. last_decrease_hour - df_input['hours_until_departure']
  214. ).fillna(0)
  215. # 当前距离上一次升价过去多少小时
  216. df_input['price_last_increase_hours'] = (
  217. last_increase_hour - df_input['hours_until_departure']
  218. ).fillna(0)
  219. pass
  220. # 想插入到 seats_remaining 前面的新列
  221. new_cols = [
  222. # 'price_change_times_total',
  223. # 'price_last_change_hours',
  224. 'price_decrease_times_total',
  225. 'price_decrease_times_consecutive',
  226. 'price_last_decrease_hours',
  227. 'price_increase_times_total',
  228. 'price_increase_times_consecutive',
  229. 'price_last_increase_hours',
  230. ]
  231. # 当前所有列
  232. cols = df_input.columns.tolist()
  233. # 找到 seats_remaining 的位置
  234. idx = cols.index('seats_remaining')
  235. # 重新拼列顺序
  236. new_order = cols[:idx] + new_cols + cols[idx:]
  237. # 去重(防止列已经在原位置)
  238. new_order = list(dict.fromkeys(new_order))
  239. # 重新排列 DataFrame
  240. df_input = df_input[new_order]
  241. pass
  242. print(">>> 计算价格区间特征")
  243. # 1. 基于绝对价格水平的价格区间划分
  244. # 先计算每个(gid, baggage)的价格统计特征
  245. # g = df_input.groupby(['gid', 'baggage'])
  246. price_stats = df_input.groupby(['gid', 'baggage'])['adult_total_price'].agg(
  247. min_price='min',
  248. max_price='max',
  249. mean_price='mean',
  250. std_price='std'
  251. ).reset_index()
  252. # 合并统计特征到原数据
  253. df_input = df_input.merge(price_stats, on=['gid', 'baggage'], how='left')
  254. # 2. 基于绝对价格的价格区间划分 (可以删除,因为后面有更精细的基于频率加权的分类)
  255. # # 高价区间:超过均值+1倍标准差
  256. # df_input['price_absolute_high'] = (df_input['adult_total_price'] >
  257. # (df_input['mean_price'] + df_input['std_price'])).astype(int)
  258. # # 中高价区间:均值到均值+1倍标准差
  259. # df_input['price_absolute_mid_high'] = ((df_input['adult_total_price'] > df_input['mean_price']) &
  260. # (df_input['adult_total_price'] <= (df_input['mean_price'] + df_input['std_price']))).astype(int)
  261. # # 中低价区间:均值-1倍标准差到均值
  262. # df_input['price_absolute_mid_low'] = ((df_input['adult_total_price'] > (df_input['mean_price'] - df_input['std_price'])) &
  263. # (df_input['adult_total_price'] <= df_input['mean_price'])).astype(int)
  264. # # 低价区间:低于均值-1倍标准差
  265. # df_input['price_absolute_low'] = (df_input['adult_total_price'] <= (df_input['mean_price'] - df_input['std_price'])).astype(int)
  266. # 3. 基于频率加权的价格百分位数(改进版)
  267. # 计算每个价格出现的频率
  268. price_freq = df_input.groupby(['gid', 'baggage', 'adult_total_price']).size().reset_index(name='price_frequency')
  269. df_input = df_input.merge(price_freq, on=['gid', 'baggage', 'adult_total_price'], how='left')
  270. # 计算频率加权的百分位数
  271. def weighted_percentile(group):
  272. if len(group) == 0:
  273. return pd.Series([np.nan] * 4, index=['price_weighted_percentile_25',
  274. 'price_weighted_percentile_50',
  275. 'price_weighted_percentile_75',
  276. 'price_weighted_percentile_90'])
  277. # 按价格排序,计算累积频率
  278. group = group.sort_values('adult_total_price')
  279. group['cum_freq'] = group['price_frequency'].cumsum()
  280. total_freq = group['price_frequency'].sum()
  281. # 计算加权百分位数
  282. percentiles = []
  283. for p in [0.25, 0.5, 0.75, 0.9]:
  284. threshold = total_freq * p
  285. # 找到第一个累积频率超过阈值的价格
  286. mask = group['cum_freq'] >= threshold
  287. if mask.any():
  288. percentile_value = group.loc[mask.idxmax(), 'adult_total_price']
  289. else:
  290. percentile_value = group['adult_total_price'].max()
  291. percentiles.append(percentile_value)
  292. return pd.Series(percentiles, index=['price_weighted_percentile_25',
  293. 'price_weighted_percentile_50',
  294. 'price_weighted_percentile_75',
  295. 'price_weighted_percentile_90'])
  296. # 按gid和baggage分组计算加权百分位数
  297. weighted_percentiles = df_input.groupby(['gid', 'baggage']).apply(weighted_percentile).reset_index()
  298. df_input = df_input.merge(weighted_percentiles, on=['gid', 'baggage'], how='left')
  299. # 4. 结合绝对价格和频率的综合判断(改进版)
  300. freq_median = df_input.groupby(['gid', 'baggage'])['price_frequency'].transform('median')
  301. # 计算价格相对于90%百分位数的倍数,用于区分不同级别的高价
  302. df_input['price_relative_to_90p'] = df_input['adult_total_price'] / df_input['price_weighted_percentile_90']
  303. # 添加价格容忍度:避免相近价格被分到不同区间
  304. # 计算价格差异容忍度(使用各百分位数的1%作为容忍度阈值)
  305. # tolerance_90p = df_input['price_weighted_percentile_90'] * 0.01
  306. tolerance_75p = df_input['price_weighted_percentile_75'] * 0.01
  307. tolerance_50p = df_input['price_weighted_percentile_50'] * 0.01
  308. tolerance_25p = df_input['price_weighted_percentile_25'] * 0.01
  309. # 重新设计价格区间分类(确保无重叠):
  310. # 首先定义各个区间的mask
  311. # 4.1 异常高价:价格远高于90%百分位数(超过1.5倍)且频率极低(低于中位数的1/3)
  312. price_abnormal_high_mask = ((df_input['price_relative_to_90p'] > 1.5) &
  313. (df_input['price_frequency'] < freq_median * 0.33))
  314. # 4.2 真正高位:严格满足条件(价格 > 90%分位数 且 频率 < 中位数)
  315. price_real_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_90']) &
  316. (df_input['price_frequency'] < freq_median) &
  317. ~price_abnormal_high_mask)
  318. # 4.3 正常高位:使用容忍度(价格接近75%分位数)
  319. price_normal_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_75'] - tolerance_75p) &
  320. ~price_real_high_mask & ~price_abnormal_high_mask)
  321. # 4.4 中高价:使用容忍度(价格在50%-75%分位数之间)
  322. price_mid_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_50'] - tolerance_50p) &
  323. (df_input['adult_total_price'] <= df_input['price_weighted_percentile_75'] + tolerance_75p) &
  324. ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
  325. # 4.5 中低价:使用容忍度(价格在25%-50%分位数之间)
  326. price_mid_low_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_25'] - tolerance_25p) &
  327. (df_input['adult_total_price'] <= df_input['price_weighted_percentile_50'] + tolerance_50p) &
  328. ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
  329. # 4.6 低价:严格满足条件(价格 ≤ 25%分位数)
  330. price_low_mask = ((df_input['adult_total_price'] <= df_input['price_weighted_percentile_25']) &
  331. ~price_mid_low_mask & ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
  332. # 使用np.select确保互斥性
  333. price_zone_masks = [
  334. price_abnormal_high_mask, # 异常高价区(5级)
  335. price_real_high_mask, # 真正高价区(4级)
  336. price_normal_high_mask, # 正常高价区(3级)
  337. price_mid_high_mask, # 中高价区(2级)
  338. price_mid_low_mask, # 中低价区(1级)
  339. price_low_mask, # 低价区(0级)
  340. ]
  341. price_zone_values = [5, 4, 3, 2, 1, 0] # 5:异常高价, 4:真正高价, 3:正常高价, 2:中高价, 1:中低价, 0:低价
  342. # 使用np.select确保每个价格只被分到一个区间
  343. price_zone_result = np.select(price_zone_masks, price_zone_values, default=2) # 默认中高价
  344. # 4.8 价格区间综合标记
  345. df_input['price_zone_comprehensive'] = price_zone_result
  346. # 5. 价格异常度检测
  347. # 价格相对于均值的标准化偏差
  348. df_input['price_z_score'] = (df_input['adult_total_price'] - df_input['mean_price']) / df_input['std_price']
  349. # 价格异常度:基于Z-score的绝对值
  350. df_input['price_anomaly_score'] = np.abs(df_input['price_z_score'])
  351. # 6. 价格稳定性特征
  352. # 计算价格波动系数(标准差/均值)
  353. df_input['price_coefficient_variation'] = df_input['std_price'] / df_input['mean_price']
  354. # 7. 价格趋势特征
  355. # 计算当前价格相对于历史价格的位置
  356. df_input['price_relative_position'] = (df_input['adult_total_price'] - df_input['min_price']) / (df_input['max_price'] - df_input['min_price'])
  357. df_input['price_relative_position'] = df_input['price_relative_position'].fillna(0.5) # 兜底
  358. # 删除中间计算列
  359. df_input.drop(columns=['price_frequency', 'price_z_score', 'price_relative_to_90p'], inplace=True, errors='ignore')
  360. del price_freq
  361. del price_stats
  362. del weighted_percentiles
  363. del freq_median
  364. print(">>> 改进版价格区间特征计算完成")
  365. # 生成第一机场对
  366. df_input['airport_pair_1'] = (
  367. df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
  368. )
  369. # 删除原始第一机场码
  370. df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
  371. # 第一机场对 放到 seg1_dep_time 列的前面
  372. insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time')
  373. # 生成第二机场对(带缺失兜底)
  374. df_input['airport_pair_2'] = np.where(
  375. df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
  376. 'NA',
  377. df_input['seg2_dep_air_port'].astype(str) + "-" +
  378. df_input['seg2_arr_air_port'].astype(str)
  379. )
  380. # 删除原始第二机场码
  381. df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
  382. # 第二机场对 放到 seg2_dep_time 列的前面
  383. insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time')
  384. # 是否转乘
  385. df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
  386. # 是否转乘 放到 flight_number_2 列的前面
  387. insert_df_col(df_input, 'is_transfer', 'flight_number_2')
  388. # 重命名起飞时刻与到达时刻
  389. df_input.rename(
  390. columns={
  391. 'seg1_dep_time': 'dep_time_1',
  392. 'seg1_arr_time': 'arr_time_1',
  393. 'seg2_dep_time': 'dep_time_2',
  394. 'seg2_arr_time': 'arr_time_2',
  395. },
  396. inplace=True
  397. )
  398. # 第一段飞行时长
  399. df_input['fly_duration_1'] = (
  400. (df_input['arr_time_1'] - df_input['dep_time_1'])
  401. .dt.total_seconds() / 3600
  402. ).round(2)
  403. # 第二段飞行时长(无转乘为 0)
  404. df_input['fly_duration_2'] = (
  405. (df_input['arr_time_2'] - df_input['dep_time_2'])
  406. .dt.total_seconds() / 3600
  407. ).fillna(0).round(2)
  408. # 总飞行时长
  409. df_input['fly_duration'] = (
  410. df_input['fly_duration_1'] + df_input['fly_duration_2']
  411. ).round(2)
  412. # 中转停留时长(无转乘为 0)
  413. df_input['stop_duration'] = (
  414. (df_input['dep_time_2'] - df_input['arr_time_1'])
  415. .dt.total_seconds() / 3600
  416. ).fillna(0).round(2)
  417. # 裁剪,防止负数
  418. # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
  419. # df_input[c] = df_input[c].clip(lower=0)
  420. # 和 is_transfer 逻辑保持一致
  421. # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
  422. # 一次性插到 is_filled 前面
  423. insert_before = 'is_filled'
  424. new_cols = [
  425. 'fly_duration_1',
  426. 'fly_duration_2',
  427. 'fly_duration',
  428. 'stop_duration'
  429. ]
  430. cols = df_input.columns.tolist()
  431. idx = cols.index(insert_before)
  432. # 删除旧位置
  433. cols = [c for c in cols if c not in new_cols]
  434. # 插入新位置(顺序保持)
  435. cols[idx:idx] = new_cols # python独有空切片插入法
  436. df_input = df_input[cols]
  437. # 一次生成多个字段
  438. dep_t1 = df_input['dep_time_1']
  439. # 几点起飞(0–23)
  440. df_input['flight_by_hour'] = dep_t1.dt.hour
  441. # 起飞日期几号(1–31)
  442. df_input['flight_by_day'] = dep_t1.dt.day
  443. # 起飞日期几月(1–12)
  444. df_input['flight_day_of_month'] = dep_t1.dt.month
  445. # 起飞日期周几(0=周一, 6=周日)
  446. df_input['flight_day_of_week'] = dep_t1.dt.weekday
  447. # 起飞日期季度(1–4)
  448. df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
  449. # 是否周末(周六 / 周日)
  450. df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
  451. # 找到对应的国家码
  452. df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
  453. df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)
  454. # 整体出发时间 就是 dep_time_1
  455. df_input['global_dep_time'] = df_input['dep_time_1']
  456. # 整体到达时间:有转乘用 arr_time_2,否则用 arr_time_1
  457. df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
  458. # 出发日期在出发国家是否节假日
  459. df_input['dep_country_is_holiday'] = df_input.apply(
  460. lambda r: r['global_dep_time'].date()
  461. in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
  462. axis=1
  463. ).astype(int)
  464. # 到达日期在到达国家是否节假日
  465. df_input['arr_country_is_holiday'] = df_input.apply(
  466. lambda r: r['global_arr_time'].date()
  467. in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
  468. axis=1
  469. ).astype(int)
  470. # 在任一侧是否节假日
  471. df_input['any_country_is_holiday'] = (
  472. df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
  473. .max(axis=1)
  474. )
  475. # 是否跨国航线
  476. df_input['is_cross_country'] = (
  477. df_input['dep_country'] != df_input['arr_country']
  478. ).astype(int)
  479. def days_to_next_holiday(country, cur_date):
  480. if pd.isna(country) or pd.isna(cur_date):
  481. return np.nan
  482. holidays = COUNTRY_HOLIDAYS.get(country)
  483. if not holidays:
  484. return np.nan
  485. # 找未来(含当天)的节假日,并排序
  486. future_holidays = sorted([d for d in holidays if d >= cur_date])
  487. if not future_holidays:
  488. return np.nan
  489. next_holiday = future_holidays[0] # 第一个未来节假日
  490. delta_days = (next_holiday - cur_date).days
  491. return delta_days
  492. df_input['days_to_holiday'] = df_input.apply(
  493. lambda r: days_to_next_holiday(
  494. r['dep_country'],
  495. r['update_hour'].date()
  496. ),
  497. axis=1
  498. )
  499. # 没有未来节假日的统一兜底
  500. # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)
  501. # days_to_holiday 插在 update_hour 前面
  502. insert_df_col(df_input, 'days_to_holiday', 'update_hour')
  503. # 训练模式
  504. if is_training:
  505. print(">>> 训练模式:计算 target 相关列")
  506. print(f"\n>>> 开始处理 对应区间: n_hours = {target_n_hours}")
  507. target_lower_limit = crop_lower_limit
  508. target_upper_limit = target_n_hours
  509. mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
  510. df_targets = df_input.loc[mask_targets].copy()
  511. targets_amout = df_targets.shape[0]
  512. print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})")
  513. if targets_amout == 0:
  514. print(f">>> n_hours = {target_n_hours} 无有效数据,跳过")
  515. return pd.DataFrame()
  516. print(">>> 计算 price_at_n_hours")
  517. df_input_object = df_input[(df_input['hours_until_departure'] >= feature_n_hours) & (df_input['baggage'] == 30)].copy()
  518. df_last = df_input_object.groupby('gid', observed=True).last().reset_index() # 一般落在起飞前36\32\30小时
  519. # 提取并重命名 price 列
  520. df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
  521. print(">>> price_at_n_hours计算完成,示例:")
  522. print(df_last_price_at_n_hours.head(5))
  523. # 新的计算降价方式
  524. # 先排序
  525. df_targets = df_targets.sort_values(
  526. ['gid', 'hours_until_departure'],
  527. ascending=[True, False]
  528. )
  529. # 在 gid 内计算价格变化
  530. g = df_targets.groupby('gid', group_keys=False)
  531. df_targets['price_diff'] = g['adult_total_price'].diff()
  532. # VALID_DROP_MIN = 5
  533. # LOWER_HOUR = 4
  534. # UPPER_HOUR = 28
  535. valid_drop_mask = (
  536. (df_targets['price_diff'] <= -VALID_DROP_MIN)
  537. # (df_targets['hours_until_departure'] >= LOWER_HOUR) &
  538. # (df_targets['hours_until_departure'] <= UPPER_HOUR)
  539. )
  540. # 有效的降价
  541. df_valid_drops = df_targets.loc[valid_drop_mask]
  542. # 找「第一次」降价(每个 gid)
  543. df_first_price_drop = (
  544. df_valid_drops
  545. .groupby('gid', as_index=False)
  546. .first()
  547. )
  548. # 简化列
  549. df_first_price_drop = df_first_price_drop[
  550. ['gid', 'hours_until_departure', 'adult_total_price', 'price_diff']
  551. ].rename(columns={
  552. 'hours_until_departure': 'time_to_price_drop',
  553. 'adult_total_price': 'price_at_d_hours',
  554. 'price_diff': 'amount_of_price_drop',
  555. })
  556. # 把降价幅度转成正数(更直观)
  557. df_first_price_drop['amount_of_price_drop'] = (-df_first_price_drop['amount_of_price_drop']).round(2)
  558. pass
  559. # # 计算降价信息
  560. # print(">>> 计算降价信息")
  561. # df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
  562. # df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
  563. # df_targets['price_dropped'] = (
  564. # (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) &
  565. # (df_targets['price_drop_amount'] >= 5) # 降幅不能太小
  566. # )
  567. # df_price_drops = df_targets[df_targets['price_dropped']].copy()
  568. # price_drops_len = df_price_drops.shape[0]
  569. # if price_drops_len == 0:
  570. # print(f">>> n_hours = {current_n_hours} 无降价信息")
  571. # # 创建包含指定列的空 DataFrame
  572. # df_price_drop_info = pd.DataFrame({
  573. # 'gid': pd.Series(dtype='int64'),
  574. # 'first_drop_hours_until_departure': pd.Series(dtype='int64'),
  575. # 'price_at_first_drop_hours': pd.Series(dtype='float64')
  576. # })
  577. # else:
  578. # df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index() # 第一次发生的降价
  579. # df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={
  580. # 'hours_until_departure': 'first_drop_hours_until_departure',
  581. # 'adult_total_price': 'price_at_first_drop_hours'
  582. # })
  583. # print(">>> 降价信息计算完成,示例:")
  584. # print(df_price_drop_info.head(5))
  585. # # 合并信息
  586. # df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
  587. # df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
  588. # df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
  589. # df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0) # 区别
  590. # df_gid_info['time_to_price_drop'] = current_n_hours - df_gid_info['first_drop_hours_until_departure']
  591. # df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0) # 区别
  592. # del df_input_object
  593. # del df_last
  594. # del df_last_price_at_n_hours
  595. # del df_price_drops
  596. # del df_price_drop_info
  597. df_gid_info = df_last_price_at_n_hours.merge(df_first_price_drop, on='gid', how='left')
  598. df_gid_info['will_price_drop'] = df_gid_info['time_to_price_drop'].notnull().astype(int)
  599. df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)
  600. df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)
  601. pass
  602. del df_input_object
  603. del df_last
  604. del df_last_price_at_n_hours
  605. del df_first_price_drop
  606. del df_valid_drops
  607. del df_targets
  608. gc.collect()
  609. # 将目标变量合并到输入数据中
  610. print(">>> 将目标变量信息合并到 df_input")
  611. df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
  612. # 使用 0 填充 NaN 值
  613. df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
  614. ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
  615. df_input = df_input.rename(columns={
  616. 'will_price_drop': 'target_will_price_drop',
  617. 'amount_of_price_drop': 'target_amount_of_drop',
  618. 'time_to_price_drop': 'target_time_to_drop'
  619. })
  620. # 计算每个 gid 分组在 df_targets 中的 adult_total_price 最小值
  621. # print(">>> 计算每个 gid 分组的 adult_total_price 最小值...")
  622. # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
  623. # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
  624. # gid_count = df_min_price_by_gid.shape[0]
  625. # print(f">>> 计算完成,共 {gid_count} 个 gid 分组")
  626. # # 将最小价格 merge 到 df_inputs 中
  627. # print(">>> 将最小价格 merge 到输入数据中...")
  628. # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')
  629. print(">>> 合并后 df_input 样例:")
  630. print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
  631. # 预测模式
  632. else:
  633. print(">>> 预测模式:补齐 target 相关列(全部置 0)")
  634. df_input['target_will_price_drop'] = 0
  635. df_input['target_amount_of_drop'] = 0.0
  636. df_input['target_time_to_drop'] = 0
  637. # 按顺序排列
  638. order_columns = [
  639. "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day",
  640. "seats_remaining", "baggage", "baggage_level",
  641. "price_decrease_times_total", "price_decrease_times_consecutive", "price_last_decrease_hours",
  642. "price_increase_times_total", "price_increase_times_consecutive", "price_last_increase_hours",
  643. "adult_total_price", "Adult_Total_Price", "target_will_price_drop", "target_amount_of_drop", "target_time_to_drop",
  644. "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure", "update_hour", "crawl_date", "gid",
  645. "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1",
  646. "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week", "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer",
  647. "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2", "fly_duration", "stop_duration",
  648. "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country",
  649. "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday",
  650. "price_weighted_percentile_25", "price_weighted_percentile_50", "price_weighted_percentile_75", "price_weighted_percentile_90",
  651. "price_zone_comprehensive", "price_relative_position",
  652. ]
  653. df_input = df_input[order_columns]
  654. return df_input
  655. def standardization(df, feature_scaler, target_scaler=None, is_training=True, is_val=False, feature_length=240):
  656. print(">>> 开始标准化处理")
  657. # 准备走标准化的特征
  658. scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration',
  659. 'price_weighted_percentile_25', 'price_weighted_percentile_50',
  660. 'price_weighted_percentile_75', 'price_weighted_percentile_90']
  661. if is_training:
  662. print(">>> 特征数据标准化开始")
  663. if feature_scaler is None:
  664. feature_scaler = StandardScaler()
  665. if not is_val:
  666. feature_scaler.fit(df[scaler_features])
  667. df[scaler_features] = feature_scaler.transform(df[scaler_features])
  668. print(">>> 特征数据标准化完成")
  669. else:
  670. df[scaler_features] = feature_scaler.transform(df[scaler_features])
  671. print(">>> 预测模式下特征标准化处理完成")
  672. # 准备走归一化的特征
  673. # 事先定义好每个特征的合理范围
  674. fixed_ranges = {
  675. 'hours_until_departure': (0, 480), # 0-20天
  676. 'from_city_num': (0, 38),
  677. 'to_city_num': (0, 38),
  678. 'flight_1_num': (0, 341),
  679. 'flight_2_num': (0, 341),
  680. 'seats_remaining': (1, 5),
  681. # 'price_change_times_total': (0, 30), # 假设价格变更次数不会超过30次
  682. # 'price_last_change_hours': (0, 480),
  683. 'price_decrease_times_total': (0, 20), # 假设价格下降次数不会超过20次
  684. 'price_decrease_times_consecutive': (0, 10), # 假设价格连续下降次数不会超过10次
  685. 'price_last_decrease_hours': (0, feature_length), #(0-240小时)
  686. 'price_increase_times_total': (0, 20), # 假设价格上升次数不会超过20次
  687. 'price_increase_times_consecutive': (0, 10), # 假设价格连续上升次数不会超过10次
  688. 'price_last_increase_hours': (0, feature_length), #(0-240小时)
  689. 'price_zone_comprehensive': (0, 5),
  690. 'days_to_departure': (0, 30),
  691. 'days_to_holiday': (0, 120), # 最长的越南节假日间隔120天
  692. 'flight_by_hour': (0, 23),
  693. 'flight_by_day': (1, 31),
  694. 'flight_day_of_month': (1, 12),
  695. 'flight_day_of_week': (0, 6),
  696. 'flight_day_of_quarter': (1, 4),
  697. }
  698. normal_features = list(fixed_ranges.keys())
  699. print(">>> 归一化特征列: ", normal_features)
  700. print(">>> 基于固定范围的特征数据归一化开始")
  701. for col in normal_features:
  702. if col in df.columns:
  703. # 核心归一化公式: (x - min) / (max - min)
  704. col_min, col_max = fixed_ranges[col]
  705. df[col] = (df[col] - col_min) / (col_max - col_min)
  706. # 添加裁剪,将超出范围的值强制限制在[0,1]区间
  707. df[col] = df[col].clip(0, 1)
  708. print(">>> 基于固定范围的特征数据归一化完成")
  709. return df, feature_scaler, target_scaler
def preprocess_data_simple(df_input, is_train=False):
    """Second-stage preprocessing: derive per-series price-change features and,
    in training mode, mine historical drop/rise events and a price envelope.

    Parameters:
        df_input: DataFrame produced upstream; the code below reads at least
            'gid', 'baggage', 'hours_until_departure', 'adult_total_price',
            'seats_remaining', 'is_filled', 'Adult_Total_Price',
            'Hours_Until_Departure' and the flight-info columns.
        is_train: when True, also build the drop/rise node tables and the
            per-flight-day price envelope.

    Returns:
        (df_input, df_drop_nodes, df_rise_nodes, df_envelope) when is_train
        is True, otherwise (df_input, None, None, None).
    """
    df_input = preprocess_data_first_half(df_input)
    # Sort each (gid, baggage) series by time-to-departure descending
    # (farthest from departure first).
    df_input = df_input.sort_values(
        by=['gid', 'baggage', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)
    df_input = df_input[df_input['hours_until_departure'] <= 480]
    df_input = df_input[df_input['baggage'] == 0]  # keep only no-baggage rows
    # In prediction mode, drop the trailing run of padded rows so each series
    # ends on real observations. The reversed cummin marks a row only when
    # every later row in the series is also filled (a pure padded tail).
    if not is_train:
        _tail_filled = df_input.groupby(['gid', 'baggage'])['is_filled'].transform(
            lambda s: s.iloc[::-1].cummin().iloc[::-1]
        )
        df_input = df_input[~((df_input['is_filled'] == 1) & (_tail_filled == 1))]
    # Minimum absolute price move (in price units) that counts as a change.
    price_change_amount_threshold = 5
    df_input['_raw_price_diff'] = df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'].diff()
    # Amount of the most recent significant price change, carried forward
    # within each series: sub-threshold diffs are zeroed, zeros become NaN so
    # they inherit the previous significant change, and leading NaNs become 0.
    df_input['price_change_amount'] = (
        df_input['_raw_price_diff']
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(2)
    )
    # Same construction for the relative change (pct_change vs previous row),
    # gated by the same absolute-diff threshold.
    df_input['price_change_percent'] = (
        df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price']
        .pct_change()
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(4)
    )
    # Step 1: label maximal runs of constant price_change_amount as segments.
    df_input['price_change_segment'] = (
        df_input.groupby(['gid', 'baggage'], group_keys=False)['price_change_amount']
        .apply(lambda s: (s != s.shift()).cumsum())
    )
    # Step 2: 1-based row count inside the current segment, i.e. how long the
    # current price level has persisted (rows are hourly, hence "hours").
    df_input['price_duration_hours'] = (
        df_input.groupby(['gid', 'baggage', 'price_change_segment'], group_keys=False)
        .cumcount()
        .add(1)
    )
    # Drop helper columns.
    df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff'])
    # Move the capitalized duplicate columns to the end and mirror 'baggage'.
    adult_price = df_input.pop('Adult_Total_Price')
    hours_until = df_input.pop('Hours_Until_Departure')
    df_input['Adult_Total_Price'] = adult_price
    df_input['Hours_Until_Departure'] = hours_until
    df_input['Baggage'] = df_input['baggage']
    # Training-only event mining.
    if is_train:
        # Restrict to the [12h, 360h] window before departure (up to 15 days).
        df_target = df_input[(df_input['hours_until_departure'] >= 12) & (df_input['hours_until_departure'] <= 360)].copy()
        df_target = df_target.sort_values(
            by=['gid', 'hours_until_departure'],
            ascending=[True, False]
        ).reset_index(drop=True)
        # Previous-row values within each gid: the state just before the
        # current change (used for rise-then-drop / rise-then-rise analysis).
        prev_pct = df_target.groupby('gid', group_keys=False)['price_change_percent'].shift(1)
        prev_amo = df_target.groupby('gid', group_keys=False)['price_change_amount'].shift(1)
        prev_dur = df_target.groupby('gid', group_keys=False)['price_duration_hours'].shift(1)
        prev_price = df_target.groupby('gid', group_keys=False)['adult_total_price'].shift(1)
        prev_seats = df_target.groupby('gid', group_keys=False)['seats_remaining'].shift(1)
        # Rise-then-drop transitions: previous change positive, current negative.
        # NOTE(review): unlike rise_mask below, this is not restricted to
        # segment starts (price_duration_hours == 1) — confirm that is intended.
        drop_mask = (prev_pct > 0) & (df_target['price_change_percent'] < 0)
        df_drop_nodes = df_target.loc[drop_mask, ['gid', 'hours_until_departure']].copy()
        df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True)
        df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
        df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
        df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_seats_remaining'] = prev_seats.loc[drop_mask].astype(int).to_numpy()
        df_drop_nodes = df_drop_nodes.reset_index(drop=True)
        # Static flight-description columns (only those actually present).
        flight_info_cols = [
            'city_pair',
            'flight_number_1', 'seg1_dep_air_port', 'seg1_dep_time', 'seg1_arr_air_port', 'seg1_arr_time',
            'flight_number_2', 'seg2_dep_air_port', 'seg2_dep_time', 'seg2_arr_air_port', 'seg2_arr_time',
            'currency', 'baggage', 'flight_day',
        ]
        flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
        df_gid_info = df_target[['gid'] + flight_info_cols].drop_duplicates(subset=['gid']).reset_index(drop=True)
        df_drop_nodes = df_drop_nodes.merge(df_gid_info, on='gid', how='left')
        drop_info_cols = ['drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
                          'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount',
                          'high_price_amount', 'high_price_seats_remaining',
                          ]
        # Final column order; gid itself is intentionally dropped.
        df_drop_nodes = df_drop_nodes[flight_info_cols + drop_info_cols]
        # Rise-then-rise transitions (two consecutive positive change
        # segments), anchored at the start of the second segment.
        seg_start_mask = df_target['price_duration_hours'].eq(1)
        rise_mask = seg_start_mask & (prev_pct > 0) & (df_target['price_change_percent'] > 0)
        df_rise_nodes = df_target.loc[rise_mask, ['gid', 'hours_until_departure']].copy()
        df_rise_nodes.rename(columns={'hours_until_departure': 'rise_hours_until_departure'}, inplace=True)
        df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy()
        df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy()
        df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_seats_remaining'] = prev_seats.loc[rise_mask].astype(int).to_numpy()
        df_rise_nodes = df_rise_nodes.reset_index(drop=True)
        df_rise_nodes = df_rise_nodes.merge(df_gid_info, on='gid', how='left')
        rise_info_cols = [
            'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
            'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount',
            'prev_rise_amount', 'prev_rise_seats_remaining',
        ]
        df_rise_nodes = df_rise_nodes[flight_info_cols + rise_info_cols]
        # Historical price envelope: the single highest observed price per
        # (city_pair, flight numbers, flight_day) plus when it occurred.
        envelope_group = ['city_pair', 'flight_number_1', 'flight_number_2', 'flight_day']
        idx_peak = df_input.groupby(envelope_group)['adult_total_price'].idxmax()
        df_envelope = df_input.loc[idx_peak, envelope_group + [
            'adult_total_price', 'hours_until_departure'
        ]].rename(columns={
            'adult_total_price': 'peak_price',
            'hours_until_departure': 'peak_hours',
        }).reset_index(drop=True)
        del df_gid_info
        del df_target
        return df_input, df_drop_nodes, df_rise_nodes, df_envelope
    return df_input, None, None, None
  903. def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".", pred_time_str=""):
  904. if df_input is None or df_input.empty:
  905. return pd.DataFrame()
  906. df_sorted = df_input.sort_values(
  907. by=['gid', 'hours_until_departure'],
  908. ascending=[True, False],
  909. ).reset_index(drop=True)
  910. df_sorted = df_sorted[
  911. df_sorted['hours_until_departure'].between(12, 360)
  912. ].reset_index(drop=True)
  913. # 每个 gid 取 hours_until_departure 最小的一条
  914. df_min_hours = (
  915. df_sorted.drop_duplicates(subset=['gid'], keep='last')
  916. .reset_index(drop=True)
  917. )
  918. # 确保 hours_until_departure 在 [12, 360] 的 范围内
  919. # df_min_hours = df_min_hours[
  920. # df_min_hours['hours_until_departure'].between(12, 360)
  921. # ].reset_index(drop=True)
  922. drop_info_csv_path = os.path.join(output_dir, f'{group_route_str}_drop_info.csv')
  923. if os.path.exists(drop_info_csv_path):
  924. df_drop_nodes = pd.read_csv(drop_info_csv_path)
  925. else:
  926. df_drop_nodes = pd.DataFrame()
  927. rise_info_csv_path = os.path.join(output_dir, f'{group_route_str}_rise_info.csv')
  928. if os.path.exists(rise_info_csv_path):
  929. df_rise_nodes = pd.read_csv(rise_info_csv_path)
  930. else:
  931. df_rise_nodes = pd.DataFrame()
  932. # ==================== 跨航班日包络线 + 降价潜力 ====================
  933. print(">>> 构建跨航班日价格包络线")
  934. flight_key = ['city_pair', 'flight_number_1', 'flight_number_2']
  935. day_key = flight_key + ['flight_day']
  936. # 1. 历史侧:加载训练阶段的峰值数据
  937. envelope_csv_path = os.path.join(output_dir, f'{group_route_str}_envelope_info.csv')
  938. if os.path.exists(envelope_csv_path):
  939. df_hist = pd.read_csv(envelope_csv_path)
  940. df_hist = df_hist[day_key + ['peak_price', 'peak_hours']]
  941. df_hist['source'] = 'hist'
  942. else:
  943. df_hist = pd.DataFrame()
  944. # 2. 未来侧:当前在售价格
  945. df_future = df_min_hours[day_key + ['adult_total_price', 'hours_until_departure']].copy().rename(
  946. columns={'adult_total_price': 'peak_price', 'hours_until_departure': 'peak_hours'}
  947. )
  948. df_future['source'] = 'future'
  949. # 3. 合并包络线数据点
  950. df_envelope_all = pd.concat(
  951. [x for x in [df_hist, df_future] if not x.empty], ignore_index=True
  952. ).drop_duplicates(subset=day_key, keep='last')
  953. # 4. 包络线统计 + 找高点起飞日
  954. df_envelope_agg = df_envelope_all.groupby(flight_key).agg(
  955. envelope_max=('peak_price', 'max'), # 峰值最大
  956. envelope_min=('peak_price', 'min'), # 峰值最小
  957. envelope_mean=('peak_price', 'mean'), # 峰值平均
  958. envelope_count=('peak_price', 'count'), # 峰值统计总数
  959. envelope_avg_peak_hours=('peak_hours', 'mean'), # 峰值发生的距离起飞小时数, 做一下平均
  960. ).reset_index()
  961. # 对数值列保留两位小数
  962. df_envelope_agg[['envelope_mean', 'envelope_avg_peak_hours']] = df_envelope_agg[['envelope_mean', 'envelope_avg_peak_hours']].round(2)
  963. idx_top = df_envelope_all.groupby(flight_key)['peak_price'].idxmax()
  964. df_top = df_envelope_all.loc[idx_top, flight_key + ['flight_day', 'peak_price', 'peak_hours']].rename(
  965. columns={'flight_day': 'target_flight_day', 'peak_price': 'target_price', 'peak_hours': 'target_peak_hours'}
  966. )
  967. df_envelope_agg = df_envelope_agg.merge(df_top, on=flight_key, how='left')
  968. # 5. 合并到 df_min_hours
  969. df_min_hours = df_min_hours.merge(df_envelope_agg, on=flight_key, how='left')
  970. price_range = (df_min_hours['envelope_max'] - df_min_hours['envelope_min']).replace(0, 1) # 计算当前价格在包络区间的百分位
  971. df_min_hours['envelope_position'] = (
  972. (df_min_hours['adult_total_price'] - df_min_hours['envelope_min']) / price_range
  973. ).clip(0, 1).round(4)
  974. df_min_hours['is_envelope_peak'] = (df_min_hours['envelope_position'] >= 0.75).astype(int) # 0.95 -> 0.75
  975. df_min_hours['is_target_day'] = (df_min_hours['flight_day'] == df_min_hours['target_flight_day']).astype(int)
  976. # ==================== 目标二:降价潜力评分 ====================
  977. # 用“上涨后回落倾向”替代简单计数:drop / (drop + rise)
  978. # drop_count 来自 _drop_info.csv(上涨段后转跌),rise_count 来自 _rise_info.csv(上涨段后继续涨)
  979. df_min_hours['drop_potential'] = 0.0
  980. # 先保证相关列一定存在,避免后续选列 KeyError
  981. # df_min_hours['drop_freq_count'] = 0.0
  982. # df_min_hours['rise_freq_count'] = 0.0
  983. df_drop_freq = pd.DataFrame(columns=flight_key + ['drop_freq_count'])
  984. df_rise_freq = pd.DataFrame(columns=flight_key + ['rise_freq_count'])
  985. if not df_drop_nodes.empty:
  986. df_drop_freq = (
  987. df_drop_nodes.groupby(flight_key)
  988. .size()
  989. .reset_index(name='drop_freq_count')
  990. )
  991. if not df_rise_nodes.empty:
  992. df_rise_freq = (
  993. df_rise_nodes.groupby(flight_key)
  994. .size()
  995. .reset_index(name='rise_freq_count')
  996. )
  997. if (not df_drop_freq.empty) or (not df_rise_freq.empty):
  998. df_min_hours = df_min_hours.merge(df_drop_freq, on=flight_key, how='left')
  999. df_min_hours = df_min_hours.merge(df_rise_freq, on=flight_key, how='left')
  1000. df_min_hours['drop_freq_count'] = df_min_hours['drop_freq_count'].fillna(0).astype(float)
  1001. df_min_hours['rise_freq_count'] = df_min_hours['rise_freq_count'].fillna(0).astype(float)
  1002. # 轻微平滑,避免样本很少时出现 0/0 或过度极端
  1003. alpha = 1.0
  1004. denom = df_min_hours['drop_freq_count'] + df_min_hours['rise_freq_count'] + 2.0 * alpha
  1005. df_min_hours['drop_potential'] = (
  1006. (df_min_hours['drop_freq_count'] + alpha) / denom.replace(0, np.nan)
  1007. ).fillna(0.0).clip(0, 1).round(4)
  1008. # ==================== 综合评分:包络高位 × 降价潜力 ====================
  1009. # target_score = 包络位置(越高越好)× 降价潜力(越高越好)
  1010. thres_ep = 0.7
  1011. thres_dp = 0.3
  1012. df_min_hours['target_score'] = (
  1013. df_min_hours['envelope_position'] * thres_ep + df_min_hours['drop_potential'] * thres_dp
  1014. ).round(4)
  1015. # 综合评分阈值:大于阈值的都认为值得投放
  1016. target_score_threshold = 0.75
  1017. # df_min_hours['target_score_threshold'] = target_score_threshold
  1018. df_min_hours['is_good_target'] = (df_min_hours['target_score'] >= target_score_threshold).astype(int)
  1019. print(f">>> 包络线+降价潜力评分完成")
  1020. del df_hist, df_future, df_envelope_all, df_envelope_agg, df_top, df_drop_freq, df_rise_freq
  1021. df_min_hours = df_min_hours[df_min_hours['is_good_target'] == 1].reset_index(drop=True) # 保留值得投放的
  1022. # =====================================================================
  1023. df_min_hours['simple_will_price_drop'] = 0
  1024. df_min_hours['simple_drop_in_hours'] = 0
  1025. df_min_hours['simple_drop_in_hours_prob'] = 0.0
  1026. df_min_hours['simple_drop_in_hours_dist'] = '' # 空串 表示未知
  1027. df_min_hours['flag_dist'] = ''
  1028. df_min_hours['drop_price_change_upper'] = 0.0
  1029. # df_min_hours['drop_price_change_mode'] = 0.0
  1030. df_min_hours['drop_price_change_lower'] = 0.0
  1031. df_min_hours['drop_price_sample_size'] = 0
  1032. df_min_hours['rise_price_change_upper'] = 0.0
  1033. # df_min_hours['rise_price_change_mode'] = 0.0
  1034. df_min_hours['rise_price_change_lower'] = 0.0
  1035. df_min_hours['rise_price_sample_size'] = 0
  1036. # 这个阈值取多少?
  1037. pct_threshold = 0.01
  1038. # pct_threshold = 2
  1039. pct_threshold_1 = 0.01
  1040. # pct_threshold_c = 0.001
  1041. for idx, row in df_min_hours.iterrows():
  1042. city_pair = row['city_pair']
  1043. flight_number_1 = row['flight_number_1']
  1044. flight_number_2 = row['flight_number_2']
  1045. if flight_number_1 == 'VJ878': # 调试时用
  1046. pass
  1047. price_change_percent = row['price_change_percent']
  1048. price_change_amount = row['price_change_amount']
  1049. price_duration_hours = row['price_duration_hours']
  1050. hours_until_departure = row['hours_until_departure']
  1051. # seats_remaining_change_amount = row['seats_remaining_change_amount']
  1052. price_amount = row['adult_total_price']
  1053. seats_remaining = row['seats_remaining']
  1054. length_drop = 0
  1055. length_rise = 0
  1056. # length_keep = 0
  1057. # 针对历史上发生的 高价->低价
  1058. if not df_drop_nodes.empty:
  1059. # 对准航班号, 不同起飞日期
  1060. if flight_number_2 and flight_number_2 != 'VJ':
  1061. df_drop_nodes_part = df_drop_nodes[
  1062. (df_drop_nodes['city_pair'] == city_pair) &
  1063. (df_drop_nodes['flight_number_1'] == flight_number_1) &
  1064. (df_drop_nodes['flight_number_2'] == flight_number_2)
  1065. ]
  1066. else:
  1067. df_drop_nodes_part = df_drop_nodes[
  1068. (df_drop_nodes['city_pair'] == city_pair) &
  1069. (df_drop_nodes['flight_number_1'] == flight_number_1)
  1070. ]
  1071. # 降价前 增幅阈值的匹配 与 高价历史持续时间 得出降价时间的概率
  1072. if not df_drop_nodes_part.empty and pd.notna(price_change_percent):
  1073. # 增幅太小的去掉
  1074. # df_drop_nodes_part = df_drop_nodes_part[df_drop_nodes_part['high_price_change_percent'] >= 0.01]
  1075. # pct_diff = (df_drop_nodes_part['high_price_change_percent'] - float(price_change_percent)).abs()
  1076. # df_match = df_drop_nodes_part.loc[pct_diff <= pct_threshold, ['high_price_duration_hours', 'high_price_change_percent']].copy()
  1077. pct_base = float(price_change_percent)
  1078. pct_vals = pd.to_numeric(df_drop_nodes_part['high_price_change_percent'], errors='coerce')
  1079. df_drop_gap = df_drop_nodes_part.loc[
  1080. pct_vals.notna(),
  1081. ['drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
  1082. 'high_price_duration_hours', 'high_price_change_percent',
  1083. 'high_price_change_amount', 'high_price_amount', 'high_price_seats_remaining']
  1084. ].copy()
  1085. df_drop_gap['pct_gap'] = (pct_vals.loc[pct_vals.notna()] - pct_base)
  1086. df_drop_gap['pct_abs_gap'] = df_drop_gap['pct_gap'].abs()
  1087. price_base = pd.to_numeric(price_amount, errors='coerce')
  1088. high_price_vals = pd.to_numeric(df_drop_gap['high_price_amount'], errors='coerce')
  1089. df_drop_gap['price_gap'] = high_price_vals - price_base
  1090. df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
  1091. df_drop_gap = df_drop_gap.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
  1092. df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 10.0)].copy()
  1093. # 历史上出现的极近似的增长幅度后的降价场景
  1094. if not df_match.empty:
  1095. dur_base = pd.to_numeric(price_duration_hours, errors='coerce')
  1096. hud_base = pd.to_numeric(hours_until_departure, errors='coerce')
  1097. # seats_base = pd.to_numeric(seats_remaining_change_amount, errors='coerce')
  1098. if pd.notna(dur_base) and pd.notna(hud_base): # and pd.notna(seats_base)
  1099. df_match_chk = df_match.copy()
  1100. # dur_vals = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce')
  1101. # df_match_chk = df_match_chk.loc[dur_vals.notna()].copy()
  1102. # df_match_chk = df_match_chk.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 36].copy()
  1103. # drop_hud_vals = pd.to_numeric(df_match_chk['drop_hours_until_departure'], errors='coerce')
  1104. # df_match_chk = df_match_chk.loc[drop_hud_vals.notna()].copy()
  1105. # df_match_chk = df_match_chk.loc[(drop_hud_vals.loc[drop_hud_vals.notna()] - float(hud_base)).abs() <= 24].copy()
  1106. # seats_vals = pd.to_numeric(df_match_chk['high_price_seats_remaining_change_amount'], errors='coerce')
  1107. # df_match_chk = df_match_chk.loc[seats_vals.notna()].copy()
  1108. # df_match_chk = df_match_chk.loc[seats_vals.loc[seats_vals.notna()] == float(seats_base)].copy()
  1109. # 持续时间、距离起飞时间、座位变化都匹配上
  1110. if not df_match_chk.empty:
  1111. length_drop = df_match_chk.shape[0]
  1112. df_min_hours.loc[idx, 'drop_price_sample_size'] = length_drop
  1113. drop_price_change_upper = df_match_chk['drop_price_change_amount'].max() # 降价上限
  1114. drop_price_change_lower = df_match_chk['drop_price_change_amount'].min() # 降价下限
  1115. df_min_hours.loc[idx, 'drop_price_change_upper'] = round(drop_price_change_upper, 2)
  1116. df_min_hours.loc[idx, 'drop_price_change_lower'] = round(drop_price_change_lower, 2)
  1117. # drop_mode_values = df_match_chk['drop_price_change_amount'].mode() # 降价众数
  1118. # if len(drop_mode_values) > 0:
  1119. # df_min_hours.loc[idx, 'drop_price_change_mode'] = round(float(drop_mode_values[0]), 2)
  1120. remaining_hours = (
  1121. pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce') - float(dur_base)
  1122. ).clip(lower=0)
  1123. remaining_hours = remaining_hours.round().astype(int)
  1124. counts = remaining_hours.value_counts().sort_index()
  1125. probs = (counts / counts.sum()).round(4)
  1126. top_hours = int(probs.idxmax())
  1127. top_prob = float(probs.max())
  1128. dist_items = list(zip(probs.index.tolist(), probs.tolist()))
  1129. dist_items = dist_items[:10]
  1130. dist_str = ' '.join([f"{int(h)}h->{float(p)}" for h, p in dist_items])
  1131. df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
  1132. df_min_hours.loc[idx, 'simple_drop_in_hours'] = top_hours
  1133. df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1
  1134. df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = dist_str
  1135. df_min_hours.loc[idx, 'flag_dist'] = 'd0'
  1136. # continue # 已经判定降价 后面不再做
  1137. # 历史上未出现的极近似的增长幅度后的降价场景
  1138. else:
  1139. pass
  1140. # if pd.notna(price_duration_hours) and price_change_percent >= 0.1:
  1141. # pct_vals = pd.to_numeric(
  1142. # df_drop_nodes_part['high_price_change_percent'],
  1143. # errors='coerce'
  1144. # ).replace([np.inf, -np.inf], np.nan).dropna()
  1145. # dur_vals = pd.to_numeric(
  1146. # df_drop_nodes_part['high_price_duration_hours'],
  1147. # errors='coerce'
  1148. # ).replace([np.inf, -np.inf], np.nan).dropna()
  1149. # if not pct_vals.empty and not dur_vals.empty:
  1150. # pct_min = float(pct_vals.min())
  1151. # pct_max = float(pct_vals.max())
  1152. # dur_min = float(dur_vals.min())
  1153. # dur_max = float(dur_vals.max())
  1154. # if (pct_min <= float(price_change_percent) <= pct_max) and (dur_min <= float(price_duration_hours) <= dur_max):
  1155. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
  1156. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1157. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
  1158. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.5'
  1159. # continue # 已经判定降价 后面不再做
  1160. # elif (pct_min <= float(price_change_percent)) and (dur_min <= float(price_duration_hours)):
  1161. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
  1162. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1163. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.3
  1164. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.3'
  1165. # continue # 已经判定降价 后面不再做
# --- Match the current row against historical RISE nodes for the same route
# --- and flight. `df_rise_nodes`, `city_pair`, `flight_number_1/2`, and the
# --- current row's price_* values come from code above this chunk.
  1166. # 针对历史上发生的 连续涨价
  1167. if not df_rise_nodes.empty:
  1168. # 对准航班号, 不同起飞日期
# 'VJ' appears to act as a bare-carrier-code sentinel meaning flight_number_2
# is effectively absent — presumably set upstream; verify against the caller.
  1169. if flight_number_2 and flight_number_2 != 'VJ':
# Both legs known: filter on city pair plus both flight numbers.
  1170. df_rise_nodes_part = df_rise_nodes[
  1171. (df_rise_nodes['city_pair'] == city_pair) &
  1172. (df_rise_nodes['flight_number_1'] == flight_number_1) &
  1173. (df_rise_nodes['flight_number_2'] == flight_number_2)
  1174. ]
  1175. else:
# Second leg unknown: fall back to city pair + first flight number only.
  1176. df_rise_nodes_part = df_rise_nodes[
  1177. (df_rise_nodes['city_pair'] == city_pair) &
  1178. (df_rise_nodes['flight_number_1'] == flight_number_1)
  1179. ]
  1180. if not df_rise_nodes_part.empty and pd.notna(price_change_percent):
# The commented region below (through the "一般判定场景" marker) is DISABLED
# legacy logic for 'keep' nodes: a 10%–90% quantile trim plus a detector for
# runs of consecutive negative changes per flight_day. Retained for reference.
  1181. # pct_vals_1 = df_keep_nodes_part['keep_price_change_percent'].replace([np.inf, -np.inf], np.nan).dropna()
  1182. # # 保留百分位 10% ~ 90% 之间的 数据
  1183. # if not pct_vals_1.empty:
  1184. # q10_1 = float(pct_vals_1.quantile(0.10))
  1185. # q90_1 = float(pct_vals_1.quantile(0.90))
  1186. # df_keep_nodes_part = df_keep_nodes_part[
  1187. # df_keep_nodes_part['keep_price_change_percent'].between(q10_1, q90_1)
  1188. # ]
  1189. # if df_keep_nodes_part.empty:
  1190. # continue
  1191. # 特殊判定场景
  1192. # if price_change_percent < 0:
  1193. # df_tmp = df_keep_nodes_part.copy()
  1194. # # 确保组内顺序正确(如果前面已经排过,这行可省略)
  1195. # df_tmp = df_tmp.sort_values(
  1196. # by=["flight_day", "keep_hours_until_departure"],
  1197. # ascending=[True, False]
  1198. # )
  1199. # # 是否为负值
  1200. # df_tmp["is_negative"] = df_tmp["keep_price_change_percent"] < 0
  1201. # if df_tmp["is_negative"].any():
  1202. # # 标记“负值段”的开始
  1203. # # 当 is_negative 为 True 且 前一行不是负值时,认为是一个新段
  1204. # df_tmp["neg_block_id"] = (
  1205. # df_tmp["is_negative"]
  1206. # & ~df_tmp.groupby("flight_day")["is_negative"].shift(fill_value=False)
  1207. # ).groupby(df_tmp["flight_day"]).cumsum()
  1208. # # 在每个负值段内计数(第几个负值)
  1209. # df_tmp["neg_rank_in_block"] = (
  1210. # df_tmp.groupby(["flight_day", "neg_block_id"])
  1211. # .cumcount() + 1
  1212. # )
  1213. # # 每个连续负值段的长度
  1214. # df_tmp["neg_block_size"] = (
  1215. # df_tmp.groupby(["flight_day", "neg_block_id"])["is_negative"]
  1216. # .transform("sum")
  1217. # )
  1218. # # 只保留:
  1219. # # 1) 是负值
  1220. # # 2) 且不是该连续负值段的最后一个
  1221. # df_continuous_price_drop = df_tmp[
  1222. # (df_tmp["is_negative"]) &
  1223. # (df_tmp["neg_rank_in_block"] < df_tmp["neg_block_size"])
  1224. # ].drop(
  1225. # columns=[
  1226. # "is_negative",
  1227. # "neg_block_id",
  1228. # "neg_rank_in_block",
  1229. # "neg_block_size",
  1230. # ]
  1231. # )
  1232. # pct_diff_c = (df_continuous_price_drop['keep_price_change_percent'] - float(price_change_percent)).abs()
  1233. # df_match_c = df_continuous_price_drop.loc[pct_diff_c <= pct_threshold_c, ['flight_day', 'keep_hours_until_departure', 'keep_price_duration_hours', 'keep_price_change_percent']].copy()
  1234. # # 符合连续降价条件
  1235. # if not df_match_c.empty and pd.notna(price_duration_hours):
  1236. # vals_c = df_match_c['keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna()
  1237. # if not vals_c.empty:
  1238. # min_val_c = vals_c.min()
  1239. # if min_val_c <= float(price_duration_hours):
  1240. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
  1241. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1242. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
  1243. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'c1'
  1244. # length_drop = df_match_c.shape[0]
  1245. # # continue # 已经判定降价 后面不再做
  1246. # 一般判定场景
# General matching: rank the historical rise nodes by closeness of the
# PREVIOUS rise's change-percent and absolute amount to the current row.
  1247. pct_base_1 = float(price_change_percent)
  1248. pct_vals_1 = pd.to_numeric(df_rise_nodes_part['prev_rise_change_percent'], errors='coerce')
# Keep only rows whose prev_rise_change_percent parsed to a number.
  1249. df_rise_gap_1 = df_rise_nodes_part.loc[
  1250. pct_vals_1.notna(),
  1251. ['rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
  1252. 'prev_rise_duration_hours', 'prev_rise_change_percent',
  1253. 'prev_rise_change_amount', 'prev_rise_amount', 'prev_rise_seats_remaining']
  1254. ].copy()
# Signed and absolute gaps on percent and on price amount.
  1255. df_rise_gap_1['pct_gap'] = (pct_vals_1.loc[pct_vals_1.notna()] - pct_base_1)
  1256. df_rise_gap_1['pct_abs_gap'] = df_rise_gap_1['pct_gap'].abs()
  1257. price_base_1 = pd.to_numeric(price_amount, errors='coerce')
  1258. rise_price_vals_1 = pd.to_numeric(df_rise_gap_1['prev_rise_amount'], errors='coerce')
  1259. df_rise_gap_1['price_gap'] = rise_price_vals_1 - price_base_1
  1260. df_rise_gap_1['price_abs_gap'] = df_rise_gap_1['price_gap'].abs()
# Closest precedents first: sort by percent gap, then price gap.
  1261. df_rise_gap_1 = df_rise_gap_1.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
# A node "matches" when the percent gap is within pct_threshold_1 (defined
# above this chunk — TODO confirm its value) AND the absolute price gap is
# within 10.0 currency units.
  1262. df_match_1 = df_rise_gap_1.loc[(df_rise_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_rise_gap_1['price_abs_gap'] <= 10.0)].copy()
# --- Decide the final drop/no-drop verdict for this row from the matched
# --- historical rise nodes (df_match_1 built just above).
  1263. # 历史上出现过近似变化幅度后继续涨价场景
  1264. if not df_match_1.empty:
  1265. # df_match_1['hours_delta'] = hours_until_departure - df_match_1['rise_hours_until_departure']
  1266. # df_match_1['modify_rise_price_duration_hours'] = df_match_1['rise_price_duration_hours'] - df_match_1['hours_delta']
  1267. # df_match_1 = df_match_1[df_match_1['modify_rise_price_duration_hours'] > 0]
  1268. # dur_base_1 = pd.to_numeric(price_duration_hours, errors='coerce')
  1269. hud_base_1 = pd.to_numeric(hours_until_departure, errors='coerce')
  1270. # seats_base_1 = pd.to_numeric(seats_remaining_change_amount, errors='coerce')
# Only hours_until_departure needs to be numeric here; the stricter
# duration / proximity / seat-change checks are all disabled (commented) below.
  1271. if pd.notna(hud_base_1): # and pd.notna(seats_base_1)
  1272. df_match_chk_1 = df_match_1.copy()
  1273. # dur_vals_1 = pd.to_numeric(df_match_chk_1['modify_rise_price_duration_hours'], errors='coerce')
  1274. # df_match_chk_1 = df_match_chk_1.loc[dur_vals_1.notna()].copy()
  1275. # df_match_chk_1 = df_match_chk_1.loc[(dur_vals_1.loc[dur_vals_1.notna()] - float(dur_base_1)).abs() <= 24].copy()
  1276. # rise_hud_vals_1 = pd.to_numeric(df_match_chk_1['rise_hours_until_departure'], errors='coerce')
  1277. # df_match_chk_1 = df_match_chk_1.loc[rise_hud_vals_1.notna()].copy()
  1278. # df_match_chk_1 = df_match_chk_1.loc[(rise_hud_vals_1.loc[rise_hud_vals_1.notna()] - float(hud_base_1)).abs() <= 24].copy()
  1279. # seats_vals_1 = pd.to_numeric(df_match_chk_1['rise_seats_remaining_change_amount'], errors='coerce')
  1280. # df_match_chk_1 = df_match_chk_1.loc[seats_vals_1.notna()].copy()
  1281. # df_match_chk_1 = df_match_chk_1.loc[seats_vals_1.loc[seats_vals_1.notna()] == float(seats_base_1)].copy()
  1282. # 持续时间、距离起飞时间、座位变化都匹配上
  1283. if not df_match_chk_1.empty:
# Record how many rise precedents matched plus the observed rise-amount bounds.
  1284. length_rise = df_match_chk_1.shape[0]
  1285. df_min_hours.loc[idx, 'rise_price_sample_size'] = length_rise
  1286. rise_price_change_upper = df_match_chk_1['rise_price_change_amount'].max() # 涨价上限
  1287. rise_price_change_lower = df_match_chk_1['rise_price_change_amount'].min() # 涨价下限
  1288. df_min_hours.loc[idx, 'rise_price_change_upper'] = round(rise_price_change_upper, 2)
  1289. df_min_hours.loc[idx, 'rise_price_change_lower'] = round(rise_price_change_lower, 2)
  1290. # rise_mode_values = df_match_chk_1['rise_price_change_amount'].mode() # 涨价众数
  1291. # if len(rise_mode_values) > 0:
  1292. # df_min_hours.loc[idx, 'rise_price_change_mode'] = round(float(rise_mode_values[0]), 2)
  1293. # 可以明确的判定不降价
# `length_drop` is presumably the matched-drop-precedent count set by the drop
# branch above this chunk — TODO confirm. Zero drop precedents but some rise
# precedents => confidently predict NO drop ('r0').
  1294. if length_drop == 0:
  1295. df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
  1296. df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1297. df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
  1298. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r0'
  1299. df_min_hours.loc[idx, 'flag_dist'] = 'r0'
  1300. # 分歧判定
# Both drop and rise precedents exist: vote by relative frequency.
  1301. else:
  1302. drop_prob = round(length_drop / (length_rise + length_drop), 2)
  1303. # 依旧保持之前的降价判定,概率修改
# Drop share >= 0.4 keeps the earlier drop verdict ('d1') ...
  1304. if drop_prob >= 0.4:
  1305. df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
  1306. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'd1'
  1307. df_min_hours.loc[idx, 'flag_dist'] = 'd1'
  1308. # 改判不降价,概率修改
# ... otherwise the verdict is flipped to no-drop ('r1').
  1309. else:
  1310. df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
  1311. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r1'
  1312. df_min_hours.loc[idx, 'flag_dist'] = 'r1'
  1313. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
# In either case the probability column reflects the empirical drop share.
  1314. df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob
  1315. # elif length_keep == length_drop: # 不降价与降价相同, 取0.5概率
  1316. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
  1317. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1318. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
  1319. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
  1320. # df_match_1['hours_delta'] = hours_until_departure - df_match_1['keep_hours_until_departure']
  1321. # df_match_1['modify_keep_price_duration_hours'] = df_match_1['keep_price_duration_hours'] - df_match_1['hours_delta']
  1322. # df_match_1 = df_match_1[df_match_1['modify_keep_price_duration_hours'] > 0]
  1323. # 比较 price_duration_hours 在 modify_keep_price_duration_hours 的百分位
  1324. # vals = df_match_1['modify_keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna()
  1325. # if not vals.empty:
  1326. # # q10_11 = float(vals.quantile(0.10))
  1327. # min_val = vals.min()
  1328. # if min_val <= float(price_duration_hours):
  1329. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
  1330. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1331. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
  1332. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
  1333. # 历史上未出现过近似变化幅度后保持低价场景
# No historical rise node matched this change magnitude: leave whatever
# verdict was set earlier for this row untouched (fallback below is DISABLED).
  1334. else:
  1335. pass
  1336. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
  1337. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1338. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
  1339. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n0'
  1340. # if pd.notna(price_duration_hours) and price_change_percent <= 0.1:
  1341. # df_keep_nodes_part_1 = df_keep_nodes_part[df_keep_nodes_part['keep_price_change_percent'] <= 0.1]
  1342. # pct_vals_1 = pd.to_numeric(
  1343. # df_keep_nodes_part_1['keep_price_change_percent'],
  1344. # errors='coerce'
  1345. # ).replace([np.inf, -np.inf], np.nan).dropna()
  1346. # dur_vals_1 = pd.to_numeric(
  1347. # df_keep_nodes_part_1['keep_price_duration_hours'],
  1348. # errors='coerce'
  1349. # ).replace([np.inf, -np.inf], np.nan).dropna()
  1350. # if not pct_vals_1.empty and not dur_vals_1.empty:
  1351. # pct_min_1 = float(pct_vals_1.min())
  1352. # pct_max_1 = float(pct_vals_1.max())
  1353. # dur_min_1 = float(dur_vals_1.min())
  1354. # dur_max_1 = float(dur_vals_1.max())
  1355. # if (pct_min_1 <= float(price_change_percent) <= pct_max_1) and (dur_min_1 <= float(price_duration_hours) <= dur_max_1):
  1356. # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
  1357. # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
  1358. # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
  1359. # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n1'
  1360. pass
# --- Post-loop: assemble the prediction frame, compute validity windows,
# --- filter anomalous timestamps, log class counts, append to CSV and return.
  1361. print("判定循环结束")
  1362. # 按航班号统一其降价/涨价的上限与下限, 上限统一取最大, 下限统一取最小
# The commented block below is a DISABLED per-flight normalization of the
# drop/rise upper and lower bounds via groupby-transform; kept for reference.
  1363. # _grp_cols = ['city_pair', 'flight_number_1', 'flight_number_2']
  1364. # _g = df_min_hours.groupby(_grp_cols, dropna=False)
  1365. # df_min_hours['drop_price_change_upper'] = pd.to_numeric(
  1366. # _g['drop_price_change_upper'].transform('max'),
  1367. # errors='coerce'
  1368. # ).fillna(0.0).round(2)
  1369. # df_min_hours['drop_price_change_lower'] = pd.to_numeric(
  1370. # _g['drop_price_change_lower'].transform('min'),
  1371. # errors='coerce'
  1372. # ).fillna(0.0).round(2)
  1373. # df_min_hours['rise_price_change_upper'] = pd.to_numeric(
  1374. # _g['rise_price_change_upper'].transform('max'),
  1375. # errors='coerce'
  1376. # ).fillna(0.0).round(2)
  1377. # df_min_hours['rise_price_change_lower'] = pd.to_numeric(
  1378. # _g['rise_price_change_lower'].transform('min'),
  1379. # errors='coerce'
  1380. # ).fillna(0.0).round(2)
# Expose the first segment's departure time under the public name 'from_time'.
  1381. df_min_hours = df_min_hours.rename(columns={'seg1_dep_time': 'from_time'})
# pred_time_str is expected as 'YYYYMMDDHHMM'. NOTE(review): with
# errors="coerce" an unparsable value yields NaT, and NaT.strftime on the
# next line would then raise — confirm pred_time_str is always valid upstream.
  1382. _pred_dt = pd.to_datetime(str(pred_time_str), format="%Y%m%d%H%M", errors="coerce")
  1383. df_min_hours["update_hour"] = _pred_dt.strftime("%Y-%m-%d %H:%M:%S")
# Prediction validity window: [departure_hour - 360h, departure_hour - 12h],
# anchored on the departure time floored to the hour.
  1384. _dep_hour = pd.to_datetime(df_min_hours["from_time"], errors="coerce").dt.floor("h")
  1385. df_min_hours["valid_begin_hour"] = (_dep_hour - pd.to_timedelta(360, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
  1386. df_min_hours["valid_end_hour"] = (_dep_hour - pd.to_timedelta(12, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
  1387. # 要展示在预测表里的字段
# Column order of the exported prediction table; selecting with this list
# raises KeyError if any column is missing from df_min_hours.
  1388. order_cols = ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2', 'from_time',
  1389. 'baggage', 'seats_remaining', 'currency',
  1390. 'adult_total_price', 'hours_until_departure', 'price_change_percent', 'price_duration_hours',
  1391. 'update_hour', 'crawl_date',
  1392. 'valid_begin_hour', 'valid_end_hour',
  1393. 'simple_will_price_drop', 'simple_drop_in_hours', 'simple_drop_in_hours_prob', 'simple_drop_in_hours_dist',
  1394. 'flag_dist',
  1395. 'drop_price_change_upper', 'drop_price_change_lower', 'drop_price_sample_size',
  1396. 'rise_price_change_upper', 'rise_price_change_lower', 'rise_price_sample_size',
  1397. 'envelope_max', 'envelope_min', 'envelope_mean', 'envelope_count',
  1398. 'envelope_avg_peak_hours', 'envelope_position', 'is_envelope_peak', # 包络线特征
  1399. 'target_flight_day', 'target_price', 'target_peak_hours', 'is_target_day', # 高点起飞日(纯包络线高点)
  1400. 'drop_freq_count', 'drop_potential', # 降价潜力
  1401. 'target_score', 'is_good_target', # 综合目标评分(高点 × 降价潜力 = 最终投放目标)
  1402. ]
  1403. df_predict = df_min_hours[order_cols]
# Strip the internal 'simple_' prefix for the externally published names.
  1404. df_predict = df_predict.rename(columns={
  1405. 'simple_will_price_drop': 'will_price_drop',
  1406. 'simple_drop_in_hours': 'drop_in_hours',
  1407. 'simple_drop_in_hours_prob': 'drop_in_hours_prob',
  1408. 'simple_drop_in_hours_dist': 'drop_in_hours_dist',
  1409. }
  1410. )
  1411. # 排序
# mergesort is stable, so ties keep their pre-sort relative order.
  1412. df_predict = df_predict.sort_values(
  1413. by=['city_pair', 'flight_number_1', 'flight_number_2', 'flight_day'],
  1414. kind='mergesort',
  1415. na_position='last',
  1416. ).reset_index(drop=True)
  1417. # 时间段过滤 过滤掉异常时间(update_hour 早于 crawl_date)
# Keep only rows whose update_hour lies 0–12 hours AFTER crawl_date; rows with
# unparsable timestamps (NaT diffs) also fail the comparison and are dropped.
  1418. update_dt = pd.to_datetime(df_predict["update_hour"], errors="coerce")
  1419. crawl_dt = pd.to_datetime(df_predict["crawl_date"], errors="coerce")
  1420. dt_diff = update_dt - crawl_dt
  1421. df_predict = df_predict.loc[
  1422. (dt_diff >= pd.Timedelta(0)) & (dt_diff <= pd.Timedelta(hours=12))
  1423. # (dt_diff >= pd.Timedelta(0))
  1424. ].reset_index(drop=True)
  1425. print("更新时间过滤完成")
# Log the class balance of the final will_price_drop predictions.
  1426. total_cnt = len(df_predict)
  1427. if "will_price_drop" in df_predict.columns:
  1428. _wpd = pd.to_numeric(df_predict["will_price_drop"], errors="coerce")
  1429. drop_1_cnt = int((_wpd == 1).sum())
  1430. drop_0_cnt = int((_wpd == 0).sum())
  1431. else:
  1432. drop_1_cnt = 0
  1433. drop_0_cnt = 0
  1434. print(f"will_price_drop 分类数量统计: 1(会降)={drop_1_cnt}, 0(不降)={drop_0_cnt}, 总数={total_cnt}")
# Append to the per-prediction-time CSV; header is written only when the file
# does not yet exist. utf-8-sig keeps the Chinese text Excel-readable.
  1435. csv_path1 = os.path.join(predict_dir, f'future_predictions_{pred_time_str}.csv')
  1436. df_predict.to_csv(csv_path1, mode='a', index=False, header=not os.path.exists(csv_path1), encoding='utf-8-sig')
  1437. print("预测结果已追加")
  1438. return df_predict