- import pandas as pd
- import numpy as np
- import bisect
- import gc
- import os
- from datetime import datetime, timedelta
- from sklearn.preprocessing import StandardScaler
- from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays
- from utils import insert_df_col
- COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
- def preprocess_data_cycle(df_input, interval_hours=8, feature_length=240, target_length=24, is_training=True):
- # df_input_part = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['hours_until_departure'] < current_n_hours)].copy()
- df_input = preprocess_data_first_half(df_input)
- # Create an empty list to collect every processed data part
- list_df_parts = []
- crop_lower_limit_list = [4] # [4, 28, 52, 76, 100]
- for crop_lower_limit in crop_lower_limit_list:
- target_n_hours = crop_lower_limit + target_length
- feature_n_hours = target_n_hours + interval_hours
- crop_upper_limit = feature_n_hours + feature_length
- df_input_part = preprocess_data(df_input, is_training=is_training, crop_upper_limit=crop_upper_limit, feature_n_hours=feature_n_hours,
- target_n_hours=target_n_hours, crop_lower_limit=crop_lower_limit)
- # Append the processed part to the list
- list_df_parts.append(df_input_part)
- if not is_training:
- break
-
- # Concatenate all processed data parts
- if list_df_parts:
- df_combined = pd.concat(list_df_parts, ignore_index=True)
- return df_combined
- else:
- return pd.DataFrame() # Return an empty DataFrame when there is no data
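- # Worked example (illustrative only, never called): with the default arguments
- # interval_hours=8, feature_length=240, target_length=24 and crop_lower_limit=4,
- # the derived windows are target_n_hours=28, feature_n_hours=36 and crop_upper_limit=276.
- def _example_window_arithmetic(interval_hours=8, feature_length=240, target_length=24, crop_lower_limit=4):
-     target_n_hours = crop_lower_limit + target_length    # 4 + 24 = 28
-     feature_n_hours = target_n_hours + interval_hours    # 28 + 8 = 36
-     crop_upper_limit = feature_n_hours + feature_length  # 36 + 240 = 276
-     return target_n_hours, feature_n_hours, crop_upper_limit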
- def preprocess_data_first_half(df_input):
- '''First half of the preprocessing'''
- print(">>> Starting data preprocessing")
- # Build the city pair
- df_input['city_pair'] = (
- df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
- )
- # Map city codes to integers
- df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
- df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
-
- missing_from = (
- df_input.loc[df_input['from_city_num'].isna(), 'from_city_code']
- .unique()
- )
- missing_to = (
- df_input.loc[df_input['to_city_num'].isna(), 'to_city_code']
- .unique()
- )
- if len(missing_from) > 0:
- print("Unmapped from_city codes:", missing_from)
- if len(missing_to) > 0:
- print("Unmapped to_city codes:", missing_to)
- # Move city_pair, from_city_code, from_city_num, to_city_code, to_city_num to the front columns
- cols = df_input.columns.tolist()
- # Remove these columns from their current positions (to keep the order correct)
- for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
- cols.remove(c)
- # Insert these columns at the very front
- df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
- pass
- # Convert the date format
- df_input['search_dep_time'] = pd.to_datetime(
- df_input['search_dep_time'],
- format='%Y%m%d',
- errors='coerce'
- ).dt.strftime('%Y-%m-%d')
- # Rename the departure-date column
- df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
-
- # Rename the flight-number columns
- df_input.rename(
- columns={
- 'seg1_flight_number': 'flight_number_1',
- 'seg2_flight_number': 'flight_number_2'
- },
- inplace=True
- )
- # Fill the two segments separately
- df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
- df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
- # Map flight numbers to integers
- df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map)
- df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map)
- missing_flight_1 = (
- df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1']
- .unique()
- )
- missing_flight_2 = (
- df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2']
- .unique()
- )
- if len(missing_flight_1) > 0:
- print("Unmapped flight_1 numbers:", missing_flight_1)
- if len(missing_flight_2) > 0:
- print("Unmapped flight_2 numbers:", missing_flight_2)
-
- # Place flight_1_num before seg1_dep_air_port
- insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port')
-
- # Place flight_2_num before seg2_dep_air_port
- insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port')
- df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int) # baggage 30 -> 1, baggage 20 -> 0
- # Place baggage_level before flight_number_2
- insert_df_col(df_input, 'baggage_level', 'flight_number_2')
- df_input['Adult_Total_Price'] = df_input['adult_total_price']
- # Place Adult_Total_Price before seats_remaining; keep the raw value before scaling
- insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining')
- df_input['Hours_Until_Departure'] = df_input['hours_until_departure']
- # Place Hours_Until_Departure before days_to_departure; keep the raw value before scaling
- insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure')
- pass
- # gid: integer group label based on the specified fields
- df_input['gid'] = (
- df_input
- .groupby(
- ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], # 'baggage' is deliberately left out of the grouping for now
- sort=False
- )
- .ngroup()
- )
- return df_input
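- # Minimal sketch (hypothetical toy data, never called by the pipeline) of how `gid`
- # is assigned above: groupby(...).ngroup() gives every unique
- # (city_pair, flight_day, flight_number_1, flight_number_2) combination one integer id.
- def _example_gid_assignment():
-     toy = pd.DataFrame({
-         'city_pair': ['SGN-HAN', 'SGN-HAN', 'SGN-DAD'],
-         'flight_day': ['2024-01-01', '2024-01-01', '2024-01-01'],
-         'flight_number_1': ['VJ120', 'VJ120', 'VJ620'],
-         'flight_number_2': ['VJ', 'VJ', 'VJ'],
-     })
-     toy['gid'] = toy.groupby(
-         ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], sort=False
-     ).ngroup()
-     return toy  # gid becomes [0, 0, 1]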
- def preprocess_data(df_input, is_training=True, crop_upper_limit=480, feature_n_hours=36, target_n_hours=28, crop_lower_limit=4):
- print(f"裁剪范围: [{crop_lower_limit}, {crop_upper_limit}], 间隔窗口: [{target_n_hours}, {feature_n_hours}]")
- # 做一下时间段裁剪, 保留起飞前480小时之内且大于等于4小时的
- df_input = df_input[(df_input['hours_until_departure'] < crop_upper_limit) &
- (df_input['hours_until_departure'] >= crop_lower_limit)].reset_index(drop=True)
-
- # Sort by time in descending order within each (gid, baggage)
- df_input = df_input.sort_values(
- by=['gid', 'baggage', 'hours_until_departure'],
- ascending=[True, True, False]
- ).reset_index(drop=True)
- # Minimum magnitude for a price change to count
- VALID_DROP_MIN = 5
- # Price-change masks
- g = df_input.groupby(['gid', 'baggage'])
- diff = g['adult_total_price'].transform('diff')
- # change_mask = diff.abs() >= VALID_DROP_MIN # ignore changes that are too small
- decrease_mask = diff <= -VALID_DROP_MIN # price decrease (too-small changes ignored)
- increase_mask = diff >= VALID_DROP_MIN # price increase (too-small changes ignored)
- df_input['_price_event_dir'] = np.where(increase_mask, 1, np.where(decrease_mask, -1, 0))
- # Compute consecutive price-increase / price-decrease streaks
- def _calc_price_streaks(df_group):
- dirs = df_group['_price_event_dir'].to_numpy()
- n = len(dirs)
- inc = np.full(n, np.nan)
- dec = np.full(n, np.nan)
- last_dir = 0
- inc_cnt = 0
- dec_cnt = 0
- for i, d in enumerate(dirs):
- if d == 1:
- inc_cnt = inc_cnt + 1 if last_dir == 1 else 1
- dec_cnt = 0
- last_dir = 1
- inc[i] = inc_cnt
- dec[i] = dec_cnt
- elif d == -1:
- dec_cnt = dec_cnt + 1 if last_dir == -1 else 1
- inc_cnt = 0
- last_dir = -1
- inc[i] = inc_cnt
- dec[i] = dec_cnt
-
- inc_s = pd.Series(inc, index=df_group.index).ffill().fillna(0).astype(int)
- dec_s = pd.Series(dec, index=df_group.index).ffill().fillna(0).astype(int)
- return pd.DataFrame(
- {
- 'price_increase_times_consecutive': inc_s,
- 'price_decrease_times_consecutive': dec_s,
- },
- index=df_group.index,
- )
-
- streak_df = df_input.groupby(['gid', 'baggage'], sort=False, group_keys=False).apply(_calc_price_streaks)
- df_input = df_input.join(streak_df)
- df_input.drop(columns=['_price_event_dir'], inplace=True)
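- # Worked example (comments only, toy values): for a `_price_event_dir` sequence
- # [0, 1, 1, -1, 0, -1] within one (gid, baggage) group, the loop above yields,
- # after the forward fill of the unchanged rows,
- #   price_increase_times_consecutive = [0, 1, 2, 0, 0, 0]
- #   price_decrease_times_consecutive = [0, 0, 0, 1, 1, 2]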
-
- # Total number of price changes
- # df_input['price_change_times_total'] = (
- # change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
- # )
- # Total number of price decreases
- df_input['price_decrease_times_total'] = (
- decrease_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
- )
- # Total number of price increases
- df_input['price_increase_times_total'] = (
- increase_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
- )
- # Hour mark of the last price change
- # last_change_hour = (
- # df_input['hours_until_departure']
- # .where(change_mask)
- # .groupby([df_input['gid'], df_input['baggage']])
- # .ffill() # forward fill
- # )
- # Hour mark of the last price decrease
- last_decrease_hour = (
- df_input['hours_until_departure']
- .where(decrease_mask)
- .groupby([df_input['gid'], df_input['baggage']])
- .ffill() # forward fill
- )
- # Hour mark of the last price increase
- last_increase_hour = (
- df_input['hours_until_departure']
- .where(increase_mask)
- .groupby([df_input['gid'], df_input['baggage']])
- .ffill() # forward fill
- )
- # Hours elapsed since the last price change
- # df_input['price_last_change_hours'] = (
- # last_change_hour - df_input['hours_until_departure']
- # ).fillna(0)
- # Hours elapsed since the last price decrease
- df_input['price_last_decrease_hours'] = (
- last_decrease_hour - df_input['hours_until_departure']
- ).fillna(0)
- # Hours elapsed since the last price increase
- df_input['price_last_increase_hours'] = (
- last_increase_hour - df_input['hours_until_departure']
- ).fillna(0)
- pass
- # New columns to insert in front of seats_remaining
- new_cols = [
- # 'price_change_times_total',
- # 'price_last_change_hours',
- 'price_decrease_times_total',
- 'price_decrease_times_consecutive',
- 'price_last_decrease_hours',
- 'price_increase_times_total',
- 'price_increase_times_consecutive',
- 'price_last_increase_hours',
- ]
- # Current column list
- cols = df_input.columns.tolist()
- # Locate seats_remaining
- idx = cols.index('seats_remaining')
- # Rebuild the column order
- new_order = cols[:idx] + new_cols + cols[idx:]
- # De-duplicate (in case the columns are already in place)
- new_order = list(dict.fromkeys(new_order))
- # Reorder the DataFrame
- df_input = df_input[new_order]
- pass
- print(">>> 计算价格区间特征")
- # 1. 基于绝对价格水平的价格区间划分
- # 先计算每个(gid, baggage)的价格统计特征
- # g = df_input.groupby(['gid', 'baggage'])
- price_stats = df_input.groupby(['gid', 'baggage'])['adult_total_price'].agg(
- min_price='min',
- max_price='max',
- mean_price='mean',
- std_price='std'
- ).reset_index()
- # Merge the statistics back into the original data
- df_input = df_input.merge(price_stats, on=['gid', 'baggage'], how='left')
- # 2. Price zones based on absolute price (can be removed; a finer frequency-weighted classification follows)
- # # High zone: above mean + 1 std
- # df_input['price_absolute_high'] = (df_input['adult_total_price'] >
- # (df_input['mean_price'] + df_input['std_price'])).astype(int)
- # # Mid-high zone: between mean and mean + 1 std
- # df_input['price_absolute_mid_high'] = ((df_input['adult_total_price'] > df_input['mean_price']) &
- # (df_input['adult_total_price'] <= (df_input['mean_price'] + df_input['std_price']))).astype(int)
- # # Mid-low zone: between mean - 1 std and mean
- # df_input['price_absolute_mid_low'] = ((df_input['adult_total_price'] > (df_input['mean_price'] - df_input['std_price'])) &
- # (df_input['adult_total_price'] <= df_input['mean_price'])).astype(int)
- # # Low zone: below mean - 1 std
- # df_input['price_absolute_low'] = (df_input['adult_total_price'] <= (df_input['mean_price'] - df_input['std_price'])).astype(int)
- # 3. Frequency-weighted price percentiles (improved version)
- # Count how often each price occurs
- price_freq = df_input.groupby(['gid', 'baggage', 'adult_total_price']).size().reset_index(name='price_frequency')
- df_input = df_input.merge(price_freq, on=['gid', 'baggage', 'adult_total_price'], how='left')
- # Compute frequency-weighted percentiles
- def weighted_percentile(group):
- if len(group) == 0:
- return pd.Series([np.nan] * 4, index=['price_weighted_percentile_25',
- 'price_weighted_percentile_50',
- 'price_weighted_percentile_75',
- 'price_weighted_percentile_90'])
-
- # Sort by price and compute the cumulative frequency
- group = group.sort_values('adult_total_price')
- group['cum_freq'] = group['price_frequency'].cumsum()
- total_freq = group['price_frequency'].sum()
-
- # Compute the weighted percentiles
- percentiles = []
- for p in [0.25, 0.5, 0.75, 0.9]:
- threshold = total_freq * p
- # Find the first price whose cumulative frequency reaches the threshold
- mask = group['cum_freq'] >= threshold
- if mask.any():
- percentile_value = group.loc[mask.idxmax(), 'adult_total_price']
- else:
- percentile_value = group['adult_total_price'].max()
- percentiles.append(percentile_value)
-
- return pd.Series(percentiles, index=['price_weighted_percentile_25',
- 'price_weighted_percentile_50',
- 'price_weighted_percentile_75',
- 'price_weighted_percentile_90'])
-
- # Compute weighted percentiles per (gid, baggage) group
- weighted_percentiles = df_input.groupby(['gid', 'baggage']).apply(weighted_percentile).reset_index()
- df_input = df_input.merge(weighted_percentiles, on=['gid', 'baggage'], how='left')
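- # Worked example (comments only, toy values): suppose one (gid, baggage) group has
- # three rows with prices [100, 100, 120], so price_frequency is [2, 2, 1] after the
- # merge above. Sorted by price, cum_freq is [2, 4, 5] and total_freq is 5, giving
- # weighted percentiles 25/50/75 -> 100 and 90 -> 120. Note that each duplicate row
- # already carries its own price_frequency, so frequent prices are effectively
- # weighted twice (once by repetition, once by the stored frequency).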
- # 4. Combined judgement using absolute price and frequency (improved version)
- freq_median = df_input.groupby(['gid', 'baggage'])['price_frequency'].transform('median')
- # Price as a multiple of the 90th percentile, used to distinguish levels of high prices
- df_input['price_relative_to_90p'] = df_input['adult_total_price'] / df_input['price_weighted_percentile_90']
- # Add a price tolerance so that near-identical prices do not land in different zones
- # Tolerance thresholds: 1% of each percentile
- # tolerance_90p = df_input['price_weighted_percentile_90'] * 0.01
- tolerance_75p = df_input['price_weighted_percentile_75'] * 0.01
- tolerance_50p = df_input['price_weighted_percentile_50'] * 0.01
- tolerance_25p = df_input['price_weighted_percentile_25'] * 0.01
- # Redesigned price-zone classification (guaranteed non-overlapping):
- # First define a mask for each zone
- # 4.1 Abnormally high: far above the 90th percentile (more than 1.5x) and extremely rare (frequency below 1/3 of the median)
- price_abnormal_high_mask = ((df_input['price_relative_to_90p'] > 1.5) &
- (df_input['price_frequency'] < freq_median * 0.33))
- # 4.2 Genuinely high: strict condition (price > 90th percentile and frequency < median)
- price_real_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_90']) &
- (df_input['price_frequency'] < freq_median) &
- ~price_abnormal_high_mask)
- # 4.3 Normally high: uses the tolerance (price at or near the 75th percentile and above)
- price_normal_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_75'] - tolerance_75p) &
- ~price_real_high_mask & ~price_abnormal_high_mask)
- # 4.4 Mid-high: uses the tolerance (price between the 50th and 75th percentiles)
- price_mid_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_50'] - tolerance_50p) &
- (df_input['adult_total_price'] <= df_input['price_weighted_percentile_75'] + tolerance_75p) &
- ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
- # 4.5 Mid-low: uses the tolerance (price between the 25th and 50th percentiles)
- price_mid_low_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_25'] - tolerance_25p) &
- (df_input['adult_total_price'] <= df_input['price_weighted_percentile_50'] + tolerance_50p) &
- ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
-
- # 4.6 Low: strict condition (price <= 25th percentile)
- price_low_mask = ((df_input['adult_total_price'] <= df_input['price_weighted_percentile_25']) &
- ~price_mid_low_mask & ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask)
- # Use np.select to guarantee the zones are mutually exclusive
- price_zone_masks = [
- price_abnormal_high_mask, # abnormally high zone (level 5)
- price_real_high_mask, # genuinely high zone (level 4)
- price_normal_high_mask, # normally high zone (level 3)
- price_mid_high_mask, # mid-high zone (level 2)
- price_mid_low_mask, # mid-low zone (level 1)
- price_low_mask, # low zone (level 0)
- ]
- price_zone_values = [5, 4, 3, 2, 1, 0] # 5: abnormally high, 4: genuinely high, 3: normally high, 2: mid-high, 1: mid-low, 0: low
- # np.select assigns each price to exactly one zone
- price_zone_result = np.select(price_zone_masks, price_zone_values, default=2) # defaults to mid-high
- # 4.8 Comprehensive price-zone label
- df_input['price_zone_comprehensive'] = price_zone_result
- # 5. Price anomaly detection
- # Standardized deviation of the price from the group mean
- df_input['price_z_score'] = (df_input['adult_total_price'] - df_input['mean_price']) / df_input['std_price']
-
- # Anomaly score: absolute value of the Z-score
- df_input['price_anomaly_score'] = np.abs(df_input['price_z_score'])
-
- # 6. Price stability feature
- # Coefficient of variation (std / mean)
- df_input['price_coefficient_variation'] = df_input['std_price'] / df_input['mean_price']
- # 7. Price trend feature
- # Position of the current price within its historical range
- df_input['price_relative_position'] = (df_input['adult_total_price'] - df_input['min_price']) / (df_input['max_price'] - df_input['min_price'])
- df_input['price_relative_position'] = df_input['price_relative_position'].fillna(0.5) # fallback
- # Drop intermediate columns
- df_input.drop(columns=['price_frequency', 'price_z_score', 'price_relative_to_90p'], inplace=True, errors='ignore')
- del price_freq
- del price_stats
- del weighted_percentiles
- del freq_median
- print(">>> 改进版价格区间特征计算完成")
- # 生成第一机场对
- df_input['airport_pair_1'] = (
- df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
- )
- # Drop the raw first-segment airport codes
- df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
- # Place airport_pair_1 before the seg1_dep_time column
- insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time')
- # Build the second airport pair (with a fallback for missing values)
- df_input['airport_pair_2'] = np.where(
- df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
- 'NA',
- df_input['seg2_dep_air_port'].astype(str) + "-" +
- df_input['seg2_arr_air_port'].astype(str)
- )
- # Drop the raw second-segment airport codes
- df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
- # Place airport_pair_2 before the seg2_dep_time column
- insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time')
- # Transfer flag
- df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
- # Place is_transfer before the flight_number_2 column
- insert_df_col(df_input, 'is_transfer', 'flight_number_2')
- # Rename departure and arrival time columns
- df_input.rename(
- columns={
- 'seg1_dep_time': 'dep_time_1',
- 'seg1_arr_time': 'arr_time_1',
- 'seg2_dep_time': 'dep_time_2',
- 'seg2_arr_time': 'arr_time_2',
- },
- inplace=True
- )
-
- # First-segment flight duration
- df_input['fly_duration_1'] = (
- (df_input['arr_time_1'] - df_input['dep_time_1'])
- .dt.total_seconds() / 3600
- ).round(2)
- # Second-segment flight duration (0 when there is no transfer)
- df_input['fly_duration_2'] = (
- (df_input['arr_time_2'] - df_input['dep_time_2'])
- .dt.total_seconds() / 3600
- ).fillna(0).round(2)
- # Total flight duration
- df_input['fly_duration'] = (
- df_input['fly_duration_1'] + df_input['fly_duration_2']
- ).round(2)
- # Layover duration (0 when there is no transfer)
- df_input['stop_duration'] = (
- (df_input['dep_time_2'] - df_input['arr_time_1'])
- .dt.total_seconds() / 3600
- ).fillna(0).round(2)
- # Clip to avoid negative values
- # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
- # df_input[c] = df_input[c].clip(lower=0)
- # Keep consistent with the is_transfer logic
- # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
-
- # Insert all of these before is_filled in one pass
- insert_before = 'is_filled'
- new_cols = [
- 'fly_duration_1',
- 'fly_duration_2',
- 'fly_duration',
- 'stop_duration'
- ]
- cols = df_input.columns.tolist()
- idx = cols.index(insert_before)
- # Remove them from their old positions
- cols = [c for c in cols if c not in new_cols]
- # Insert at the new position (order preserved)
- cols[idx:idx] = new_cols # insert via empty-slice assignment
- df_input = df_input[cols]
- # Derive several fields at once
- dep_t1 = df_input['dep_time_1']
- # Departure hour of day (0-23)
- df_input['flight_by_hour'] = dep_t1.dt.hour
- # Day of month of departure (1-31)
- df_input['flight_by_day'] = dep_t1.dt.day
- # Month of departure (1-12)
- df_input['flight_day_of_month'] = dep_t1.dt.month
- # Weekday of departure (0 = Monday, 6 = Sunday)
- df_input['flight_day_of_week'] = dep_t1.dt.weekday
- # Quarter of departure (1-4)
- df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
- # Weekend flag (Saturday / Sunday)
- df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
- # Map to the corresponding country codes
- df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
- df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)
- # The overall departure time is simply dep_time_1
- df_input['global_dep_time'] = df_input['dep_time_1']
- # Overall arrival time: arr_time_2 when there is a transfer, otherwise arr_time_1
- df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
- # Is the departure date a holiday in the departure country
- df_input['dep_country_is_holiday'] = df_input.apply(
- lambda r: r['global_dep_time'].date()
- in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
- axis=1
- ).astype(int)
- # Is the arrival date a holiday in the arrival country
- df_input['arr_country_is_holiday'] = df_input.apply(
- lambda r: r['global_arr_time'].date()
- in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
- axis=1
- ).astype(int)
- # Holiday on either side
- df_input['any_country_is_holiday'] = (
- df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
- .max(axis=1)
- )
- # Cross-country route flag
- df_input['is_cross_country'] = (
- df_input['dep_country'] != df_input['arr_country']
- ).astype(int)
- def days_to_next_holiday(country, cur_date):
- if pd.isna(country) or pd.isna(cur_date):
- return np.nan
- holidays = COUNTRY_HOLIDAYS.get(country)
- if not holidays:
- return np.nan
- # Collect future holidays (including today) and sort them
- future_holidays = sorted([d for d in holidays if d >= cur_date])
- if not future_holidays:
- return np.nan
- next_holiday = future_holidays[0] # the nearest future holiday
- delta_days = (next_holiday - cur_date).days
- return delta_days
- df_input['days_to_holiday'] = df_input.apply(
- lambda r: days_to_next_holiday(
- r['dep_country'],
- r['update_hour'].date()
- ),
- axis=1
- )
-
- # Uniform fallback when there is no future holiday
- # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)
- # Place days_to_holiday before update_hour
- insert_df_col(df_input, 'days_to_holiday', 'update_hour')
- # Training mode
- if is_training:
- print(">>> 训练模式:计算 target 相关列")
- print(f"\n>>> 开始处理 对应区间: n_hours = {target_n_hours}")
- target_lower_limit = crop_lower_limit
- target_upper_limit = target_n_hours
- mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
- df_targets = df_input.loc[mask_targets].copy()
- targets_amount = df_targets.shape[0]
- print(f"Rows in the target window: {targets_amount}, window: [{target_lower_limit}, {target_upper_limit})")
- if targets_amount == 0:
- print(f">>> n_hours = {target_n_hours} has no valid data, skipping")
- return pd.DataFrame()
-
- print(">>> 计算 price_at_n_hours")
- df_input_object = df_input[(df_input['hours_until_departure'] >= feature_n_hours) & (df_input['baggage'] == 30)].copy()
- df_last = df_input_object.groupby('gid', observed=True).last().reset_index() # 一般落在起飞前36\32\30小时
-
- # Extract and rename the price column
- df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
- print(">>> price_at_n_hours computed, sample:")
- print(df_last_price_at_n_hours.head(5))
- # New way of computing price drops
- # Sort first
- df_targets = df_targets.sort_values(
- ['gid', 'hours_until_departure'],
- ascending=[True, False]
- )
- # Compute price changes within each gid
- g = df_targets.groupby('gid', group_keys=False)
- df_targets['price_diff'] = g['adult_total_price'].diff()
-
- # VALID_DROP_MIN = 5
- # LOWER_HOUR = 4
- # UPPER_HOUR = 28
- valid_drop_mask = (
- (df_targets['price_diff'] <= -VALID_DROP_MIN)
- # (df_targets['hours_until_departure'] >= LOWER_HOUR) &
- # (df_targets['hours_until_departure'] <= UPPER_HOUR)
- )
- # Valid price drops
- df_valid_drops = df_targets.loc[valid_drop_mask]
- # Find the first price drop for each gid
- df_first_price_drop = (
- df_valid_drops
- .groupby('gid', as_index=False)
- .first()
- )
- # Keep only the needed columns
- df_first_price_drop = df_first_price_drop[
- ['gid', 'hours_until_departure', 'adult_total_price', 'price_diff']
- ].rename(columns={
- 'hours_until_departure': 'time_to_price_drop',
- 'adult_total_price': 'price_at_d_hours',
- 'price_diff': 'amount_of_price_drop',
- })
- # Convert the drop amount to a positive number (more intuitive)
- df_first_price_drop['amount_of_price_drop'] = (-df_first_price_drop['amount_of_price_drop']).round(2)
- pass
- # # Compute price-drop information
- # print(">>> Computing price-drop information")
- # df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
- # df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
- # df_targets['price_dropped'] = (
- # (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) &
- # (df_targets['price_drop_amount'] >= 5) # 降幅不能太小
- # )
- # df_price_drops = df_targets[df_targets['price_dropped']].copy()
- # price_drops_len = df_price_drops.shape[0]
- # if price_drops_len == 0:
- # print(f">>> n_hours = {current_n_hours} 无降价信息")
- # # 创建包含指定列的空 DataFrame
- # df_price_drop_info = pd.DataFrame({
- # 'gid': pd.Series(dtype='int64'),
- # 'first_drop_hours_until_departure': pd.Series(dtype='int64'),
- # 'price_at_first_drop_hours': pd.Series(dtype='float64')
- # })
- # else:
- # df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index() # 第一次发生的降价
- # df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={
- # 'hours_until_departure': 'first_drop_hours_until_departure',
- # 'adult_total_price': 'price_at_first_drop_hours'
- # })
- # print(">>> 降价信息计算完成,示例:")
- # print(df_price_drop_info.head(5))
-
- # # Merge the information
- # df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
- # df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
- # df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
- # df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0) # 区别
- # df_gid_info['time_to_price_drop'] = current_n_hours - df_gid_info['first_drop_hours_until_departure']
- # df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0) # 区别
- # del df_input_object
- # del df_last
- # del df_last_price_at_n_hours
- # del df_price_drops
- # del df_price_drop_info
- df_gid_info = df_last_price_at_n_hours.merge(df_first_price_drop, on='gid', how='left')
- df_gid_info['will_price_drop'] = df_gid_info['time_to_price_drop'].notnull().astype(int)
- df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)
- df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)
- pass
- del df_input_object
- del df_last
- del df_last_price_at_n_hours
- del df_first_price_drop
- del df_valid_drops
- del df_targets
- gc.collect()
-
- # Merge the target variables into the input data
- print(">>> Merging target variables into df_input")
- df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
- # Fill NaN values with 0
- df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
- ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
- df_input = df_input.rename(columns={
- 'will_price_drop': 'target_will_price_drop',
- 'amount_of_price_drop': 'target_amount_of_drop',
- 'time_to_price_drop': 'target_time_to_drop'
- })
-
- # Compute the minimum adult_total_price per gid group in df_targets
- # print(">>> Computing the minimum adult_total_price per gid group...")
- # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
- # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
- # gid_count = df_min_price_by_gid.shape[0]
- # print(f">>> Done, {gid_count} gid groups in total")
- # # Merge the minimum price into df_inputs
- # print(">>> Merging the minimum price into the input data...")
- # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')
- print(">>> Sample of df_input after the merge:")
- print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
- # Prediction mode
- else:
- print(">>> Prediction mode: filling target-related columns with 0")
- df_input['target_will_price_drop'] = 0
- df_input['target_amount_of_drop'] = 0.0
- df_input['target_time_to_drop'] = 0
- # Arrange the columns in order
- order_columns = [
- "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day",
- "seats_remaining", "baggage", "baggage_level",
- "price_decrease_times_total", "price_decrease_times_consecutive", "price_last_decrease_hours",
- "price_increase_times_total", "price_increase_times_consecutive", "price_last_increase_hours",
- "adult_total_price", "Adult_Total_Price", "target_will_price_drop", "target_amount_of_drop", "target_time_to_drop",
- "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure", "update_hour", "crawl_date", "gid",
- "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1",
- "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week", "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer",
- "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2", "fly_duration", "stop_duration",
- "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country",
- "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday",
- "price_weighted_percentile_25", "price_weighted_percentile_50", "price_weighted_percentile_75", "price_weighted_percentile_90",
- "price_zone_comprehensive", "price_relative_position",
- ]
- df_input = df_input[order_columns]
-
- return df_input
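- # Minimal sketch (toy data, never called) of the first-drop target logic used in the
- # training branch above: diff() the price per gid along descending
- # hours_until_departure and keep the first change of at most -VALID_DROP_MIN.
- def _example_first_price_drop(valid_drop_min=5):
-     toy = pd.DataFrame({
-         'gid': [0, 0, 0, 1, 1],
-         'hours_until_departure': [27, 20, 12, 27, 20],
-         'adult_total_price': [100.0, 100.0, 90.0, 80.0, 80.0],
-     })
-     toy = toy.sort_values(['gid', 'hours_until_departure'], ascending=[True, False])
-     toy['price_diff'] = toy.groupby('gid')['adult_total_price'].diff()
-     drops = toy.loc[toy['price_diff'] <= -valid_drop_min]
-     # gid 0 first drops at hours_until_departure=12 (100 -> 90); gid 1 never drops
-     return drops.groupby('gid', as_index=False).first()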
- def standardization(df, feature_scaler, target_scaler=None, is_training=True, is_val=False, feature_length=240):
- print(">>> 开始标准化处理")
- # 准备走标准化的特征
- scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration',
- 'price_weighted_percentile_25', 'price_weighted_percentile_50',
- 'price_weighted_percentile_75', 'price_weighted_percentile_90']
-
- if is_training:
- print(">>> 特征数据标准化开始")
- if feature_scaler is None:
- feature_scaler = StandardScaler()
- if not is_val:
- feature_scaler.fit(df[scaler_features])
- df[scaler_features] = feature_scaler.transform(df[scaler_features])
- print(">>> 特征数据标准化完成")
-
- else:
- df[scaler_features] = feature_scaler.transform(df[scaler_features])
- print(">>> 预测模式下特征标准化处理完成")
- # 准备走归一化的特征
- # 事先定义好每个特征的合理范围
- fixed_ranges = {
- 'hours_until_departure': (0, 480), # 0-20天
- 'from_city_num': (0, 38),
- 'to_city_num': (0, 38),
- 'flight_1_num': (0, 341),
- 'flight_2_num': (0, 341),
- 'seats_remaining': (1, 5),
- # 'price_change_times_total': (0, 30), # assume no more than 30 price changes
- # 'price_last_change_hours': (0, 480),
- 'price_decrease_times_total': (0, 20), # assume no more than 20 price decreases
- 'price_decrease_times_consecutive': (0, 10), # assume no more than 10 consecutive decreases
- 'price_last_decrease_hours': (0, feature_length), # (0-240 hours)
- 'price_increase_times_total': (0, 20), # assume no more than 20 price increases
- 'price_increase_times_consecutive': (0, 10), # assume no more than 10 consecutive increases
- 'price_last_increase_hours': (0, feature_length), # (0-240 hours)
- 'price_zone_comprehensive': (0, 5),
- 'days_to_departure': (0, 30),
- 'days_to_holiday': (0, 120), # the longest gap between Vietnamese holidays is 120 days
- 'flight_by_hour': (0, 23),
- 'flight_by_day': (1, 31),
- 'flight_day_of_month': (1, 12),
- 'flight_day_of_week': (0, 6),
- 'flight_day_of_quarter': (1, 4),
- }
- normal_features = list(fixed_ranges.keys())
- print(">>> 归一化特征列: ", normal_features)
- print(">>> 基于固定范围的特征数据归一化开始")
- for col in normal_features:
- if col in df.columns:
- # Core normalization formula: (x - min) / (max - min)
- col_min, col_max = fixed_ranges[col]
- df[col] = (df[col] - col_min) / (col_max - col_min)
- # Clip values outside the range into [0, 1]
- df[col] = df[col].clip(0, 1)
- print(">>> 基于固定范围的特征数据归一化完成")
- return df, feature_scaler, target_scaler
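- # Illustrative sketch (never called) of the fixed-range scaling above: x is mapped to
- # (x - min) / (max - min) and then clipped to [0, 1], so e.g. hours_until_departure=240
- # with range (0, 480) becomes 0.5.
- def _example_fixed_range_scaling(value=240.0, col_min=0.0, col_max=480.0):
-     scaled = (value - col_min) / (col_max - col_min)
-     return min(max(scaled, 0.0), 1.0)  # same effect as Series.clip(0, 1)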
- def preprocess_data_simple(df_input, is_train=False):
- df_input = preprocess_data_first_half(df_input)
-
- # Sort by time in descending order within each (gid, baggage)
- df_input = df_input.sort_values(
- by=['gid', 'baggage', 'hours_until_departure'],
- ascending=[True, True, False]
- ).reset_index(drop=True)
- df_input = df_input[df_input['hours_until_departure'] <= 480]
- df_input = df_input[df_input['baggage'] == 30]
- # At the tail of hours_until_departure keep only real rows, not back-filled ones
- if not is_train:
- _tail_filled = df_input.groupby(['gid', 'baggage'])['is_filled'].transform(
- lambda s: s.iloc[::-1].cummin().iloc[::-1]
- )
- df_input = df_input[~((df_input['is_filled'] == 1) & (_tail_filled == 1))]
-
- # Compute the price-change amount
- df_input['price_change_amount'] = (
- df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price']
- .apply(lambda s: s.diff().replace(0, np.nan).ffill().fillna(0)).round(2)
- )
- # Compute the price-change percentage (rate of change versus the previous time point)
- df_input['price_change_percent'] = (
- df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price']
- .apply(lambda s: s.pct_change().replace(0, np.nan).ffill().fillna(0)).round(4)
- )
- # Step 1: mark price-change segments
- df_input['price_change_segment'] = (
- df_input.groupby(['gid', 'baggage'], group_keys=False)['price_change_amount']
- .apply(lambda s: (s != s.shift()).cumsum())
- )
- # Step 2: compute the duration within each segment
- df_input['price_duration_hours'] = (
- df_input.groupby(['gid', 'baggage', 'price_change_segment'], group_keys=False)
- .cumcount()
- .add(1)
- )
-
- # Optional: drop the temporary column
- df_input = df_input.drop(columns=['price_change_segment'])
- # Record the seat-count change only at price-change points; leave NaN elsewhere
- # _price_diff = df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'].diff()
- # _price_changed = _price_diff.notna() & _price_diff.ne(0)
- # _seats_diff = df_input.groupby(['gid', 'baggage'], group_keys=False)['seats_remaining'].diff()
- # df_input['seats_remaining_change_amount'] = _seats_diff.where(_price_changed).round(0)
- # # Forward-fill and fill missing values with 0
- # df_input['seats_remaining_change_amount'] = (
- # df_input.groupby(['gid', 'baggage'], group_keys=False)['seats_remaining_change_amount']
- # .ffill()
- # .fillna(0)
- # )
-
- adult_price = df_input.pop('Adult_Total_Price')
- hours_until = df_input.pop('Hours_Until_Departure')
- df_input['Adult_Total_Price'] = adult_price
- df_input['Hours_Until_Departure'] = hours_until
- df_input['Baggage'] = df_input['baggage']
- # Training path
- if is_train:
- df_target = df_input[(df_input['hours_until_departure'] >= 12) & (df_input['hours_until_departure'] <= 60)].copy()
- df_target = df_target.sort_values(
- by=['gid', 'hours_until_departure'],
- ascending=[True, False]
- ).reset_index(drop=True)
- # Analyse the rise-then-fall pattern
- prev_pct = df_target.groupby('gid', group_keys=False)['price_change_percent'].shift(1)
- prev_amo = df_target.groupby('gid', group_keys=False)['price_change_amount'].shift(1)
- prev_dur = df_target.groupby('gid', group_keys=False)['price_duration_hours'].shift(1)
- # prev_seats_amo = df_target.groupby('gid', group_keys=False)['seats_remaining_change_amount'].shift(1)
- prev_price = df_target.groupby('gid', group_keys=False)['adult_total_price'].shift(1)
- prev_seats = df_target.groupby('gid', group_keys=False)['seats_remaining'].shift(1)
- drop_mask = (prev_pct > 0) & (df_target['price_change_percent'] < 0)
-
- df_drop_nodes = df_target.loc[drop_mask, ['gid', 'hours_until_departure']].copy()
- df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True)
- df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
- df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
- df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
- df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
- df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
- # df_drop_nodes['high_price_seats_remaining_change_amount'] = prev_seats_amo.loc[drop_mask].astype(float).round(1).to_numpy()
- df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
- df_drop_nodes['high_price_seats_remaining'] = prev_seats.loc[drop_mask].astype(int).to_numpy()
- df_drop_nodes = df_drop_nodes.reset_index(drop=True)
- flight_info_cols = [
- 'city_pair',
- 'flight_number_1', 'seg1_dep_air_port', 'seg1_dep_time', 'seg1_arr_air_port', 'seg1_arr_time',
- 'flight_number_2', 'seg2_dep_air_port', 'seg2_dep_time', 'seg2_arr_air_port', 'seg2_arr_time',
- 'currency', 'baggage', 'flight_day',
- ]
- flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
-
- df_gid_info = df_target[['gid'] + flight_info_cols].drop_duplicates(subset=['gid']).reset_index(drop=True)
- df_drop_nodes = df_drop_nodes.merge(df_gid_info, on='gid', how='left')
- drop_info_cols = ['drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
- 'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount',
- 'high_price_amount', 'high_price_seats_remaining',
- ]
- # Arrange in order and drop gid
- df_drop_nodes = df_drop_nodes[flight_info_cols + drop_info_cols]
- df_drop_nodes = df_drop_nodes[df_drop_nodes['drop_price_change_percent'] <= -0.01] # ignore drops that are too small
- # Analyse gids that never show a rise-then-fall pattern
- gids_with_drop = df_target.loc[drop_mask, 'gid'].unique()
- df_no_drop = df_target[~df_target['gid'].isin(gids_with_drop)].copy()
- keep_info_cols = [
- 'keep_hours_until_departure', 'keep_price_change_percent', 'keep_price_change_amount',
- 'keep_price_duration_hours', 'keep_price_amount', 'keep_price_seats_remaining',
- ]
-
- if df_no_drop.empty:
- df_keep_nodes = pd.DataFrame(columns=flight_info_cols + keep_info_cols)
- else:
- df_no_drop = df_no_drop.sort_values(
- by=['gid', 'hours_until_departure'],
- ascending=[True, False]
- ).reset_index(drop=True)
- df_no_drop['keep_segment'] = df_no_drop.groupby('gid')['price_change_percent'].transform(
- lambda s: (s != s.shift()).cumsum()
- )
- df_keep_row = (
- df_no_drop.groupby(['gid', 'keep_segment'], as_index=False)
- .tail(1)
- .reset_index(drop=True)
- )
- df_keep_nodes = df_keep_row[
- ['gid', 'hours_until_departure', 'price_change_percent', 'price_change_amount',
- 'price_duration_hours', 'adult_total_price', 'seats_remaining']
- ].copy()
- df_keep_nodes.rename(
- columns={
- 'hours_until_departure': 'keep_hours_until_departure',
- 'price_change_percent': 'keep_price_change_percent',
- 'price_change_amount': 'keep_price_change_amount',
- 'price_duration_hours': 'keep_price_duration_hours',
- 'adult_total_price': 'keep_price_amount',
- 'seats_remaining': 'keep_price_seats_remaining',
- },
- inplace=True,
- )
- df_keep_nodes = df_keep_nodes.merge(df_gid_info, on='gid', how='left')
- df_keep_nodes = df_keep_nodes[flight_info_cols + keep_info_cols]
- del df_keep_row
-
- del df_gid_info
- del df_target
- del df_no_drop
- return df_input, df_drop_nodes, df_keep_nodes
- return df_input, None, None
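- # Minimal sketch (toy series, never called) of how price_duration_hours is derived in
- # preprocess_data_simple: the last non-zero price change is forward-filled, consecutive
- # equal values form one segment, and cumcount()+1 counts the hours spent in that segment.
- def _example_price_duration():
-     toy = pd.DataFrame({
-         'gid': [0, 0, 0, 0, 0],
-         'baggage': [30, 30, 30, 30, 30],
-         'adult_total_price': [100.0, 100.0, 110.0, 110.0, 105.0],
-     })
-     grp = toy.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price']
-     toy['price_change_amount'] = grp.apply(
-         lambda s: s.diff().replace(0, np.nan).ffill().fillna(0)
-     ).round(2)
-     toy['segment'] = toy.groupby(['gid', 'baggage'], group_keys=False)['price_change_amount'].apply(
-         lambda s: (s != s.shift()).cumsum()
-     )
-     toy['price_duration_hours'] = toy.groupby(['gid', 'baggage', 'segment']).cumcount() + 1
-     # price_change_amount -> [0, 0, 10, 10, -5], price_duration_hours -> [1, 2, 1, 2, 1]
-     return toy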
- def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".", pred_time_str=""):
- if df_input is None or df_input.empty:
- return pd.DataFrame()
-
- df_sorted = df_input.sort_values(
- by=['gid', 'hours_until_departure'],
- ascending=[True, False],
- ).reset_index(drop=True)
- df_sorted = df_sorted[
- df_sorted['hours_until_departure'].between(12, 60)
- ].reset_index(drop=True)
- # For each gid keep the row with the smallest hours_until_departure
- df_min_hours = (
- df_sorted.drop_duplicates(subset=['gid'], keep='last')
- .reset_index(drop=True)
- )
- # Ensure hours_until_departure is within [12, 60]
- # df_min_hours = df_min_hours[
- # df_min_hours['hours_until_departure'].between(12, 60)
- # ].reset_index(drop=True)
- drop_info_csv_path = os.path.join(output_dir, f'{group_route_str}_drop_info.csv')
- if os.path.exists(drop_info_csv_path):
- df_drop_nodes = pd.read_csv(drop_info_csv_path)
- else:
- df_drop_nodes = pd.DataFrame()
- keep_info_csv_path = os.path.join(output_dir, f'{group_route_str}_keep_info.csv')
- if os.path.exists(keep_info_csv_path):
- df_keep_nodes = pd.read_csv(keep_info_csv_path)
- else:
- df_keep_nodes = pd.DataFrame()
- df_min_hours['simple_will_price_drop'] = 0
- df_min_hours['simple_drop_in_hours'] = 0
- df_min_hours['simple_drop_in_hours_prob'] = 0.0
- df_min_hours['simple_drop_in_hours_dist'] = '' # empty string means unknown
-
- # What should this threshold be?
- pct_threshold = 0.001
- # pct_threshold = 2
- pct_threshold_1 = 0.001
- pct_threshold_c = 0.001
- for idx, row in df_min_hours.iterrows():
- city_pair = row['city_pair']
- flight_number_1 = row['flight_number_1']
- flight_number_2 = row['flight_number_2']
- if flight_number_1 == 'VJ878': # used for debugging
- pass
- price_change_percent = row['price_change_percent']
- price_change_amount = row['price_change_amount']
- price_duration_hours = row['price_duration_hours']
- hours_until_departure = row['hours_until_departure']
- # seats_remaining_change_amount = row['seats_remaining_change_amount']
- price_amount = row['adult_total_price']
- seats_remaining = row['seats_remaining']
- length_drop = 0
- length_keep = 0
- # Handle the historical high -> low price pattern
- if not df_drop_nodes.empty:
- # Match on flight number across different departure days
- if flight_number_2 and flight_number_2 != 'VJ':
- df_drop_nodes_part = df_drop_nodes[
- (df_drop_nodes['city_pair'] == city_pair) &
- (df_drop_nodes['flight_number_1'] == flight_number_1) &
- (df_drop_nodes['flight_number_2'] == flight_number_2)
- ]
- else:
- df_drop_nodes_part = df_drop_nodes[
- (df_drop_nodes['city_pair'] == city_pair) &
- (df_drop_nodes['flight_number_1'] == flight_number_1)
- ]
-
- # Match the pre-drop increase within a threshold and use the historical high-price duration to derive a drop-time probability
- if not df_drop_nodes_part.empty and pd.notna(price_change_percent):
- # Remove rows whose increase is too small
- df_drop_nodes_part = df_drop_nodes_part[df_drop_nodes_part['high_price_change_percent'] >= 0.01]
- # pct_diff = (df_drop_nodes_part['high_price_change_percent'] - float(price_change_percent)).abs()
- # df_match = df_drop_nodes_part.loc[pct_diff <= pct_threshold, ['high_price_duration_hours', 'high_price_change_percent']].copy()
-
- pct_base = float(price_change_percent)
- pct_vals = pd.to_numeric(df_drop_nodes_part['high_price_change_percent'], errors='coerce')
- df_drop_gap = df_drop_nodes_part.loc[
- pct_vals.notna(),
- ['drop_hours_until_departure', 'high_price_duration_hours', 'high_price_change_percent',
- 'high_price_change_amount', 'high_price_amount', 'high_price_seats_remaining']
- ].copy()
- df_drop_gap['pct_gap'] = (pct_vals.loc[pct_vals.notna()] - pct_base)
- df_drop_gap['pct_abs_gap'] = df_drop_gap['pct_gap'].abs()
- price_base = pd.to_numeric(price_amount, errors='coerce')
- high_price_vals = pd.to_numeric(df_drop_gap['high_price_amount'], errors='coerce')
- df_drop_gap['price_gap'] = high_price_vals - price_base
- df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
- df_drop_gap = df_drop_gap.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
- df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 10.0)].copy()
- # Historical cases where a very similar increase was followed by a drop
- if not df_match.empty:
- dur_base = pd.to_numeric(price_duration_hours, errors='coerce')
- hud_base = pd.to_numeric(hours_until_departure, errors='coerce')
- # seats_base = pd.to_numeric(seats_remaining_change_amount, errors='coerce')
- if pd.notna(dur_base) and pd.notna(hud_base): # and pd.notna(seats_base)
- df_match_chk = df_match.copy()
- dur_vals = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce')
- df_match_chk = df_match_chk.loc[dur_vals.notna()].copy()
- df_match_chk = df_match_chk.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 12].copy()
- drop_hud_vals = pd.to_numeric(df_match_chk['drop_hours_until_departure'], errors='coerce')
- df_match_chk = df_match_chk.loc[drop_hud_vals.notna()].copy()
- df_match_chk = df_match_chk.loc[(drop_hud_vals.loc[drop_hud_vals.notna()] - float(hud_base)).abs() <= 12].copy()
- # seats_vals = pd.to_numeric(df_match_chk['high_price_seats_remaining_change_amount'], errors='coerce')
- # df_match_chk = df_match_chk.loc[seats_vals.notna()].copy()
- # df_match_chk = df_match_chk.loc[seats_vals.loc[seats_vals.notna()] == float(seats_base)].copy()
- # Duration, time to departure and seat change all match
- if not df_match_chk.empty:
- remaining_hours = (
- pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce') - float(dur_base)
- ).clip(lower=0)
- remaining_hours = remaining_hours.round().astype(int)
- counts = remaining_hours.value_counts().sort_index()
- probs = (counts / counts.sum()).round(4)
- top_hours = int(probs.idxmax())
- top_prob = float(probs.max())
- dist_items = list(zip(probs.index.tolist(), probs.tolist()))
- dist_items = dist_items[:10]
- dist_str = ' '.join([f"{int(h)}h->{float(p)}" for h, p in dist_items])
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- df_min_hours.loc[idx, 'simple_drop_in_hours'] = top_hours
- df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1
- df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = dist_str
- length_drop = df_match_chk.shape[0]
- # continue # already judged as a drop; skip the rest
-
- # No historical case where a very similar increase was followed by a drop
- else:
- pass
- # if pd.notna(price_duration_hours) and price_change_percent >= 0.1:
- # pct_vals = pd.to_numeric(
- # df_drop_nodes_part['high_price_change_percent'],
- # errors='coerce'
- # ).replace([np.inf, -np.inf], np.nan).dropna()
- # dur_vals = pd.to_numeric(
- # df_drop_nodes_part['high_price_duration_hours'],
- # errors='coerce'
- # ).replace([np.inf, -np.inf], np.nan).dropna()
- # if not pct_vals.empty and not dur_vals.empty:
- # pct_min = float(pct_vals.min())
- # pct_max = float(pct_vals.max())
- # dur_min = float(dur_vals.min())
- # dur_max = float(dur_vals.max())
- # if (pct_min <= float(price_change_percent) <= pct_max) and (dur_min <= float(price_duration_hours) <= dur_max):
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.5'
- # continue # 已经判定降价 后面不再做
- # elif (pct_min <= float(price_change_percent)) and (dur_min <= float(price_duration_hours)):
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.3
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.3'
- # continue # 已经判定降价 后面不再做
-
- # Handle historical patterns such as always low, always high, low -> high, consecutive lows, etc.
- if not df_keep_nodes.empty:
- # Match on flight number across different departure days
- if flight_number_2 and flight_number_2 != 'VJ':
- df_keep_nodes_part = df_keep_nodes[
- (df_keep_nodes['city_pair'] == city_pair) &
- (df_keep_nodes['flight_number_1'] == flight_number_1) &
- (df_keep_nodes['flight_number_2'] == flight_number_2)
- ]
- else:
- df_keep_nodes_part = df_keep_nodes[
- (df_keep_nodes['city_pair'] == city_pair) &
- (df_keep_nodes['flight_number_1'] == flight_number_1)
- ]
- if not df_keep_nodes_part.empty and pd.notna(price_change_percent):
- # pct_vals_1 = df_keep_nodes_part['keep_price_change_percent'].replace([np.inf, -np.inf], np.nan).dropna()
- # # 保留百分位 10% ~ 90% 之间的 数据
- # if not pct_vals_1.empty:
- # q10_1 = float(pct_vals_1.quantile(0.10))
- # q90_1 = float(pct_vals_1.quantile(0.90))
- # df_keep_nodes_part = df_keep_nodes_part[
- # df_keep_nodes_part['keep_price_change_percent'].between(q10_1, q90_1)
- # ]
- # if df_keep_nodes_part.empty:
- # continue
-
- # Special-case judgement scenario
- # if price_change_percent < 0:
- # df_tmp = df_keep_nodes_part.copy()
- # # 确保组内顺序正确(如果前面已经排过,这行可省略)
- # df_tmp = df_tmp.sort_values(
- # by=["flight_day", "keep_hours_until_departure"],
- # ascending=[True, False]
- # )
- # # 是否为负值
- # df_tmp["is_negative"] = df_tmp["keep_price_change_percent"] < 0
-
- # if df_tmp["is_negative"].any():
- # # 标记“负值段”的开始
- # # 当 is_negative 为 True 且 前一行不是负值时,认为是一个新段
- # df_tmp["neg_block_id"] = (
- # df_tmp["is_negative"]
- # & ~df_tmp.groupby("flight_day")["is_negative"].shift(fill_value=False)
- # ).groupby(df_tmp["flight_day"]).cumsum()
- # # 在每个负值段内计数(第几个负值)
- # df_tmp["neg_rank_in_block"] = (
- # df_tmp.groupby(["flight_day", "neg_block_id"])
- # .cumcount() + 1
- # )
- # # 每个连续负值段的长度
- # df_tmp["neg_block_size"] = (
- # df_tmp.groupby(["flight_day", "neg_block_id"])["is_negative"]
- # .transform("sum")
- # )
- # # 只保留:
- # # 1) 是负值
- # # 2) 且不是该连续负值段的最后一个
- # df_continuous_price_drop = df_tmp[
- # (df_tmp["is_negative"]) &
- # (df_tmp["neg_rank_in_block"] < df_tmp["neg_block_size"])
- # ].drop(
- # columns=[
- # "is_negative",
- # "neg_block_id",
- # "neg_rank_in_block",
- # "neg_block_size",
- # ]
- # )
- # pct_diff_c = (df_continuous_price_drop['keep_price_change_percent'] - float(price_change_percent)).abs()
- # df_match_c = df_continuous_price_drop.loc[pct_diff_c <= pct_threshold_c, ['flight_day', 'keep_hours_until_departure', 'keep_price_duration_hours', 'keep_price_change_percent']].copy()
- # # 符合连续降价条件
- # if not df_match_c.empty and pd.notna(price_duration_hours):
- # vals_c = df_match_c['keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna()
- # if not vals_c.empty:
- # min_val_c = vals_c.min()
- # if min_val_c <= float(price_duration_hours):
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'c1'
- # length_drop = df_match_c.shape[0]
- # # continue # 已经判定降价 后面不再做
- # General judgement scenario
- pct_base_1 = float(price_change_percent)
- pct_vals_1 = pd.to_numeric(df_keep_nodes_part['keep_price_change_percent'], errors='coerce')
- df_keep_gap_1 = df_keep_nodes_part.loc[
- pct_vals_1.notna(),
- ['keep_hours_until_departure', 'keep_price_duration_hours', 'keep_price_change_percent',
- 'keep_price_change_amount', 'keep_price_amount', 'keep_price_seats_remaining']
- ].copy()
- df_keep_gap_1['pct_gap'] = (pct_vals_1.loc[pct_vals_1.notna()] - pct_base_1)
- df_keep_gap_1['pct_abs_gap'] = df_keep_gap_1['pct_gap'].abs()
-
- price_base_1 = pd.to_numeric(price_amount, errors='coerce')
- keep_price_vals_1 = pd.to_numeric(df_keep_gap_1['keep_price_amount'], errors='coerce')
- df_keep_gap_1['price_gap'] = keep_price_vals_1 - price_base_1
- df_keep_gap_1['price_abs_gap'] = df_keep_gap_1['price_gap'].abs()
- df_keep_gap_1 = df_keep_gap_1.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
- df_match_1 = df_keep_gap_1.loc[(df_keep_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_keep_gap_1['price_abs_gap'] <= 10.0)].copy()
- # Historical cases where a similar change was followed by the price holding steady
- if not df_match_1.empty:
- df_match_1['hours_delta'] = hours_until_departure - df_match_1['keep_hours_until_departure']
- df_match_1['modify_keep_price_duration_hours'] = df_match_1['keep_price_duration_hours'] - df_match_1['hours_delta']
- # df_match_1 = df_match_1[df_match_1['modify_keep_price_duration_hours'] > 0]
- dur_base_1 = pd.to_numeric(price_duration_hours, errors='coerce')
- # hud_base_1 = pd.to_numeric(hours_until_departure, errors='coerce')
- # seats_base_1 = pd.to_numeric(seats_remaining_change_amount, errors='coerce')
- if pd.notna(dur_base_1): # and pd.notna(seats_base_1)
- df_match_chk_1 = df_match_1.copy()
- dur_vals_1 = pd.to_numeric(df_match_chk_1['modify_keep_price_duration_hours'], errors='coerce')
- df_match_chk_1 = df_match_chk_1.loc[dur_vals_1.notna()].copy()
- df_match_chk_1 = df_match_chk_1.loc[(dur_vals_1.loc[dur_vals_1.notna()] - float(dur_base_1)).abs() <= 12].copy()
- # drop_hud_vals_1 = pd.to_numeric(df_match_chk_1['keep_hours_until_departure'], errors='coerce')
- # df_match_chk_1 = df_match_chk_1.loc[drop_hud_vals_1.notna()].copy()
- # df_match_chk_1 = df_match_chk_1.loc[(drop_hud_vals_1.loc[drop_hud_vals_1.notna()] - float(hud_base_1)).abs() <= 12].copy()
- # seats_vals_1 = pd.to_numeric(df_match_chk_1['keep_seats_remaining_change_amount'], errors='coerce')
- # df_match_chk_1 = df_match_chk_1.loc[seats_vals_1.notna()].copy()
- # df_match_chk_1 = df_match_chk_1.loc[seats_vals_1.loc[seats_vals_1.notna()] == float(seats_base_1)].copy()
- # Duration, time to departure and seat change all match
- if not df_match_chk_1.empty:
- length_keep = df_match_chk_1.shape[0]
- # Can confidently judge that the price will not drop
- if length_drop == 0:
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
- df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k0'
- # Conflicting evidence: adjudicate by probability
- else:
- drop_prob = round(length_drop / (length_keep + length_drop), 2)
- # Keep the earlier drop judgement but adjust the probability
- if drop_prob >= 0.45:
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'd1'
- # Overturn to no-drop and adjust the probability
- else:
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
- df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob
-
- # elif length_keep == length_drop: # 不降价与降价相同, 取0.5概率
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
-
- # df_match_1['hours_delta'] = hours_until_departure - df_match_1['keep_hours_until_departure']
- # df_match_1['modify_keep_price_duration_hours'] = df_match_1['keep_price_duration_hours'] - df_match_1['hours_delta']
- # df_match_1 = df_match_1[df_match_1['modify_keep_price_duration_hours'] > 0]
- # Compare where price_duration_hours falls within modify_keep_price_duration_hours
- # vals = df_match_1['modify_keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna()
- # if not vals.empty:
- # # q10_11 = float(vals.quantile(0.10))
- # min_val = vals.min()
- # if min_val <= float(price_duration_hours):
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
- # No historical case where a similar change was followed by the price holding steady
- else:
- pass
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n0'
- # if pd.notna(price_duration_hours) and price_change_percent <= 0.1:
- # df_keep_nodes_part_1 = df_keep_nodes_part[df_keep_nodes_part['keep_price_change_percent'] <= 0.1]
- # pct_vals_1 = pd.to_numeric(
- # df_keep_nodes_part_1['keep_price_change_percent'],
- # errors='coerce'
- # ).replace([np.inf, -np.inf], np.nan).dropna()
- # dur_vals_1 = pd.to_numeric(
- # df_keep_nodes_part_1['keep_price_duration_hours'],
- # errors='coerce'
- # ).replace([np.inf, -np.inf], np.nan).dropna()
- # if not pct_vals_1.empty and not dur_vals_1.empty:
- # pct_min_1 = float(pct_vals_1.min())
- # pct_max_1 = float(pct_vals_1.max())
- # dur_min_1 = float(dur_vals_1.min())
- # dur_max_1 = float(dur_vals_1.max())
- # if (pct_min_1 <= float(price_change_percent) <= pct_max_1) and (dur_min_1 <= float(price_duration_hours) <= dur_max_1):
- # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n1'
- pass
- print("判定过程结束")
- df_min_hours = df_min_hours.rename(columns={'seg1_dep_time': 'from_time'})
- _pred_dt = pd.to_datetime(str(pred_time_str), format="%Y%m%d%H%M", errors="coerce")
- df_min_hours["update_hour"] = _pred_dt.strftime("%Y-%m-%d %H:%M:%S")
- _dep_hour = pd.to_datetime(df_min_hours["from_time"], errors="coerce").dt.floor("h")
- df_min_hours["valid_begin_hour"] = (_dep_hour - pd.to_timedelta(60, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
- df_min_hours["valid_end_hour"] = (_dep_hour - pd.to_timedelta(12, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
- order_cols = ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2', 'from_time',
- 'baggage', 'seats_remaining', 'currency',
- 'adult_total_price', 'hours_until_departure', 'price_change_percent', 'price_duration_hours',
- 'update_hour', 'crawl_date',
- 'valid_begin_hour', 'valid_end_hour',
- 'simple_will_price_drop', 'simple_drop_in_hours', 'simple_drop_in_hours_prob', 'simple_drop_in_hours_dist'
- ]
- df_predict = df_min_hours[order_cols]
- df_predict = df_predict.rename(columns={
- 'simple_will_price_drop': 'will_price_drop',
- 'simple_drop_in_hours': 'drop_in_hours',
- 'simple_drop_in_hours_prob': 'drop_in_hours_prob',
- 'simple_drop_in_hours_dist': 'drop_in_hours_dist',
- }
- )
- # Sort
- df_predict = df_predict.sort_values(
- by=['city_pair', 'flight_number_1', 'flight_number_2', 'flight_day'],
- kind='mergesort',
- na_position='last',
- ).reset_index(drop=True)
- # Time filter: rows not updated for over 8 hours are likely sold out and are excluded from prediction
- update_dt = pd.to_datetime(df_predict["update_hour"], errors="coerce")
- crawl_dt = pd.to_datetime(df_predict["crawl_date"], errors="coerce")
- dt_diff = update_dt - crawl_dt
- df_predict = df_predict.loc[
- (dt_diff >= pd.Timedelta(0)) & (dt_diff <= pd.Timedelta(hours=8))
- ].reset_index(drop=True)
- print("更新时间过滤")
- csv_path1 = os.path.join(predict_dir, f'future_predictions_{pred_time_str}.csv')
- df_predict.to_csv(csv_path1, mode='a', index=False, header=not os.path.exists(csv_path1), encoding='utf-8-sig')
- print("预测结果已追加")
- return df_predict
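- # Illustrative sketch (never called) of the tie-break used above when both drop-history
- # and keep-history matches exist for a flight: the empirical drop probability decides,
- # with 0.45 as the cut-off used in the loop.
- def _example_drop_decision(length_drop, length_keep, threshold=0.45):
-     if length_drop == 0:
-         return 0, 0.0  # no matching drop history -> predict no drop
-     drop_prob = round(length_drop / (length_keep + length_drop), 2)
-     will_drop = 1 if drop_prob >= threshold else 0
-     return will_drop, drop_prob
- # e.g. _example_drop_decision(3, 2) -> (1, 0.6); _example_drop_decision(1, 4) -> (0, 0.2)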