import bisect
import gc
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

from config import city_to_country, vj_city_code_map, vi_flight_number_map, build_country_holidays
from utils import insert_df_col

# Per-country holiday lookup, built once at import time.
# NOTE(review): the code below only relies on each value supporting `in`,
# truthiness and iteration over date objects -- confirm the exact type in config.
COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)


def _report_unmapped(label, values):
    """Print *label* and *values* when *values* contains at least one entry.

    ``Series.unique()`` returns an array; testing an array's truth value
    directly raises for more than one element, so the length is checked
    explicitly instead.
    """
    if len(values) > 0:
        print(label, values)


def preprocess_data(df_input: pd.DataFrame, features, categorical_features,
                    is_training=True, current_n_hours=28) -> pd.DataFrame:
    """Engineer features and price-drop targets from raw fare observations.

    The function derives route / flight / calendar / holiday features, builds
    per-``gid`` price-drop targets for the window
    ``4 <= hours_until_departure < current_n_hours`` (baggage == 30 rows only),
    and returns the frame restricted to a fixed column order.

    Args:
        df_input: Raw observations. Columns read here include
            ``from_city_code``, ``to_city_code``, ``search_dep_time``,
            ``seg1_*`` / ``seg2_*`` segment columns, ``baggage``,
            ``adult_total_price``, ``hours_until_departure``,
            ``days_to_departure``, ``seats_remaining``, ``is_filled`` and
            ``update_hour``. The first derived columns (``city_pair``,
            ``from_city_num``, ``to_city_num``) are written onto the caller's
            frame before the first re-indexing.
            NOTE(review): segment time columns and ``update_hour`` are used
            through ``.dt`` / ``.date()``, so they are assumed to be
            datetime-like -- confirm against the caller.
        features: Unused in this function; kept for interface compatibility.
        categorical_features: Unused in this function; kept for interface
            compatibility.
        is_training: Unused in this function; kept for interface compatibility.
        current_n_hours: Upper bound (exclusive) of the target window and the
            reference point at which ``price_at_n_hours`` is taken.

    Returns:
        The processed DataFrame with the columns listed in ``order_columns``,
        or an empty ``pd.DataFrame()`` when no row falls inside the target
        window.
    """
    print(">>> 开始数据预处理")

    # Build the city pair key.
    df_input['city_pair'] = (
        df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
    )

    # Map city codes to numeric ids.
    df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
    df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)

    missing_from = df_input.loc[df_input['from_city_num'].isna(), 'from_city_code'].unique()
    missing_to = df_input.loc[df_input['to_city_num'].isna(), 'to_city_code'].unique()
    _report_unmapped("未映射的 from_city:", missing_from)
    _report_unmapped("未映射的 to_city:", missing_to)

    # Move the route columns to the front, in this exact order.
    leading_cols = ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']
    cols = df_input.columns.tolist()
    for c in leading_cols:
        cols.remove(c)
    df_input = df_input[leading_cols + cols]

    # Normalise the departure date to a 'YYYY-MM-DD' string (unparseable -> NaN).
    df_input['search_dep_time'] = pd.to_datetime(
        df_input['search_dep_time'],
        format='%Y%m%d',
        errors='coerce'
    ).dt.strftime('%Y-%m-%d')

    # Rename the departure date column.
    df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)

    # Rename the flight number columns.
    df_input.rename(
        columns={
            'seg1_flight_number': 'flight_number_1',
            'seg2_flight_number': 'flight_number_2'
        },
        inplace=True
    )

    # Missing flight numbers are filled with the bare 'VJ' placeholder.
    df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
    df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')

    # Map flight numbers to numeric ids.
    df_input['flight_1_num'] = df_input['flight_number_1'].map(vi_flight_number_map)
    df_input['flight_2_num'] = df_input['flight_number_2'].map(vi_flight_number_map)

    missing_flight_1 = df_input.loc[df_input['flight_1_num'].isna(), 'flight_number_1'].unique()
    missing_flight_2 = df_input.loc[df_input['flight_2_num'].isna(), 'flight_number_2'].unique()
    _report_unmapped("未映射的 flight_1:", missing_flight_1)
    _report_unmapped("未映射的 flight_2:", missing_flight_2)

    # Place flight_1_num before seg1_dep_air_port and flight_2_num before seg2_dep_air_port.
    insert_df_col(df_input, 'flight_1_num', 'seg1_dep_air_port')
    insert_df_col(df_input, 'flight_2_num', 'seg2_dep_air_port')

    # Binary baggage flag: 30 -> 1, anything else (e.g. 20) -> 0.
    df_input['baggage_level'] = (df_input['baggage'] == 30).astype(int)
    insert_df_col(df_input, 'baggage_level', 'flight_number_2')

    # Keep unscaled copies of the price and the hours-to-departure values.
    df_input['Adult_Total_Price'] = df_input['adult_total_price']
    insert_df_col(df_input, 'Adult_Total_Price', 'seats_remaining')

    df_input['Hours_Until_Departure'] = df_input['hours_until_departure']
    insert_df_col(df_input, 'Hours_Until_Departure', 'days_to_departure')

    # gid: integer group label per (route, day, flight numbers); baggage is
    # deliberately not part of the grouping key.
    df_input['gid'] = (
        df_input
        .groupby(
            ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'],
            sort=False
        )
        .ngroup()
    )

    # Keep only observations within 480 hours before departure.
    df_input = df_input[df_input['hours_until_departure'] < 480].reset_index(drop=True)

    # Within each (gid, baggage), order rows from far-out to close-in.
    df_input = df_input.sort_values(
        by=['gid', 'baggage', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)

    # Price-change mask: changes smaller than 5 are ignored.
    g = df_input.groupby(['gid', 'baggage'])
    diff = g['adult_total_price'].transform('diff')
    change_mask = diff.abs() >= 5

    # Running count of price changes per (gid, baggage).
    df_input['price_change_times_total'] = (
        change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
    )

    # hours_until_departure at the most recent price change (forward filled).
    last_change_hour = (
        df_input['hours_until_departure']
        .where(change_mask)
        .groupby([df_input['gid'], df_input['baggage']])
        .ffill()
    )

    # Hours elapsed since the last price change (0 before the first change).
    df_input['price_last_change_hours'] = (
        last_change_hour - df_input['hours_until_departure']
    ).fillna(0)

    # Move the two price-change columns in front of seats_remaining.
    new_cols = [
        'price_change_times_total',
        'price_last_change_hours'
    ]
    cols = df_input.columns.tolist()
    idx = cols.index('seats_remaining')
    new_order = cols[:idx] + new_cols + cols[idx:]
    # dict.fromkeys de-duplicates while keeping the first occurrence.
    new_order = list(dict.fromkeys(new_order))
    df_input = df_input[new_order]

    # First-segment airport pair; the raw airport columns are dropped.
    df_input['airport_pair_1'] = (
        df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
    )
    df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
    insert_df_col(df_input, 'airport_pair_1', 'seg1_dep_time')

    # Second-segment airport pair, 'NA' when either airport is missing.
    df_input['airport_pair_2'] = np.where(
        df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
        'NA',
        df_input['seg2_dep_air_port'].astype(str) + "-" +
        df_input['seg2_arr_air_port'].astype(str)
    )
    df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
    insert_df_col(df_input, 'airport_pair_2', 'seg2_dep_time')

    # Transfer flag: a second flight number equal to the 'VJ' placeholder means no transfer.
    df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
    insert_df_col(df_input, 'is_transfer', 'flight_number_2')

    # Rename departure / arrival time columns.
    df_input.rename(
        columns={
            'seg1_dep_time': 'dep_time_1',
            'seg1_arr_time': 'arr_time_1',
            'seg2_dep_time': 'dep_time_2',
            'seg2_arr_time': 'arr_time_2',
        },
        inplace=True
    )

    # First-segment flight duration in hours.
    df_input['fly_duration_1'] = (
        (df_input['arr_time_1'] - df_input['dep_time_1'])
        .dt.total_seconds() / 3600
    ).round(2)

    # Second-segment flight duration in hours (0 when there is no second segment).
    df_input['fly_duration_2'] = (
        (df_input['arr_time_2'] - df_input['dep_time_2'])
        .dt.total_seconds() / 3600
    ).fillna(0).round(2)

    # Total flight duration.
    df_input['fly_duration'] = (
        df_input['fly_duration_1'] + df_input['fly_duration_2']
    ).round(2)

    # Layover duration in hours (0 when there is no second segment).
    df_input['stop_duration'] = (
        (df_input['dep_time_2'] - df_input['arr_time_1'])
        .dt.total_seconds() / 3600
    ).fillna(0).round(2)

    # Disabled: clip negative durations.
    # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
    #     df_input[c] = df_input[c].clip(lower=0)

    # Disabled: force consistency with is_transfer.
    # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0

    # Move the duration columns in front of is_filled in one step.
    insert_before = 'is_filled'
    new_cols = [
        'fly_duration_1',
        'fly_duration_2',
        'fly_duration',
        'stop_duration'
    ]
    cols = df_input.columns.tolist()
    idx = cols.index(insert_before)
    # Drop the old positions, then splice the columns in at idx (order kept).
    cols = [c for c in cols if c not in new_cols]
    cols[idx:idx] = new_cols
    df_input = df_input[cols]

    # Calendar features derived from the first-segment departure time.
    dep_t1 = df_input['dep_time_1']
    df_input['flight_by_hour'] = dep_t1.dt.hour                # 0-23
    df_input['flight_by_day'] = dep_t1.dt.day                  # 1-31
    df_input['flight_day_of_month'] = dep_t1.dt.month          # 1-12 (month number)
    df_input['flight_day_of_week'] = dep_t1.dt.weekday         # 0 = Monday, 6 = Sunday
    df_input['flight_day_of_quarter'] = dep_t1.dt.quarter      # 1-4
    df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)

    # Country codes for both ends of the route.
    df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
    df_input['arr_country'] = df_input['to_city_code'].map(city_to_country)

    # Overall departure time is dep_time_1; overall arrival is arr_time_2
    # when present, otherwise arr_time_1.
    df_input['global_dep_time'] = df_input['dep_time_1']
    df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])

    # Is the departure date a holiday in the departure country?
    df_input['dep_country_is_holiday'] = df_input.apply(
        lambda r: r['global_dep_time'].date()
        in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
        axis=1
    ).astype(int)

    # Is the arrival date a holiday in the arrival country?
    df_input['arr_country_is_holiday'] = df_input.apply(
        lambda r: r['global_arr_time'].date()
        in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
        axis=1
    ).astype(int)

    # Holiday on either side.
    df_input['any_country_is_holiday'] = (
        df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
        .max(axis=1)
    )

    # Cross-country route flag.
    df_input['is_cross_country'] = (
        df_input['dep_country'] != df_input['arr_country']
    ).astype(int)

    # Sorted holiday dates per country, built lazily so each country's
    # holidays are sorted once instead of once per row.
    sorted_holidays_by_country = {}

    def days_to_next_holiday(country, cur_date):
        """Return days from *cur_date* to the next holiday (same day counts), else NaN."""
        if pd.isna(country) or pd.isna(cur_date):
            return np.nan

        if country not in sorted_holidays_by_country:
            holidays = COUNTRY_HOLIDAYS.get(country)
            sorted_holidays_by_country[country] = sorted(holidays) if holidays else []
        holiday_dates = sorted_holidays_by_country[country]

        # First holiday that is >= cur_date.
        pos = bisect.bisect_left(holiday_dates, cur_date)
        if pos == len(holiday_dates):
            return np.nan

        return (holiday_dates[pos] - cur_date).days

    df_input['days_to_holiday'] = df_input.apply(
        lambda r: days_to_next_holiday(
            r['dep_country'],
            r['update_hour'].date()
        ),
        axis=1
    )

    # Disabled: uniform fallback when there is no future holiday.
    # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)

    insert_df_col(df_input, 'days_to_holiday', 'update_hour')

    # ---- Build targets ----
    print(f"\n>>> 开始处理 对应区间: n_hours = {current_n_hours}")
    target_lower_limit = 4
    target_upper_limit = current_n_hours
    mask_targets = (
        (df_input['hours_until_departure'] >= target_lower_limit)
        & (df_input['hours_until_departure'] < target_upper_limit)
        & (df_input['baggage'] == 30)
    )
    df_targets = df_input.loc[mask_targets].copy()

    targets_amount = df_targets.shape[0]
    print(f"当前 目标区间数据量: {targets_amount}, 区间: [{target_lower_limit}, {target_upper_limit})")

    if targets_amount == 0:
        print(f">>> n_hours = {current_n_hours} 无有效数据,跳过")
        return pd.DataFrame()

    print(">>> 计算 price_at_n_hours")
    df_input_object = df_input[
        (df_input['hours_until_departure'] >= current_n_hours) & (df_input['baggage'] == 30)
    ].copy()
    # Rows are sorted by descending hours, so last() is the observation
    # closest to (but not below) current_n_hours for each gid.
    df_last = df_input_object.groupby('gid', observed=True).last().reset_index()

    # Extract and rename the reference price.
    df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(
        columns={'adult_total_price': 'price_at_n_hours'}
    )
    print(">>> price_at_n_hours计算完成,示例:")
    print(df_last_price_at_n_hours.head(5))

    # Price-drop information relative to the reference price.
    print(">>> 计算降价信息")
    df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
    df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
    df_targets['price_dropped'] = (
        (df_targets['adult_total_price'] < df_targets['price_at_n_hours'])
        & (df_targets['price_drop_amount'] >= 5)  # drops smaller than 5 are ignored
    )
    df_price_drops = df_targets[df_targets['price_dropped']].copy()

    price_drops_len = df_price_drops.shape[0]
    if price_drops_len == 0:
        print(f">>> n_hours = {current_n_hours} 无降价信息")
        # Empty frame with the expected columns so the merge below still works.
        df_price_drop_info = pd.DataFrame({
            'gid': pd.Series(dtype='int64'),
            'first_drop_hours_until_departure': pd.Series(dtype='int64'),
            'price_at_first_drop_hours': pd.Series(dtype='float64')
        })
    else:
        # First drop encountered per gid in the current row order.
        df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index()
        df_price_drop_info = df_price_drop_info[
            ['gid', 'hours_until_departure', 'adult_total_price']
        ].rename(columns={
            'hours_until_departure': 'first_drop_hours_until_departure',
            'adult_total_price': 'price_at_first_drop_hours'
        })

    print(">>> 降价信息计算完成,示例:")
    print(df_price_drop_info.head(5))

    # Combine reference price and drop info per gid.
    df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
    df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
    df_gid_info['amount_of_price_drop'] = (
        df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
    )
    df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)
    df_gid_info['time_to_price_drop'] = df_gid_info['first_drop_hours_until_departure']
    df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)

    # Release intermediates before the next large merge.
    del df_input_object
    del df_last
    del df_last_price_at_n_hours
    del df_targets
    del df_price_drops
    del df_price_drop_info
    gc.collect()

    # Attach the targets to every row of the same gid.
    print(">>> 将目标变量信息合并到 df_input")
    df_input = df_input.merge(
        df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']],
        on='gid',
        how='left'
    )
    # gids without a reference price get 0 for all targets.
    target_cols = ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']
    df_input[target_cols] = df_input[target_cols].fillna(0)
    df_input = df_input.rename(columns={
        'will_price_drop': 'target_will_price_drop',
        'amount_of_price_drop': 'target_amount_of_drop',
        'time_to_price_drop': 'target_time_to_drop'
    })

    # Disabled: per-gid minimum price inside the target window.
    # print(">>> 计算每个 gid 分组的 adult_total_price 最小值...")
    # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
    # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
    # gid_count = df_min_price_by_gid.shape[0]
    # print(f">>> 计算完成,共 {gid_count} 个 gid 分组")
    # print(">>> 将最小价格 merge 到输入数据中...")
    # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')

    print(">>> 合并后 df_input 样例:")
    print(df_input[['gid', 'hours_until_departure', 'adult_total_price',
                    'target_will_price_drop', 'target_amount_of_drop',
                    'target_time_to_drop']].head(5))

    # Final column selection and order. Columns not listed here (for example
    # is_filled and target_amount_of_drop) are not part of the returned frame.
    order_columns = [
        "city_pair", "from_city_code", "from_city_num", "to_city_code", "to_city_num", "flight_day",
        "seats_remaining", "baggage", "baggage_level",
        "price_change_times_total", "price_last_change_hours", "adult_total_price", "Adult_Total_Price",
        "target_will_price_drop", "target_time_to_drop",
        "days_to_departure", "days_to_holiday", "hours_until_departure", "Hours_Until_Departure",
        "update_hour", "gid",
        "flight_number_1", "flight_1_num", "airport_pair_1", "dep_time_1", "arr_time_1", "fly_duration_1",
        "flight_by_hour", "flight_by_day", "flight_day_of_month", "flight_day_of_week",
        "flight_day_of_quarter", "flight_day_is_weekend", "is_transfer",
        "flight_number_2", "flight_2_num", "airport_pair_2", "dep_time_2", "arr_time_2", "fly_duration_2",
        "fly_duration", "stop_duration",
        "global_dep_time", "dep_country", "dep_country_is_holiday", "is_cross_country",
        "global_arr_time", "arr_country", "arr_country_is_holiday", "any_country_is_holiday",
    ]
    df_input = df_input[order_columns]

    return df_input


