import pandas as pd import numpy as np import bisect import gc import os from datetime import datetime, timedelta from sklearn.preprocessing import StandardScaler from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays from utils import insert_df_col COUNTRY_HOLIDAYS = build_country_holidays(city_to_country) def preprocess_data_cycle(df_input, interval_hours=8, feature_length=240, target_length=24, is_training=True): # df_input_part = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['hours_until_departure'] < current_n_hours)].copy() df_input = preprocess_data_first_half(df_input) # 创建一个空列表来存储所有处理后的数据部分 list_df_parts = [] crop_lower_limit_list = [4] # [4, 28, 52, 76, 100] for crop_lower_limit in crop_lower_limit_list: target_n_hours = crop_lower_limit + target_length feature_n_hours = target_n_hours + interval_hours crop_upper_limit = feature_n_hours + feature_length df_input_part = preprocess_data(df_input, is_training=is_training, crop_upper_limit=crop_upper_limit, feature_n_hours=feature_n_hours, target_n_hours=target_n_hours, crop_lower_limit=crop_lower_limit) # 将处理后的部分添加到列表中 list_df_parts.append(df_input_part) if not is_training: break # 合并所有处理后的数据部分 if list_df_parts: df_combined = pd.concat(list_df_parts, ignore_index=True) return df_combined else: return pd.DataFrame() # 如果没有数据,返回空DataFrame def preprocess_data_first_half(df_input): '''前半部分''' print(">>> 开始数据预处理") # 生成 城市对 df_input['city_pair'] = ( df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str) ) # 城市码映射成数字 df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map) df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map) missing_from = ( df_input.loc[df_input['from_city_num'].isna(), 'from_city_code'] .unique() ) missing_to = ( df_input.loc[df_input['to_city_num'].isna(), 'to_city_code'] .unique() ) if missing_from: print("未映射的 from_city:", missing_from) if missing_to: print("未映射的 to_city:", missing_to) # 把 city_pair、from_city_code、from_city_num, to_city_code, to_city_num 放到前几列 cols = df_input.columns.tolist() # 删除已存在的几列(保证顺序正确) for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']: cols.remove(c) # 这几列插入到最前面 df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols] pass # 转格式 df_input['search_dep_time'] = pd.to_datetime( df_input['search_dep_time'], format='%Y%m%d', errors='coerce' ).dt.strftime('%Y-%m-%d') # 重命名起飞日期 df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True) # 重命名航班号 df_input.rename( columns={ 'seg1_flight_number': 'flight_number_1', 'seg2_flight_number': 'flight_number_2' }, inplace=True ) # 分开填充 df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ') df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ') # 航班号转数字 df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map) df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map) missing_flight_1 = ( df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1'] .unique() ) missing_flight_2 = ( df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2'] .unique() ) if missing_flight_1: print("未映射的 flight_1:", missing_flight_1) if missing_flight_2: print("未映射的 flight_2:", missing_flight_2) # flight_1_num 放在 seg1_dep_air_port 之前 insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port') # flight_2_num 放在 seg2_dep_air_port 之前 insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port') df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int) # 30--> 1 20--> 0 # baggage_level 放在 flight_number_2 之前 insert_df_col(df_input, 'baggage_level', 'flight_number_2') df_input['Adult_Total_Price'] = df_input['adult_total_price'] # Adult_Total_Price 放在 seats_remaining 之前 保存缩放前的原始值 insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining') df_input['Hours_Until_Departure'] = df_input['hours_until_departure'] # Hours_Until_Departure 放在 days_to_departure 之前 保存缩放前的原始值 insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure') pass # gid:基于指定字段的分组标记(整数) df_input['gid'] = ( df_input .groupby( ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'], # 'baggage' 先不进分组 sort=False ) .ngroup() ) return df_input def preprocess_data(df_input, is_training=True, crop_upper_limit=480, feature_n_hours=36, target_n_hours=28, crop_lower_limit=4): print(f"裁剪范围: [{crop_lower_limit}, {crop_upper_limit}], 间隔窗口: [{target_n_hours}, {feature_n_hours}]") # 做一下时间段裁剪, 保留起飞前480小时之内且大于等于4小时的 df_input = df_input[(df_input['hours_until_departure'] < crop_upper_limit) & (df_input['hours_until_departure'] >= crop_lower_limit)].reset_index(drop=True) # 在 gid 与 baggage 内按时间降序 df_input = df_input.sort_values( by=['gid', 'baggage', 'hours_until_departure'], ascending=[True, True, False] ).reset_index(drop=True) # 价格幅度阈值 VALID_DROP_MIN = 5 # 价格变化掩码 g = df_input.groupby(['gid', 'baggage']) diff = g['adult_total_price'].transform('diff') # change_mask = diff.abs() >= VALID_DROP_MIN # 变化太小的不计入 decrease_mask = diff <= -VALID_DROP_MIN # 降价(变化太小的不计入) increase_mask = diff >= VALID_DROP_MIN # 升价(变化太小的不计入) df_input['_price_event_dir'] = np.where(increase_mask, 1, np.where(decrease_mask, -1, 0)) # 计算连续升价/降价次数 def _calc_price_streaks(df_group): dirs = df_group['_price_event_dir'].to_numpy() n = len(dirs) inc = np.full(n, np.nan) dec = np.full(n, np.nan) last_dir = 0 inc_cnt = 0 dec_cnt = 0 for i, d in enumerate(dirs): if d == 1: inc_cnt = inc_cnt + 1 if last_dir == 1 else 1 dec_cnt = 0 last_dir = 1 inc[i] = inc_cnt dec[i] = dec_cnt elif d == -1: dec_cnt = dec_cnt + 1 if last_dir == -1 else 1 inc_cnt = 0 last_dir = -1 inc[i] = inc_cnt dec[i] = dec_cnt inc_s = pd.Series(inc, index=df_group.index).ffill().fillna(0).astype(int) dec_s = pd.Series(dec, index=df_group.index).ffill().fillna(0).astype(int) return pd.DataFrame( { 'price_increase_times_consecutive': inc_s, 'price_decrease_times_consecutive': dec_s, }, index=df_group.index, ) streak_df = df_input.groupby(['gid', 'baggage'], sort=False, group_keys=False).apply(_calc_price_streaks) df_input = df_input.join(streak_df) df_input.drop(columns=['_price_event_dir'], inplace=True) # 价格变化次数 # df_input['price_change_times_total'] = ( # change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum() # ) # 价格下降次数 df_input['price_decrease_times_total'] = ( decrease_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum() ) # 价格上升次数 df_input['price_increase_times_total'] = ( increase_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum() ) # 上次发生变价的小时数 # last_change_hour = ( # df_input['hours_until_departure'] # .where(change_mask) # .groupby([df_input['gid'], df_input['baggage']]) # .ffill() # 前向填充 # ) # 上次发生降价的小时数 last_decrease_hour = ( df_input['hours_until_departure'] .where(decrease_mask) .groupby([df_input['gid'], df_input['baggage']]) .ffill() # 前向填充 ) # 上次发生升价的小时数 last_increase_hour = ( df_input['hours_until_departure'] .where(increase_mask) .groupby([df_input['gid'], df_input['baggage']]) .ffill() # 前向填充 ) # 当前距离上一次变价过去多少小时 # df_input['price_last_change_hours'] = ( # last_change_hour - df_input['hours_until_departure'] # ).fillna(0) # 当前距离上一次降价过去多少小时 df_input['price_last_decrease_hours'] = ( last_decrease_hour - df_input['hours_until_departure'] ).fillna(0) # 当前距离上一次升价过去多少小时 df_input['price_last_increase_hours'] = ( last_increase_hour - df_input['hours_until_departure'] ).fillna(0) pass # 想插入到 seats_remaining 前面的新列 new_cols = [ # 'price_change_times_total', # 'price_last_change_hours', 'price_decrease_times_total', 'price_decrease_times_consecutive', 'price_last_decrease_hours', 'price_increase_times_total', 'price_increase_times_consecutive', 'price_last_increase_hours', ] # 当前所有列 cols = df_input.columns.tolist() # 找到 seats_remaining 的位置 idx = cols.index('seats_remaining') # 重新拼列顺序 new_order = cols[:idx] + new_cols + cols[idx:] # 去重(防止列已经在原位置) new_order = list(dict.fromkeys(new_order)) # 重新排列 DataFrame df_input = df_input[new_order] pass print(">>> 计算价格区间特征") # 1. 基于绝对价格水平的价格区间划分 # 先计算每个(gid, baggage)的价格统计特征 # g = df_input.groupby(['gid', 'baggage']) price_stats = df_input.groupby(['gid', 'baggage'])['adult_total_price'].agg( min_price='min', max_price='max', mean_price='mean', std_price='std' ).reset_index() # 合并统计特征到原数据 df_input = df_input.merge(price_stats, on=['gid', 'baggage'], how='left') # 2. 基于绝对价格的价格区间划分 (可以删除,因为后面有更精细的基于频率加权的分类) # # 高价区间:超过均值+1倍标准差 # df_input['price_absolute_high'] = (df_input['adult_total_price'] > # (df_input['mean_price'] + df_input['std_price'])).astype(int) # # 中高价区间:均值到均值+1倍标准差 # df_input['price_absolute_mid_high'] = ((df_input['adult_total_price'] > df_input['mean_price']) & # (df_input['adult_total_price'] <= (df_input['mean_price'] + df_input['std_price']))).astype(int) # # 中低价区间:均值-1倍标准差到均值 # df_input['price_absolute_mid_low'] = ((df_input['adult_total_price'] > (df_input['mean_price'] - df_input['std_price'])) & # (df_input['adult_total_price'] <= df_input['mean_price'])).astype(int) # # 低价区间:低于均值-1倍标准差 # df_input['price_absolute_low'] = (df_input['adult_total_price'] <= (df_input['mean_price'] - df_input['std_price'])).astype(int) # 3. 基于频率加权的价格百分位数(改进版) # 计算每个价格出现的频率 price_freq = df_input.groupby(['gid', 'baggage', 'adult_total_price']).size().reset_index(name='price_frequency') df_input = df_input.merge(price_freq, on=['gid', 'baggage', 'adult_total_price'], how='left') # 计算频率加权的百分位数 def weighted_percentile(group): if len(group) == 0: return pd.Series([np.nan] * 4, index=['price_weighted_percentile_25', 'price_weighted_percentile_50', 'price_weighted_percentile_75', 'price_weighted_percentile_90']) # 按价格排序,计算累积频率 group = group.sort_values('adult_total_price') group['cum_freq'] = group['price_frequency'].cumsum() total_freq = group['price_frequency'].sum() # 计算加权百分位数 percentiles = [] for p in [0.25, 0.5, 0.75, 0.9]: threshold = total_freq * p # 找到第一个累积频率超过阈值的价格 mask = group['cum_freq'] >= threshold if mask.any(): percentile_value = group.loc[mask.idxmax(), 'adult_total_price'] else: percentile_value = group['adult_total_price'].max() percentiles.append(percentile_value) return pd.Series(percentiles, index=['price_weighted_percentile_25', 'price_weighted_percentile_50', 'price_weighted_percentile_75', 'price_weighted_percentile_90']) # 按gid和baggage分组计算加权百分位数 weighted_percentiles = df_input.groupby(['gid', 'baggage']).apply(weighted_percentile).reset_index() df_input = df_input.merge(weighted_percentiles, on=['gid', 'baggage'], how='left') # 4. 结合绝对价格和频率的综合判断(改进版) freq_median = df_input.groupby(['gid', 'baggage'])['price_frequency'].transform('median') # 计算价格相对于90%百分位数的倍数,用于区分不同级别的高价 df_input['price_relative_to_90p'] = df_input['adult_total_price'] / df_input['price_weighted_percentile_90'] # 添加价格容忍度:避免相近价格被分到不同区间 # 计算价格差异容忍度(使用各百分位数的1%作为容忍度阈值) # tolerance_90p = df_input['price_weighted_percentile_90'] * 0.01 tolerance_75p = df_input['price_weighted_percentile_75'] * 0.01 tolerance_50p = df_input['price_weighted_percentile_50'] * 0.01 tolerance_25p = df_input['price_weighted_percentile_25'] * 0.01 # 重新设计价格区间分类(确保无重叠): # 首先定义各个区间的mask # 4.1 异常高价:价格远高于90%百分位数(超过1.5倍)且频率极低(低于中位数的1/3) price_abnormal_high_mask = ((df_input['price_relative_to_90p'] > 1.5) & (df_input['price_frequency'] < freq_median * 0.33)) # 4.2 真正高位:严格满足条件(价格 > 90%分位数 且 频率 < 中位数) price_real_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_90']) & (df_input['price_frequency'] < freq_median) & ~price_abnormal_high_mask) # 4.3 正常高位:使用容忍度(价格接近75%分位数) price_normal_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_75'] - tolerance_75p) & ~price_real_high_mask & ~price_abnormal_high_mask) # 4.4 中高价:使用容忍度(价格在50%-75%分位数之间) price_mid_high_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_50'] - tolerance_50p) & (df_input['adult_total_price'] <= df_input['price_weighted_percentile_75'] + tolerance_75p) & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask) # 4.5 中低价:使用容忍度(价格在25%-50%分位数之间) price_mid_low_mask = ((df_input['adult_total_price'] > df_input['price_weighted_percentile_25'] - tolerance_25p) & (df_input['adult_total_price'] <= df_input['price_weighted_percentile_50'] + tolerance_50p) & ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask) # 4.6 低价:严格满足条件(价格 ≤ 25%分位数) price_low_mask = ((df_input['adult_total_price'] <= df_input['price_weighted_percentile_25']) & ~price_mid_low_mask & ~price_mid_high_mask & ~price_normal_high_mask & ~price_real_high_mask & ~price_abnormal_high_mask) # 使用np.select确保互斥性 price_zone_masks = [ price_abnormal_high_mask, # 异常高价区(5级) price_real_high_mask, # 真正高价区(4级) price_normal_high_mask, # 正常高价区(3级) price_mid_high_mask, # 中高价区(2级) price_mid_low_mask, # 中低价区(1级) price_low_mask, # 低价区(0级) ] price_zone_values = [5, 4, 3, 2, 1, 0] # 5:异常高价, 4:真正高价, 3:正常高价, 2:中高价, 1:中低价, 0:低价 # 使用np.select确保每个价格只被分到一个区间 price_zone_result = np.select(price_zone_masks, price_zone_values, default=2) # 默认中高价 # 4.8 价格区间综合标记 df_input['price_zone_comprehensive'] = price_zone_result # 5. 价格异常度检测 # 价格相对于均值的标准化偏差 df_input['price_z_score'] = (df_input['adult_total_price'] - df_input['mean_price']) / df_input['std_price'] # 价格异常度:基于Z-score的绝对值 df_input['price_anomaly_score'] = np.abs(df_input['price_z_score']) # 6. 价格稳定性特征 # 计算价格波动系数(标准差/均值) df_input['price_coefficient_variation'] = df_input['std_price'] / df_input['mean_price'] # 7. 价格趋势特征 # 计算当前价格相对于历史价格的位置 df_input['price_relative_position'] = (df_input['adult_total_price'] - df_input['min_price']) / (df_input['max_price'] - df_input['min_price']) df_input['price_relative_position'] = df_input['price_relative_position'].fillna(0.5) # 兜底 # 删除中间计算列 df_input.drop(columns=['price_frequency', 'price_z_score', 'price_relative_to_90p'], inplace=True, errors='ignore') del price_freq del price_stats del weighted_percentiles del freq_median print(">>> 改进版价格区间特征计算完成") # 生成第一机场对 df_input['airport_pair_1'] = ( df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str) ) # 删除原始第一机场码 df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True) # 第一机场对 放到 seg1_dep_time 列的前面 insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time') # 生成第二机场对(带缺失兜底) df_input['airport_pair_2'] = np.where( df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(), 'NA', df_input['seg2_dep_air_port'].astype(str) + "-" + df_input['seg2_arr_air_port'].astype(str) ) # 删除原始第二机场码 df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True) # 第二机场对 放到 seg2_dep_time 列的前面 insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time') # 是否转乘 df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1) # 是否转乘 放到 flight_number_2 列的前面 insert_df_col(df_input, 'is_transfer', 'flight_number_2') # 重命名起飞时刻与到达时刻 df_input.rename( columns={ 'seg1_dep_time': 'dep_time_1', 'seg1_arr_time': 'arr_time_1', 'seg2_dep_time': 'dep_time_2', 'seg2_arr_time': 'arr_time_2', }, inplace=True ) # 第一段飞行时长 df_input['fly_duration_1'] = ( (df_input['arr_time_1'] - df_input['dep_time_1']) .dt.total_seconds() / 3600 ).round(2) # 第二段飞行时长(无转乘为 0) df_input['fly_duration_2'] = ( (df_input['arr_time_2'] - df_input['dep_time_2']) .dt.total_seconds() / 3600 ).fillna(0).round(2) # 总飞行时长 df_input['fly_duration'] = ( df_input['fly_duration_1'] + df_input['fly_duration_2'] ).round(2) # 中转停留时长(无转乘为 0) df_input['stop_duration'] = ( (df_input['dep_time_2'] - df_input['arr_time_1']) .dt.total_seconds() / 3600 ).fillna(0).round(2) # 裁剪,防止负数 # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']: # df_input[c] = df_input[c].clip(lower=0) # 和 is_transfer 逻辑保持一致 # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0 # 一次性插到 is_filled 前面 insert_before = 'is_filled' new_cols = [ 'fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration' ] cols = df_input.columns.tolist() idx = cols.index(insert_before) # 删除旧位置 cols = [c for c in cols if c not in new_cols] # 插入新位置(顺序保持) cols[idx:idx] = new_cols # python独有空切片插入法 df_input = df_input[cols] # 一次生成多个字段 dep_t1 = df_input['dep_time_1'] # 几点起飞(0–23) df_input['flight_by_hour'] = dep_t1.dt.hour # 起飞日期几号(1–31) df_input['flight_by_day'] = dep_t1.dt.day # 起飞日期几月(1–12) df_input['flight_day_of_month'] = dep_t1.dt.month # 起飞日期周几(0=周一, 6=周日) df_input['flight_day_of_week'] = dep_t1.dt.weekday # 起飞日期季度(1–4) df_input['flight_day_of_quarter'] = dep_t1.dt.quarter # 是否周末(周六 / 周日) df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int) # 找到对应的国家码 df_input['dep_country'] = df_input['from_city_code'].map(city_to_country) df_input['arr_country'] = df_input['to_city_code'].map(city_to_country) # 整体出发时间 就是 dep_time_1 df_input['global_dep_time'] = df_input['dep_time_1'] # 整体到达时间:有转乘用 arr_time_2,否则用 arr_time_1 df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1']) # 出发日期在出发国家是否节假日 df_input['dep_country_is_holiday'] = df_input.apply( lambda r: r['global_dep_time'].date() in COUNTRY_HOLIDAYS.get(r['dep_country'], set()), axis=1 ).astype(int) # 到达日期在到达国家是否节假日 df_input['arr_country_is_holiday'] = df_input.apply( lambda r: r['global_arr_time'].date() in COUNTRY_HOLIDAYS.get(r['arr_country'], set()), axis=1 ).astype(int) # 在任一侧是否节假日 df_input['any_country_is_holiday'] = ( df_input[['dep_country_is_holiday', 'arr_country_is_holiday']] .max(axis=1) ) # 是否跨国航线 df_input['is_cross_country'] = ( df_input['dep_country'] != df_input['arr_country'] ).astype(int) def days_to_next_holiday(country, cur_date): if pd.isna(country) or pd.isna(cur_date): return np.nan holidays = COUNTRY_HOLIDAYS.get(country) if not holidays: return np.nan # 找未来(含当天)的节假日,并排序 future_holidays = sorted([d for d in holidays if d >= cur_date]) if not future_holidays: return np.nan next_holiday = future_holidays[0] # 第一个未来节假日 delta_days = (next_holiday - cur_date).days return delta_days df_input['days_to_holiday'] = df_input.apply( lambda r: days_to_next_holiday( r['dep_country'], r['update_hour'].date() ), axis=1 ) # 没有未来节假日的统一兜底 # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999) # days_to_holiday 插在 update_hour 前面 insert_df_col(df_input, 'days_to_holiday', 'update_hour') # 训练模式 if is_training: print(">>> 训练模式:计算 target 相关列") print(f"\n>>> 开始处理 对应区间: n_hours = {target_n_hours}") target_lower_limit = crop_lower_limit target_upper_limit = target_n_hours mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30) df_targets = df_input.loc[mask_targets].copy() targets_amout = df_targets.shape[0] print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})") if targets_amout == 0: print(f">>> n_hours = {target_n_hours} 无有效数据,跳过") return pd.DataFrame() print(">>> 计算 price_at_n_hours") df_input_object = df_input[(df_input['hours_until_departure'] >= feature_n_hours) & (df_input['baggage'] == 30)].copy() df_last = df_input_object.groupby('gid', observed=True).last().reset_index() # 一般落在起飞前36\32\30小时 # 提取并重命名 price 列 df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'}) print(">>> price_at_n_hours计算完成,示例:") print(df_last_price_at_n_hours.head(5)) # 新的计算降价方式 # 先排序 df_targets = df_targets.sort_values( ['gid', 'hours_until_departure'], ascending=[True, False] ) # 在 gid 内计算价格变化 g = df_targets.groupby('gid', group_keys=False) df_targets['price_diff'] = g['adult_total_price'].diff() # VALID_DROP_MIN = 5 # LOWER_HOUR = 4 # UPPER_HOUR = 28 valid_drop_mask = ( (df_targets['price_diff'] <= -VALID_DROP_MIN) # (df_targets['hours_until_departure'] >= LOWER_HOUR) & # (df_targets['hours_until_departure'] <= UPPER_HOUR) ) # 有效的降价 df_valid_drops = df_targets.loc[valid_drop_mask] # 找「第一次」降价(每个 gid) df_first_price_drop = ( df_valid_drops .groupby('gid', as_index=False) .first() ) # 简化列 df_first_price_drop = df_first_price_drop[ ['gid', 'hours_until_departure', 'adult_total_price', 'price_diff'] ].rename(columns={ 'hours_until_departure': 'time_to_price_drop', 'adult_total_price': 'price_at_d_hours', 'price_diff': 'amount_of_price_drop', }) # 把降价幅度转成正数(更直观) df_first_price_drop['amount_of_price_drop'] = (-df_first_price_drop['amount_of_price_drop']).round(2) pass # # 计算降价信息 # print(">>> 计算降价信息") # df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left') # df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price'] # df_targets['price_dropped'] = ( # (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) & # (df_targets['price_drop_amount'] >= 5) # 降幅不能太小 # ) # df_price_drops = df_targets[df_targets['price_dropped']].copy() # price_drops_len = df_price_drops.shape[0] # if price_drops_len == 0: # print(f">>> n_hours = {current_n_hours} 无降价信息") # # 创建包含指定列的空 DataFrame # df_price_drop_info = pd.DataFrame({ # 'gid': pd.Series(dtype='int64'), # 'first_drop_hours_until_departure': pd.Series(dtype='int64'), # 'price_at_first_drop_hours': pd.Series(dtype='float64') # }) # else: # df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index() # 第一次发生的降价 # df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={ # 'hours_until_departure': 'first_drop_hours_until_departure', # 'adult_total_price': 'price_at_first_drop_hours' # }) # print(">>> 降价信息计算完成,示例:") # print(df_price_drop_info.head(5)) # # 合并信息 # df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left') # df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int) # df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours'] # df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0) # 区别 # df_gid_info['time_to_price_drop'] = current_n_hours - df_gid_info['first_drop_hours_until_departure'] # df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0) # 区别 # del df_input_object # del df_last # del df_last_price_at_n_hours # del df_price_drops # del df_price_drop_info df_gid_info = df_last_price_at_n_hours.merge(df_first_price_drop, on='gid', how='left') df_gid_info['will_price_drop'] = df_gid_info['time_to_price_drop'].notnull().astype(int) df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0) df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0) pass del df_input_object del df_last del df_last_price_at_n_hours del df_first_price_drop del df_valid_drops del df_targets gc.collect() # 将目标变量合并到输入数据中 print(">>> 将目标变量信息合并到 df_input") df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left') # 使用 0 填充 NaN 值 df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[ ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0) df_input = df_input.rename(columns={ 'will_price_drop': 'target_will_price_drop', 'amount_of_price_drop': 'target_amount_of_drop', 'time_to_price_drop': 'target_time_to_drop' }) # 计算每个 gid 分组在 df_targets 中的 adult_total_price 最小值 # print(">>> 计算每个 gid 分组的 adult_total_price 最小值...") # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index() # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'}) # gid_count = df_min_price_by_gid.shape[0] # print(f">>> 计算完成,共 {gid_count} 个 gid 分组") # # 将最小价格 merge 到 df_inputs 中 # print(">>> 将最小价格 merge 到输入数据中...") # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left') print(">>> 合并后 df_input 样例:") print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5)) # 预测模式 else: print(">>> 预测模式:补齐 target 相关列(全部置 0)") df_input['target_will_price_drop'] = 0 df_input['target_amount_of_drop'] = 0.0 df_input['target_time_to_drop'] = 0 # 按顺序排列 order_columns = [ "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day", "seats_remaining", "baggage", "baggage_level", "price_decrease_times_total", "price_decrease_times_consecutive", "price_last_decrease_hours", "price_increase_times_total", "price_increase_times_consecutive", "price_last_increase_hours", "adult_total_price", "Adult_Total_Price", "target_will_price_drop", "target_amount_of_drop", "target_time_to_drop", "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure", "update_hour", "crawl_date", "gid", "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1", "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week", "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer", "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2", "fly_duration", "stop_duration", "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country", "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday", "price_weighted_percentile_25", "price_weighted_percentile_50", "price_weighted_percentile_75", "price_weighted_percentile_90", "price_zone_comprehensive", "price_relative_position", ] df_input = df_input[order_columns] return df_input def standardization(df, feature_scaler, target_scaler=None, is_training=True, is_val=False, feature_length=240): print(">>> 开始标准化处理") # 准备走标准化的特征 scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration', 'price_weighted_percentile_25', 'price_weighted_percentile_50', 'price_weighted_percentile_75', 'price_weighted_percentile_90'] if is_training: print(">>> 特征数据标准化开始") if feature_scaler is None: feature_scaler = StandardScaler() if not is_val: feature_scaler.fit(df[scaler_features]) df[scaler_features] = feature_scaler.transform(df[scaler_features]) print(">>> 特征数据标准化完成") else: df[scaler_features] = feature_scaler.transform(df[scaler_features]) print(">>> 预测模式下特征标准化处理完成") # 准备走归一化的特征 # 事先定义好每个特征的合理范围 fixed_ranges = { 'hours_until_departure': (0, 480), # 0-20天 'from_city_num': (0, 38), 'to_city_num': (0, 38), 'flight_1_num': (0, 341), 'flight_2_num': (0, 341), 'seats_remaining': (1, 5), # 'price_change_times_total': (0, 30), # 假设价格变更次数不会超过30次 # 'price_last_change_hours': (0, 480), 'price_decrease_times_total': (0, 20), # 假设价格下降次数不会超过20次 'price_decrease_times_consecutive': (0, 10), # 假设价格连续下降次数不会超过10次 'price_last_decrease_hours': (0, feature_length), #(0-240小时) 'price_increase_times_total': (0, 20), # 假设价格上升次数不会超过20次 'price_increase_times_consecutive': (0, 10), # 假设价格连续上升次数不会超过10次 'price_last_increase_hours': (0, feature_length), #(0-240小时) 'price_zone_comprehensive': (0, 5), 'days_to_departure': (0, 30), 'days_to_holiday': (0, 120), # 最长的越南节假日间隔120天 'flight_by_hour': (0, 23), 'flight_by_day': (1, 31), 'flight_day_of_month': (1, 12), 'flight_day_of_week': (0, 6), 'flight_day_of_quarter': (1, 4), } normal_features = list(fixed_ranges.keys()) print(">>> 归一化特征列: ", normal_features) print(">>> 基于固定范围的特征数据归一化开始") for col in normal_features: if col in df.columns: # 核心归一化公式: (x - min) / (max - min) col_min, col_max = fixed_ranges[col] df[col] = (df[col] - col_min) / (col_max - col_min) # 添加裁剪,将超出范围的值强制限制在[0,1]区间 df[col] = df[col].clip(0, 1) print(">>> 基于固定范围的特征数据归一化完成") return df, feature_scaler, target_scaler def preprocess_data_simple(df_input, is_train=False): df_input = preprocess_data_first_half(df_input) # 在 gid 与 baggage 内按时间降序 df_input = df_input.sort_values( by=['gid', 'baggage', 'hours_until_departure'], ascending=[True, True, False] ).reset_index(drop=True) df_input = df_input[df_input['hours_until_departure'] <= 480] df_input = df_input[df_input['baggage'] == 0] # 只保留无行李的 # 在hours_until_departure 的末尾 保留真实的而不是补齐的数据 if not is_train: _tail_filled = df_input.groupby(['gid', 'baggage'])['is_filled'].transform( lambda s: s.iloc[::-1].cummin().iloc[::-1] ) df_input = df_input[~((df_input['is_filled'] == 1) & (_tail_filled == 1))] # 价格变化最小量阈值 price_change_amount_threshold = 5 df_input['_raw_price_diff'] = df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'].diff() # 计算价格变化量 # df_input['price_change_amount'] = ( # df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'] # .apply(lambda s: s.diff().replace(0, np.nan).ffill().fillna(0)).round(2) # ) df_input['price_change_amount'] = ( df_input['_raw_price_diff'] .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0) .replace(0, np.nan) .groupby([df_input['gid'], df_input['baggage']], group_keys=False) .ffill() .fillna(0) .round(2) ) # 计算价格变化百分比(相对于上一时间点的变化率) # df_input['price_change_percent'] = ( # df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'] # .apply(lambda s: s.pct_change().replace(0, np.nan).ffill().fillna(0)).round(4) # ) df_input['price_change_percent'] = ( df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'] .pct_change() .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0) .replace(0, np.nan) .groupby([df_input['gid'], df_input['baggage']], group_keys=False) .ffill() .fillna(0) .round(4) ) # 第一步:标记价格变化段 df_input['price_change_segment'] = ( df_input.groupby(['gid', 'baggage'], group_keys=False)['price_change_amount'] .apply(lambda s: (s != s.shift()).cumsum()) ) # 第二步:计算每个变化段内的持续时间 df_input['price_duration_hours'] = ( df_input.groupby(['gid', 'baggage', 'price_change_segment'], group_keys=False) .cumcount() .add(1) ) # 可选:删除临时列 # df_input = df_input.drop(columns=['price_change_segment']) df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff']) # 仅在价格变化点记录余票变化量;其它非价格变化点置空(NaN) # _price_diff = df_input.groupby(['gid', 'baggage'], group_keys=False)['adult_total_price'].diff() # _price_changed = _price_diff.notna() & _price_diff.ne(0) # _seats_diff = df_input.groupby(['gid', 'baggage'], group_keys=False)['seats_remaining'].diff() # df_input['seats_remaining_change_amount'] = _seats_diff.where(_price_changed).round(0) # # 前向填充 并 填充缺失值为0 # df_input['seats_remaining_change_amount'] = ( # df_input.groupby(['gid', 'baggage'], group_keys=False)['seats_remaining_change_amount'] # .ffill() # .fillna(0) # ) adult_price = df_input.pop('Adult_Total_Price') hours_until = df_input.pop('Hours_Until_Departure') df_input['Adult_Total_Price'] = adult_price df_input['Hours_Until_Departure'] = hours_until df_input['Baggage'] = df_input['baggage'] # 训练过程 if is_train: df_target = df_input[(df_input['hours_until_departure'] >= 12) & (df_input['hours_until_departure'] <= 360)].copy() # 扩展至360小时(15天) df_target = df_target.sort_values( by=['gid', 'hours_until_departure'], ascending=[True, False] ).reset_index(drop=True) # 对于先升后降的分析 prev_pct = df_target.groupby('gid', group_keys=False)['price_change_percent'].shift(1) prev_amo = df_target.groupby('gid', group_keys=False)['price_change_amount'].shift(1) prev_dur = df_target.groupby('gid', group_keys=False)['price_duration_hours'].shift(1) # prev_seats_amo = df_target.groupby('gid', group_keys=False)['seats_remaining_change_amount'].shift(1) prev_price = df_target.groupby('gid', group_keys=False)['adult_total_price'].shift(1) prev_seats = df_target.groupby('gid', group_keys=False)['seats_remaining'].shift(1) drop_mask = (prev_pct > 0) & (df_target['price_change_percent'] < 0) df_drop_nodes = df_target.loc[drop_mask, ['gid', 'hours_until_departure']].copy() df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True) df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy() df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy() df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy() df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy() df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy() # df_drop_nodes['high_price_seats_remaining_change_amount'] = prev_seats_amo.loc[drop_mask].astype(float).round(1).to_numpy() df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy() df_drop_nodes['high_price_seats_remaining'] = prev_seats.loc[drop_mask].astype(int).to_numpy() df_drop_nodes = df_drop_nodes.reset_index(drop=True) flight_info_cols = [ 'city_pair', 'flight_number_1', 'seg1_dep_air_port', 'seg1_dep_time', 'seg1_arr_air_port', 'seg1_arr_time', 'flight_number_2', 'seg2_dep_air_port', 'seg2_dep_time', 'seg2_arr_air_port', 'seg2_arr_time', 'currency', 'baggage', 'flight_day', ] flight_info_cols = [c for c in flight_info_cols if c in df_target.columns] df_gid_info = df_target[['gid'] + flight_info_cols].drop_duplicates(subset=['gid']).reset_index(drop=True) df_drop_nodes = df_drop_nodes.merge(df_gid_info, on='gid', how='left') drop_info_cols = ['drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount', 'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount', 'high_price_seats_remaining', ] # 按顺序排列 去掉gid df_drop_nodes = df_drop_nodes[flight_info_cols + drop_info_cols] # df_drop_nodes = df_drop_nodes[df_drop_nodes['drop_price_change_percent'] <= -0.01] # 太低的降幅不计 # 对于“上涨后再次上涨”的分析(连续两个正向变价段) seg_start_mask = df_target['price_duration_hours'].eq(1) rise_mask = seg_start_mask & (prev_pct > 0) & (df_target['price_change_percent'] > 0) df_rise_nodes = df_target.loc[rise_mask, ['gid', 'hours_until_departure']].copy() df_rise_nodes.rename(columns={'hours_until_departure': 'rise_hours_until_departure'}, inplace=True) df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy() df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy() df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy() df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy() df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy() df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy() df_rise_nodes['prev_rise_seats_remaining'] = prev_seats.loc[rise_mask].astype(int).to_numpy() df_rise_nodes = df_rise_nodes.reset_index(drop=True) df_rise_nodes = df_rise_nodes.merge(df_gid_info, on='gid', how='left') rise_info_cols = [ 'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount', 'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount', 'prev_rise_seats_remaining', ] df_rise_nodes = df_rise_nodes[flight_info_cols + rise_info_cols] # 制作历史包络线 envelope_group = ['city_pair', 'flight_number_1', 'flight_number_2', 'flight_day'] idx_peak = df_input.groupby(envelope_group)['adult_total_price'].idxmax() df_envelope = df_input.loc[idx_peak, envelope_group + [ 'adult_total_price', 'hours_until_departure' ]].rename(columns={ 'adult_total_price': 'peak_price', 'hours_until_departure': 'peak_hours', }).reset_index(drop=True) # 对于没有先升后降的gid进行分析 # gids_with_drop = df_target.loc[drop_mask, 'gid'].unique() # df_no_drop = df_target[~df_target['gid'].isin(gids_with_drop)].copy() # keep_info_cols = [ # 'keep_hours_until_departure', 'keep_price_change_percent', 'keep_price_change_amount', # 'keep_price_duration_hours', 'keep_price_amount', 'keep_price_seats_remaining', # ] # if df_no_drop.empty: # df_keep_nodes = pd.DataFrame(columns=flight_info_cols + keep_info_cols) # else: # df_no_drop = df_no_drop.sort_values( # by=['gid', 'hours_until_departure'], # ascending=[True, False] # ).reset_index(drop=True) # df_no_drop['keep_segment'] = df_no_drop.groupby('gid')['price_change_percent'].transform( # lambda s: (s != s.shift()).cumsum() # ) # df_keep_row = ( # df_no_drop.groupby(['gid', 'keep_segment'], as_index=False) # .tail(1) # .reset_index(drop=True) # ) # df_keep_nodes = df_keep_row[ # ['gid', 'hours_until_departure', 'price_change_percent', 'price_change_amount', # 'price_duration_hours', 'adult_total_price', 'seats_remaining'] # ].copy() # df_keep_nodes.rename( # columns={ # 'hours_until_departure': 'keep_hours_until_departure', # 'price_change_percent': 'keep_price_change_percent', # 'price_change_amount': 'keep_price_change_amount', # 'price_duration_hours': 'keep_price_duration_hours', # 'adult_total_price': 'keep_price_amount', # 'seats_remaining': 'keep_price_seats_remaining', # }, # inplace=True, # ) # df_keep_nodes = df_keep_nodes.merge(df_gid_info, on='gid', how='left') # df_keep_nodes = df_keep_nodes[flight_info_cols + keep_info_cols] # del df_keep_row del df_gid_info del df_target # del df_no_drop return df_input, df_drop_nodes, df_rise_nodes, df_envelope return df_input, None, None, None def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".", pred_time_str=""): if df_input is None or df_input.empty: return pd.DataFrame() df_sorted = df_input.sort_values( by=['gid', 'hours_until_departure'], ascending=[True, False], ).reset_index(drop=True) df_sorted = df_sorted[ df_sorted['hours_until_departure'].between(12, 360) ].reset_index(drop=True) # 每个 gid 取 hours_until_departure 最小的一条 df_min_hours = ( df_sorted.drop_duplicates(subset=['gid'], keep='last') .reset_index(drop=True) ) # 确保 hours_until_departure 在 [12, 360] 的 范围内 # df_min_hours = df_min_hours[ # df_min_hours['hours_until_departure'].between(12, 360) # ].reset_index(drop=True) drop_info_csv_path = os.path.join(output_dir, f'{group_route_str}_drop_info.csv') if os.path.exists(drop_info_csv_path): df_drop_nodes = pd.read_csv(drop_info_csv_path) else: df_drop_nodes = pd.DataFrame() rise_info_csv_path = os.path.join(output_dir, f'{group_route_str}_rise_info.csv') if os.path.exists(rise_info_csv_path): df_rise_nodes = pd.read_csv(rise_info_csv_path) else: df_rise_nodes = pd.DataFrame() # ==================== 跨航班日包络线 + 降价潜力 ==================== print(">>> 构建跨航班日价格包络线") flight_key = ['city_pair', 'flight_number_1', 'flight_number_2'] day_key = flight_key + ['flight_day'] # 1. 历史侧:加载训练阶段的峰值数据 envelope_csv_path = os.path.join(output_dir, f'{group_route_str}_envelope_info.csv') if os.path.exists(envelope_csv_path): df_hist = pd.read_csv(envelope_csv_path) df_hist = df_hist[day_key + ['peak_price', 'peak_hours']] df_hist['source'] = 'hist' else: df_hist = pd.DataFrame() # 2. 未来侧:当前在售价格 df_future = df_min_hours[day_key + ['adult_total_price', 'hours_until_departure']].copy().rename( columns={'adult_total_price': 'peak_price', 'hours_until_departure': 'peak_hours'} ) df_future['source'] = 'future' # 3. 合并包络线数据点 df_envelope_all = pd.concat( [x for x in [df_hist, df_future] if not x.empty], ignore_index=True ).drop_duplicates(subset=day_key, keep='last') # 4. 包络线统计 + 找高点起飞日 df_envelope_agg = df_envelope_all.groupby(flight_key).agg( envelope_max=('peak_price', 'max'), # 峰值最大 envelope_min=('peak_price', 'min'), # 峰值最小 envelope_mean=('peak_price', 'mean'), # 峰值平均 envelope_count=('peak_price', 'count'), # 峰值统计总数 envelope_avg_peak_hours=('peak_hours', 'mean'), # 峰值发生的距离起飞小时数, 做一下平均 ).reset_index() # 对数值列保留两位小数 df_envelope_agg[['envelope_mean', 'envelope_avg_peak_hours']] = df_envelope_agg[['envelope_mean', 'envelope_avg_peak_hours']].round(2) idx_top = df_envelope_all.groupby(flight_key)['peak_price'].idxmax() df_top = df_envelope_all.loc[idx_top, flight_key + ['flight_day', 'peak_price', 'peak_hours']].rename( columns={'flight_day': 'target_flight_day', 'peak_price': 'target_price', 'peak_hours': 'target_peak_hours'} ) df_envelope_agg = df_envelope_agg.merge(df_top, on=flight_key, how='left') # 5. 合并到 df_min_hours df_min_hours = df_min_hours.merge(df_envelope_agg, on=flight_key, how='left') price_range = (df_min_hours['envelope_max'] - df_min_hours['envelope_min']).replace(0, 1) # 计算当前价格在包络区间的百分位 df_min_hours['envelope_position'] = ( (df_min_hours['adult_total_price'] - df_min_hours['envelope_min']) / price_range ).clip(0, 1).round(4) df_min_hours['is_envelope_peak'] = (df_min_hours['envelope_position'] >= 0.75).astype(int) # 0.95 -> 0.75 df_min_hours['is_target_day'] = (df_min_hours['flight_day'] == df_min_hours['target_flight_day']).astype(int) # ==================== 目标二:降价潜力评分 ==================== # 用“上涨后回落倾向”替代简单计数:drop / (drop + rise) # drop_count 来自 _drop_info.csv(上涨段后转跌),rise_count 来自 _rise_info.csv(上涨段后继续涨) df_min_hours['drop_potential'] = 0.0 # 先保证相关列一定存在,避免后续选列 KeyError # df_min_hours['drop_freq_count'] = 0.0 # df_min_hours['rise_freq_count'] = 0.0 df_drop_freq = pd.DataFrame(columns=flight_key + ['drop_freq_count']) df_rise_freq = pd.DataFrame(columns=flight_key + ['rise_freq_count']) if not df_drop_nodes.empty: df_drop_freq = ( df_drop_nodes.groupby(flight_key) .size() .reset_index(name='drop_freq_count') ) if not df_rise_nodes.empty: df_rise_freq = ( df_rise_nodes.groupby(flight_key) .size() .reset_index(name='rise_freq_count') ) if (not df_drop_freq.empty) or (not df_rise_freq.empty): df_min_hours = df_min_hours.merge(df_drop_freq, on=flight_key, how='left') df_min_hours = df_min_hours.merge(df_rise_freq, on=flight_key, how='left') df_min_hours['drop_freq_count'] = df_min_hours['drop_freq_count'].fillna(0).astype(float) df_min_hours['rise_freq_count'] = df_min_hours['rise_freq_count'].fillna(0).astype(float) # 轻微平滑,避免样本很少时出现 0/0 或过度极端 alpha = 1.0 denom = df_min_hours['drop_freq_count'] + df_min_hours['rise_freq_count'] + 2.0 * alpha df_min_hours['drop_potential'] = ( (df_min_hours['drop_freq_count'] + alpha) / denom.replace(0, np.nan) ).fillna(0.0).clip(0, 1).round(4) # ==================== 综合评分:包络高位 × 降价潜力 ==================== # target_score = 包络位置(越高越好)× 降价潜力(越高越好) thres_ep = 0.7 thres_dp = 0.3 df_min_hours['target_score'] = ( df_min_hours['envelope_position'] * thres_ep + df_min_hours['drop_potential'] * thres_dp ).round(4) # 综合评分阈值:大于阈值的都认为值得投放 target_score_threshold = 0.75 # df_min_hours['target_score_threshold'] = target_score_threshold df_min_hours['is_good_target'] = (df_min_hours['target_score'] >= target_score_threshold).astype(int) print(f">>> 包络线+降价潜力评分完成") del df_hist, df_future, df_envelope_all, df_envelope_agg, df_top, df_drop_freq, df_rise_freq df_min_hours = df_min_hours[df_min_hours['is_good_target'] == 1].reset_index(drop=True) # 保留值得投放的 # ===================================================================== df_min_hours['simple_will_price_drop'] = 0 df_min_hours['simple_drop_in_hours'] = 0 df_min_hours['simple_drop_in_hours_prob'] = 0.0 df_min_hours['simple_drop_in_hours_dist'] = '' # 空串 表示未知 df_min_hours['flag_dist'] = '' df_min_hours['drop_price_change_upper'] = 0.0 # df_min_hours['drop_price_change_mode'] = 0.0 df_min_hours['drop_price_change_lower'] = 0.0 df_min_hours['drop_price_sample_size'] = 0 df_min_hours['rise_price_change_upper'] = 0.0 # df_min_hours['rise_price_change_mode'] = 0.0 df_min_hours['rise_price_change_lower'] = 0.0 df_min_hours['rise_price_sample_size'] = 0 # 这个阈值取多少? pct_threshold = 0.01 # pct_threshold = 2 pct_threshold_1 = 0.01 # pct_threshold_c = 0.001 for idx, row in df_min_hours.iterrows(): city_pair = row['city_pair'] flight_number_1 = row['flight_number_1'] flight_number_2 = row['flight_number_2'] if flight_number_1 == 'VJ878': # 调试时用 pass price_change_percent = row['price_change_percent'] price_change_amount = row['price_change_amount'] price_duration_hours = row['price_duration_hours'] hours_until_departure = row['hours_until_departure'] # seats_remaining_change_amount = row['seats_remaining_change_amount'] price_amount = row['adult_total_price'] seats_remaining = row['seats_remaining'] length_drop = 0 length_rise = 0 # length_keep = 0 # 针对历史上发生的 高价->低价 if not df_drop_nodes.empty: # 对准航班号, 不同起飞日期 if flight_number_2 and flight_number_2 != 'VJ': df_drop_nodes_part = df_drop_nodes[ (df_drop_nodes['city_pair'] == city_pair) & (df_drop_nodes['flight_number_1'] == flight_number_1) & (df_drop_nodes['flight_number_2'] == flight_number_2) ] else: df_drop_nodes_part = df_drop_nodes[ (df_drop_nodes['city_pair'] == city_pair) & (df_drop_nodes['flight_number_1'] == flight_number_1) ] # 降价前 增幅阈值的匹配 与 高价历史持续时间 得出降价时间的概率 if not df_drop_nodes_part.empty and pd.notna(price_change_percent): # 增幅太小的去掉 # df_drop_nodes_part = df_drop_nodes_part[df_drop_nodes_part['high_price_change_percent'] >= 0.01] # pct_diff = (df_drop_nodes_part['high_price_change_percent'] - float(price_change_percent)).abs() # df_match = df_drop_nodes_part.loc[pct_diff <= pct_threshold, ['high_price_duration_hours', 'high_price_change_percent']].copy() pct_base = float(price_change_percent) pct_vals = pd.to_numeric(df_drop_nodes_part['high_price_change_percent'], errors='coerce') df_drop_gap = df_drop_nodes_part.loc[ pct_vals.notna(), ['drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount', 'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount', 'high_price_seats_remaining'] ].copy() df_drop_gap['pct_gap'] = (pct_vals.loc[pct_vals.notna()] - pct_base) df_drop_gap['pct_abs_gap'] = df_drop_gap['pct_gap'].abs() price_base = pd.to_numeric(price_amount, errors='coerce') high_price_vals = pd.to_numeric(df_drop_gap['high_price_amount'], errors='coerce') df_drop_gap['price_gap'] = high_price_vals - price_base df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs() df_drop_gap = df_drop_gap.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True]) df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 10.0)].copy() # 历史上出现的极近似的增长幅度后的降价场景 if not df_match.empty: dur_base = pd.to_numeric(price_duration_hours, errors='coerce') hud_base = pd.to_numeric(hours_until_departure, errors='coerce') # seats_base = pd.to_numeric(seats_remaining_change_amount, errors='coerce') if pd.notna(dur_base) and pd.notna(hud_base): # and pd.notna(seats_base) df_match_chk = df_match.copy() # dur_vals = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce') # df_match_chk = df_match_chk.loc[dur_vals.notna()].copy() # df_match_chk = df_match_chk.loc[(dur_vals.loc[dur_vals.notna()] - float(dur_base)).abs() <= 36].copy() # drop_hud_vals = pd.to_numeric(df_match_chk['drop_hours_until_departure'], errors='coerce') # df_match_chk = df_match_chk.loc[drop_hud_vals.notna()].copy() # df_match_chk = df_match_chk.loc[(drop_hud_vals.loc[drop_hud_vals.notna()] - float(hud_base)).abs() <= 24].copy() # seats_vals = pd.to_numeric(df_match_chk['high_price_seats_remaining_change_amount'], errors='coerce') # df_match_chk = df_match_chk.loc[seats_vals.notna()].copy() # df_match_chk = df_match_chk.loc[seats_vals.loc[seats_vals.notna()] == float(seats_base)].copy() # 持续时间、距离起飞时间、座位变化都匹配上 if not df_match_chk.empty: length_drop = df_match_chk.shape[0] df_min_hours.loc[idx, 'drop_price_sample_size'] = length_drop drop_price_change_upper = df_match_chk['drop_price_change_amount'].max() # 降价上限 drop_price_change_lower = df_match_chk['drop_price_change_amount'].min() # 降价下限 df_min_hours.loc[idx, 'drop_price_change_upper'] = round(drop_price_change_upper, 2) df_min_hours.loc[idx, 'drop_price_change_lower'] = round(drop_price_change_lower, 2) # drop_mode_values = df_match_chk['drop_price_change_amount'].mode() # 降价众数 # if len(drop_mode_values) > 0: # df_min_hours.loc[idx, 'drop_price_change_mode'] = round(float(drop_mode_values[0]), 2) remaining_hours = ( pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce') - float(dur_base) ).clip(lower=0) remaining_hours = remaining_hours.round().astype(int) counts = remaining_hours.value_counts().sort_index() probs = (counts / counts.sum()).round(4) top_hours = int(probs.idxmax()) top_prob = float(probs.max()) dist_items = list(zip(probs.index.tolist(), probs.tolist())) dist_items = dist_items[:10] dist_str = ' '.join([f"{int(h)}h->{float(p)}" for h, p in dist_items]) df_min_hours.loc[idx, 'simple_will_price_drop'] = 1 df_min_hours.loc[idx, 'simple_drop_in_hours'] = top_hours df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1 df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = dist_str df_min_hours.loc[idx, 'flag_dist'] = 'd0' # continue # 已经判定降价 后面不再做 # 历史上未出现的极近似的增长幅度后的降价场景 else: pass # if pd.notna(price_duration_hours) and price_change_percent >= 0.1: # pct_vals = pd.to_numeric( # df_drop_nodes_part['high_price_change_percent'], # errors='coerce' # ).replace([np.inf, -np.inf], np.nan).dropna() # dur_vals = pd.to_numeric( # df_drop_nodes_part['high_price_duration_hours'], # errors='coerce' # ).replace([np.inf, -np.inf], np.nan).dropna() # if not pct_vals.empty and not dur_vals.empty: # pct_min = float(pct_vals.min()) # pct_max = float(pct_vals.max()) # dur_min = float(dur_vals.min()) # dur_max = float(dur_vals.max()) # if (pct_min <= float(price_change_percent) <= pct_max) and (dur_min <= float(price_duration_hours) <= dur_max): # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.5' # continue # 已经判定降价 后面不再做 # elif (pct_min <= float(price_change_percent)) and (dur_min <= float(price_duration_hours)): # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.3 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.3' # continue # 已经判定降价 后面不再做 # 针对历史上发生的 连续涨价 if not df_rise_nodes.empty: # 对准航班号, 不同起飞日期 if flight_number_2 and flight_number_2 != 'VJ': df_rise_nodes_part = df_rise_nodes[ (df_rise_nodes['city_pair'] == city_pair) & (df_rise_nodes['flight_number_1'] == flight_number_1) & (df_rise_nodes['flight_number_2'] == flight_number_2) ] else: df_rise_nodes_part = df_rise_nodes[ (df_rise_nodes['city_pair'] == city_pair) & (df_rise_nodes['flight_number_1'] == flight_number_1) ] if not df_rise_nodes_part.empty and pd.notna(price_change_percent): # pct_vals_1 = df_keep_nodes_part['keep_price_change_percent'].replace([np.inf, -np.inf], np.nan).dropna() # # 保留百分位 10% ~ 90% 之间的 数据 # if not pct_vals_1.empty: # q10_1 = float(pct_vals_1.quantile(0.10)) # q90_1 = float(pct_vals_1.quantile(0.90)) # df_keep_nodes_part = df_keep_nodes_part[ # df_keep_nodes_part['keep_price_change_percent'].between(q10_1, q90_1) # ] # if df_keep_nodes_part.empty: # continue # 特殊判定场景 # if price_change_percent < 0: # df_tmp = df_keep_nodes_part.copy() # # 确保组内顺序正确(如果前面已经排过,这行可省略) # df_tmp = df_tmp.sort_values( # by=["flight_day", "keep_hours_until_departure"], # ascending=[True, False] # ) # # 是否为负值 # df_tmp["is_negative"] = df_tmp["keep_price_change_percent"] < 0 # if df_tmp["is_negative"].any(): # # 标记“负值段”的开始 # # 当 is_negative 为 True 且 前一行不是负值时,认为是一个新段 # df_tmp["neg_block_id"] = ( # df_tmp["is_negative"] # & ~df_tmp.groupby("flight_day")["is_negative"].shift(fill_value=False) # ).groupby(df_tmp["flight_day"]).cumsum() # # 在每个负值段内计数(第几个负值) # df_tmp["neg_rank_in_block"] = ( # df_tmp.groupby(["flight_day", "neg_block_id"]) # .cumcount() + 1 # ) # # 每个连续负值段的长度 # df_tmp["neg_block_size"] = ( # df_tmp.groupby(["flight_day", "neg_block_id"])["is_negative"] # .transform("sum") # ) # # 只保留: # # 1) 是负值 # # 2) 且不是该连续负值段的最后一个 # df_continuous_price_drop = df_tmp[ # (df_tmp["is_negative"]) & # (df_tmp["neg_rank_in_block"] < df_tmp["neg_block_size"]) # ].drop( # columns=[ # "is_negative", # "neg_block_id", # "neg_rank_in_block", # "neg_block_size", # ] # ) # pct_diff_c = (df_continuous_price_drop['keep_price_change_percent'] - float(price_change_percent)).abs() # df_match_c = df_continuous_price_drop.loc[pct_diff_c <= pct_threshold_c, ['flight_day', 'keep_hours_until_departure', 'keep_price_duration_hours', 'keep_price_change_percent']].copy() # # 符合连续降价条件 # if not df_match_c.empty and pd.notna(price_duration_hours): # vals_c = df_match_c['keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna() # if not vals_c.empty: # min_val_c = vals_c.min() # if min_val_c <= float(price_duration_hours): # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'c1' # length_drop = df_match_c.shape[0] # # continue # 已经判定降价 后面不再做 # 一般判定场景 pct_base_1 = float(price_change_percent) pct_vals_1 = pd.to_numeric(df_rise_nodes_part['prev_rise_change_percent'], errors='coerce') df_rise_gap_1 = df_rise_nodes_part.loc[ pct_vals_1.notna(), ['rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount', 'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount', 'prev_rise_seats_remaining'] ].copy() df_rise_gap_1['pct_gap'] = (pct_vals_1.loc[pct_vals_1.notna()] - pct_base_1) df_rise_gap_1['pct_abs_gap'] = df_rise_gap_1['pct_gap'].abs() price_base_1 = pd.to_numeric(price_amount, errors='coerce') rise_price_vals_1 = pd.to_numeric(df_rise_gap_1['prev_rise_amount'], errors='coerce') df_rise_gap_1['price_gap'] = rise_price_vals_1 - price_base_1 df_rise_gap_1['price_abs_gap'] = df_rise_gap_1['price_gap'].abs() df_rise_gap_1 = df_rise_gap_1.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True]) df_match_1 = df_rise_gap_1.loc[(df_rise_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_rise_gap_1['price_abs_gap'] <= 10.0)].copy() # 历史上出现过近似变化幅度后继续涨价场景 if not df_match_1.empty: # df_match_1['hours_delta'] = hours_until_departure - df_match_1['rise_hours_until_departure'] # df_match_1['modify_rise_price_duration_hours'] = df_match_1['rise_price_duration_hours'] - df_match_1['hours_delta'] # df_match_1 = df_match_1[df_match_1['modify_rise_price_duration_hours'] > 0] # dur_base_1 = pd.to_numeric(price_duration_hours, errors='coerce') hud_base_1 = pd.to_numeric(hours_until_departure, errors='coerce') # seats_base_1 = pd.to_numeric(seats_remaining_change_amount, errors='coerce') if pd.notna(hud_base_1): # and pd.notna(seats_base_1) df_match_chk_1 = df_match_1.copy() # dur_vals_1 = pd.to_numeric(df_match_chk_1['modify_rise_price_duration_hours'], errors='coerce') # df_match_chk_1 = df_match_chk_1.loc[dur_vals_1.notna()].copy() # df_match_chk_1 = df_match_chk_1.loc[(dur_vals_1.loc[dur_vals_1.notna()] - float(dur_base_1)).abs() <= 24].copy() # rise_hud_vals_1 = pd.to_numeric(df_match_chk_1['rise_hours_until_departure'], errors='coerce') # df_match_chk_1 = df_match_chk_1.loc[rise_hud_vals_1.notna()].copy() # df_match_chk_1 = df_match_chk_1.loc[(rise_hud_vals_1.loc[rise_hud_vals_1.notna()] - float(hud_base_1)).abs() <= 24].copy() # seats_vals_1 = pd.to_numeric(df_match_chk_1['rise_seats_remaining_change_amount'], errors='coerce') # df_match_chk_1 = df_match_chk_1.loc[seats_vals_1.notna()].copy() # df_match_chk_1 = df_match_chk_1.loc[seats_vals_1.loc[seats_vals_1.notna()] == float(seats_base_1)].copy() # 持续时间、距离起飞时间、座位变化都匹配上 if not df_match_chk_1.empty: length_rise = df_match_chk_1.shape[0] df_min_hours.loc[idx, 'rise_price_sample_size'] = length_rise rise_price_change_upper = df_match_chk_1['rise_price_change_amount'].max() # 涨价上限 rise_price_change_lower = df_match_chk_1['rise_price_change_amount'].min() # 涨价下限 df_min_hours.loc[idx, 'rise_price_change_upper'] = round(rise_price_change_upper, 2) df_min_hours.loc[idx, 'rise_price_change_lower'] = round(rise_price_change_lower, 2) # rise_mode_values = df_match_chk_1['rise_price_change_amount'].mode() # 涨价众数 # if len(rise_mode_values) > 0: # df_min_hours.loc[idx, 'rise_price_change_mode'] = round(float(rise_mode_values[0]), 2) # 可以明确的判定不降价 if length_drop == 0: df_min_hours.loc[idx, 'simple_will_price_drop'] = 0 df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r0' df_min_hours.loc[idx, 'flag_dist'] = 'r0' # 分歧判定 else: drop_prob = round(length_drop / (length_rise + length_drop), 2) # 依旧保持之前的降价判定,概率修改 if drop_prob >= 0.4: df_min_hours.loc[idx, 'simple_will_price_drop'] = 1 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'd1' df_min_hours.loc[idx, 'flag_dist'] = 'd1' # 改判不降价,概率修改 else: df_min_hours.loc[idx, 'simple_will_price_drop'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r1' df_min_hours.loc[idx, 'flag_dist'] = 'r1' # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob # elif length_keep == length_drop: # 不降价与降价相同, 取0.5概率 # df_min_hours.loc[idx, 'simple_will_price_drop'] = 1 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1' # df_match_1['hours_delta'] = hours_until_departure - df_match_1['keep_hours_until_departure'] # df_match_1['modify_keep_price_duration_hours'] = df_match_1['keep_price_duration_hours'] - df_match_1['hours_delta'] # df_match_1 = df_match_1[df_match_1['modify_keep_price_duration_hours'] > 0] # 比较 price_duration_hours 在 modify_keep_price_duration_hours 的百分位 # vals = df_match_1['modify_keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna() # if not vals.empty: # # q10_11 = float(vals.quantile(0.10)) # min_val = vals.min() # if min_val <= float(price_duration_hours): # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1' # 历史上未出现过近似变化幅度后保持低价场景 else: pass # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n0' # if pd.notna(price_duration_hours) and price_change_percent <= 0.1: # df_keep_nodes_part_1 = df_keep_nodes_part[df_keep_nodes_part['keep_price_change_percent'] <= 0.1] # pct_vals_1 = pd.to_numeric( # df_keep_nodes_part_1['keep_price_change_percent'], # errors='coerce' # ).replace([np.inf, -np.inf], np.nan).dropna() # dur_vals_1 = pd.to_numeric( # df_keep_nodes_part_1['keep_price_duration_hours'], # errors='coerce' # ).replace([np.inf, -np.inf], np.nan).dropna() # if not pct_vals_1.empty and not dur_vals_1.empty: # pct_min_1 = float(pct_vals_1.min()) # pct_max_1 = float(pct_vals_1.max()) # dur_min_1 = float(dur_vals_1.min()) # dur_max_1 = float(dur_vals_1.max()) # if (pct_min_1 <= float(price_change_percent) <= pct_max_1) and (dur_min_1 <= float(price_duration_hours) <= dur_max_1): # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0 # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0 # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n1' pass print("判定循环结束") # 按航班号统一其降价/涨价的上限与下限, 上限统一取最大, 下限统一取最小 # _grp_cols = ['city_pair', 'flight_number_1', 'flight_number_2'] # _g = df_min_hours.groupby(_grp_cols, dropna=False) # df_min_hours['drop_price_change_upper'] = pd.to_numeric( # _g['drop_price_change_upper'].transform('max'), # errors='coerce' # ).fillna(0.0).round(2) # df_min_hours['drop_price_change_lower'] = pd.to_numeric( # _g['drop_price_change_lower'].transform('min'), # errors='coerce' # ).fillna(0.0).round(2) # df_min_hours['rise_price_change_upper'] = pd.to_numeric( # _g['rise_price_change_upper'].transform('max'), # errors='coerce' # ).fillna(0.0).round(2) # df_min_hours['rise_price_change_lower'] = pd.to_numeric( # _g['rise_price_change_lower'].transform('min'), # errors='coerce' # ).fillna(0.0).round(2) df_min_hours = df_min_hours.rename(columns={'seg1_dep_time': 'from_time'}) _pred_dt = pd.to_datetime(str(pred_time_str), format="%Y%m%d%H%M", errors="coerce") df_min_hours["update_hour"] = _pred_dt.strftime("%Y-%m-%d %H:%M:%S") _dep_hour = pd.to_datetime(df_min_hours["from_time"], errors="coerce").dt.floor("h") df_min_hours["valid_begin_hour"] = (_dep_hour - pd.to_timedelta(360, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S") df_min_hours["valid_end_hour"] = (_dep_hour - pd.to_timedelta(12, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S") # 要展示在预测表里的字段 order_cols = ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2', 'from_time', 'baggage', 'seats_remaining', 'currency', 'adult_total_price', 'hours_until_departure', 'price_change_percent', 'price_duration_hours', 'update_hour', 'crawl_date', 'valid_begin_hour', 'valid_end_hour', 'simple_will_price_drop', 'simple_drop_in_hours', 'simple_drop_in_hours_prob', 'simple_drop_in_hours_dist', 'flag_dist', 'drop_price_change_upper', 'drop_price_change_lower', 'drop_price_sample_size', 'rise_price_change_upper', 'rise_price_change_lower', 'rise_price_sample_size', 'envelope_max', 'envelope_min', 'envelope_mean', 'envelope_count', 'envelope_avg_peak_hours', 'envelope_position', 'is_envelope_peak', # 包络线特征 'target_flight_day', 'target_price', 'target_peak_hours', 'is_target_day', # 高点起飞日(纯包络线高点) 'drop_freq_count', 'drop_potential', # 降价潜力 'target_score', 'is_good_target', # 综合目标评分(高点 × 降价潜力 = 最终投放目标) ] df_predict = df_min_hours[order_cols] df_predict = df_predict.rename(columns={ 'simple_will_price_drop': 'will_price_drop', 'simple_drop_in_hours': 'drop_in_hours', 'simple_drop_in_hours_prob': 'drop_in_hours_prob', 'simple_drop_in_hours_dist': 'drop_in_hours_dist', } ) # 排序 df_predict = df_predict.sort_values( by=['city_pair', 'flight_number_1', 'flight_number_2', 'flight_day'], kind='mergesort', na_position='last', ).reset_index(drop=True) # 时间段过滤 过滤掉异常时间(update_hour 早于 crawl_date) update_dt = pd.to_datetime(df_predict["update_hour"], errors="coerce") crawl_dt = pd.to_datetime(df_predict["crawl_date"], errors="coerce") dt_diff = update_dt - crawl_dt df_predict = df_predict.loc[ (dt_diff >= pd.Timedelta(0)) & (dt_diff <= pd.Timedelta(hours=12)) # (dt_diff >= pd.Timedelta(0)) ].reset_index(drop=True) print("更新时间过滤完成") total_cnt = len(df_predict) if "will_price_drop" in df_predict.columns: _wpd = pd.to_numeric(df_predict["will_price_drop"], errors="coerce") drop_1_cnt = int((_wpd == 1).sum()) drop_0_cnt = int((_wpd == 0).sum()) else: drop_1_cnt = 0 drop_0_cnt = 0 print(f"will_price_drop 分类数量统计: 1(会降)={drop_1_cnt}, 0(不降)={drop_0_cnt}, 总数={total_cnt}") csv_path1 = os.path.join(predict_dir, f'future_predictions_{pred_time_str}.csv') df_predict.to_csv(csv_path1, mode='a', index=False, header=not os.path.exists(csv_path1), encoding='utf-8-sig') print("预测结果已追加") return df_predict