def standardization(df, feature_scaler, target_scaler, is_training=True, is_test=False):
    """Scale features in place: z-score for prices/durations, fixed-range min-max for the rest.

    Args:
        df: Frame holding the feature columns; it is modified in place and
            also returned.
        feature_scaler: Scaler exposing ``fit`` / ``transform``. May be
            ``None`` in training mode, in which case a new ``StandardScaler``
            is created. Required in prediction mode.
        target_scaler: Not used here; returned unchanged.
        is_training: When true, the scaler is (optionally) fitted before
            transforming; when false, only ``transform`` is applied.
        is_test: In training mode, skip fitting and reuse the given scaler.

    Returns:
        Tuple ``(df, feature_scaler, target_scaler)``.

    Raises:
        ValueError: If ``is_training`` is false and ``feature_scaler`` is None.
    """
    print(">>> 开始标准化处理")

    # Features that go through standard (z-score) scaling.
    scaler_features = ['adult_total_price', 'fly_duration', 'stop_duration']

    if is_training:
        print(">>> 特征数据标准化开始")
        if feature_scaler is None:
            feature_scaler = StandardScaler()
        if not is_test:
            feature_scaler.fit(df[scaler_features])
        df[scaler_features] = feature_scaler.transform(df[scaler_features])
        print(">>> 特征数据标准化完成")
    else:
        if feature_scaler is None:
            # Fail early with a clear message instead of an AttributeError on None.
            raise ValueError("feature_scaler must be provided when is_training=False")
        df[scaler_features] = feature_scaler.transform(df[scaler_features])
        print(">>> 预测模式下特征标准化处理完成")

    # Features that go through min-max normalisation with predefined ranges.
    fixed_ranges = {
        'hours_until_departure': (0, 480),       # 0-20 days
        'from_city_num': (0, 38),
        'to_city_num': (0, 38),
        'flight_1_num': (0, 341),
        'flight_2_num': (0, 341),
        'seats_remaining': (1, 5),
        'price_change_times_total': (0, 30),     # assumes at most 30 price changes
        'price_last_change_hours': (0, 480),
        'days_to_departure': (0, 30),
        'days_to_holiday': (0, 120),             # per original note: longest gap between Vietnamese holidays
        'flight_by_hour': (0, 23),
        'flight_by_day': (1, 31),
        'flight_day_of_month': (1, 12),
        'flight_day_of_week': (0, 6),
        'flight_day_of_quarter': (1, 4),
    }
    normal_features = list(fixed_ranges.keys())

    print(">>> 归一化特征列: ", normal_features)
    print(">>> 基于固定范围的特征数据归一化开始")
    for col in normal_features:
        if col in df.columns:
            # (x - min) / (max - min), then clamp out-of-range values into [0, 1].
            col_min, col_max = fixed_ranges[col]
            df[col] = (df[col] - col_min) / (col_max - col_min)
            df[col] = df[col].clip(0, 1)
    print(">>> 基于固定范围的特征数据归一化完成")

    return df, feature_scaler, target_scaler