| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- import pandas as pd
- import numpy as np
- import bisect
- import gc
- from datetime import datetime, timedelta
- from sklearn.preprocessing import StandardScaler
- from config import city_to_country, vj_city_code_map, build_country_holidays
# Holiday calendars looked up by country code; built once at import time
# from the city -> country mapping.
COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)

# Only observations closer than this many hours to departure are kept.
_MAX_HOURS_UNTIL_DEPARTURE = 480
# Price moves smaller than this are treated as noise (both for the change
# counters and for the drop targets).
_MIN_PRICE_CHANGE = 5
# Lower bound (inclusive, in hours before departure) of the target window.
_TARGET_LOWER_LIMIT_HOURS = 4
# Targets are only derived from rows with this baggage value.
_TARGET_BAGGAGE = 30

_TARGET_COLUMNS = [
    'target_will_price_drop',
    'target_amount_of_drop',
    'target_time_to_drop',
]


def _move_columns_before(df, names, anchor):
    """Move the columns ``names`` (keeping their order) directly before ``anchor``.

    Operates in place through pop/insert and returns ``df`` for convenience.
    """
    moved = [df.pop(name) for name in names]
    # Resolve the anchor only after popping, so that columns which used to sit
    # in front of it cannot shift the insertion point.
    position = df.columns.get_loc(anchor)
    for offset, (name, column) in enumerate(zip(names, moved)):
        df.insert(position + offset, name, column)
    return df


def _holiday_flags(timestamps, countries):
    """Return a 0/1 Series telling whether each timestamp's date is a holiday.

    ``timestamps`` and ``countries`` are aligned Series; countries that are
    missing from ``COUNTRY_HOLIDAYS`` never match.
    """
    flags = [
        stamp.date() in COUNTRY_HOLIDAYS.get(country, set())
        for stamp, country in zip(timestamps, countries)
    ]
    return pd.Series(flags, index=timestamps.index, dtype=bool).astype(int)


def _days_to_next_holiday(countries, timestamps):
    """Return, per row, the number of days from the date to the next holiday.

    The holiday may fall on the date itself (result 0). The value is NaN when
    the country or date is missing, the country has no holidays, or no holiday
    lies on/after the date.
    """
    # Sort each country's holidays once per call instead of once per row.
    sorted_holidays = {}
    days = []
    for country, stamp in zip(countries, timestamps):
        cur_date = stamp.date()
        if pd.isna(country) or pd.isna(cur_date):
            days.append(np.nan)
            continue
        if country not in sorted_holidays:
            holidays = COUNTRY_HOLIDAYS.get(country)
            sorted_holidays[country] = sorted(holidays) if holidays else []
        dates = sorted_holidays[country]
        # First holiday that is >= cur_date.
        pos = bisect.bisect_left(dates, cur_date)
        if pos == len(dates):
            days.append(np.nan)
        else:
            days.append((dates[pos] - cur_date).days)
    # An explicit dtype keeps the empty case well defined.
    return pd.Series(days, index=timestamps.index, dtype=None if days else 'float64')


def _prepare_route_columns(df_input):
    """Build a new frame with route keys first, ``flight_day`` and ``gid`` added,
    trimmed to the observation window and sorted inside each (gid, baggage) group.
    """
    from_code = df_input['from_city_code']
    to_code = df_input['to_city_code']
    # assign() returns a new frame, so the caller's DataFrame is never mutated.
    df = df_input.assign(
        city_pair=from_code.astype(str) + "-" + to_code.astype(str),
        from_city_num=from_code.map(vj_city_code_map),
        to_city_num=to_code.map(vj_city_code_map),
    )
    leading = ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']
    trailing = [c for c in df.columns if c not in leading]
    df = df[leading + trailing]

    # Departure date: parsed as YYYYMMDD, re-emitted as a 'YYYY-MM-DD' string
    # (unparseable values become NaN).
    df['search_dep_time'] = pd.to_datetime(
        df['search_dep_time'],
        format='%Y%m%d',
        errors='coerce',
    ).dt.strftime('%Y-%m-%d')
    df.rename(
        columns={
            'search_dep_time': 'flight_day',
            'seg1_flight_number': 'flight_number_1',
            'seg2_flight_number': 'flight_number_2',
        },
        inplace=True,
    )
    # 'VJ' is the placeholder for a missing flight number; it is also what
    # marks "no second segment" when is_transfer is derived later.
    for col in ('flight_number_1', 'flight_number_2'):
        df[col] = df[col].fillna('VJ')

    # gid: integer label of the (route, day, flight numbers) group.
    # Baggage is deliberately not part of the grouping key.
    df['gid'] = df.groupby(
        ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'],
        sort=False,
    ).ngroup()

    # Keep only observations inside the window before departure.
    df = df[df['hours_until_departure'] < _MAX_HOURS_UNTIL_DEPARTURE].reset_index(drop=True)
    # Within each (gid, baggage) order rows from far-from-departure to close.
    df = df.sort_values(
        by=['gid', 'baggage', 'hours_until_departure'],
        ascending=[True, True, False],
    ).reset_index(drop=True)
    return df


def _add_price_change_features(df):
    """Add running price-change count and hours since the last price change.

    Expects ``df`` sorted by (gid, baggage, hours_until_departure descending).
    """
    keys = [df['gid'], df['baggage']]
    price_diff = df.groupby(['gid', 'baggage'])['adult_total_price'].transform('diff')
    changed = price_diff.abs() >= _MIN_PRICE_CHANGE

    # Running number of price changes seen so far in the group.
    df['price_change_times_total'] = changed.groupby(keys).cumsum()

    # hours_until_departure at the most recent change, forward-filled in the group.
    last_change_hour = (
        df['hours_until_departure']
        .where(changed)
        .groupby(keys)
        .ffill()
    )
    # Hours elapsed since that change; 0 while no change has been seen yet.
    df['price_last_change_hours'] = (
        last_change_hour - df['hours_until_departure']
    ).fillna(0)

    return _move_columns_before(
        df,
        ['price_change_times_total', 'price_last_change_hours'],
        'seats_remaining',
    )


def _add_segment_features(df):
    """Add airport pairs, the transfer flag and flight/stop durations (hours)."""
    # First segment airport pair replaces the two raw airport columns.
    df['airport_pair_1'] = (
        df['seg1_dep_air_port'].astype(str) + "-" + df['seg1_arr_air_port'].astype(str)
    )
    df.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
    _move_columns_before(df, ['airport_pair_1'], 'seg1_dep_time')

    # Second segment airport pair; 'NA' when either airport is missing.
    seg2_missing = df['seg2_dep_air_port'].isna() | df['seg2_arr_air_port'].isna()
    df['airport_pair_2'] = np.where(
        seg2_missing,
        'NA',
        df['seg2_dep_air_port'].astype(str) + "-" + df['seg2_arr_air_port'].astype(str),
    )
    df.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
    _move_columns_before(df, ['airport_pair_2'], 'seg2_dep_time')

    # A row is a transfer when its second flight number is not the 'VJ' placeholder.
    df['is_transfer'] = np.where(df['flight_number_2'] == 'VJ', 0, 1)
    _move_columns_before(df, ['is_transfer'], 'flight_number_2')

    df.rename(
        columns={
            'seg1_dep_time': 'dep_time_1',
            'seg1_arr_time': 'arr_time_1',
            'seg2_dep_time': 'dep_time_2',
            'seg2_arr_time': 'arr_time_2',
        },
        inplace=True,
    )

    # NOTE(review): the arithmetic below relies on the *_time columns already
    # being datetime-typed when they reach this function - confirm with caller.
    df['fly_duration_1'] = (
        (df['arr_time_1'] - df['dep_time_1']).dt.total_seconds() / 3600
    ).round(2)
    # Second leg and layover are 0 when the segment-2 times are missing.
    df['fly_duration_2'] = (
        (df['arr_time_2'] - df['dep_time_2']).dt.total_seconds() / 3600
    ).fillna(0).round(2)
    df['fly_duration'] = (df['fly_duration_1'] + df['fly_duration_2']).round(2)
    df['stop_duration'] = (
        (df['dep_time_2'] - df['arr_time_1']).dt.total_seconds() / 3600
    ).fillna(0).round(2)

    return _move_columns_before(
        df,
        ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration'],
        'is_filled',
    )


def _add_calendar_features(df):
    """Add departure calendar fields, country codes and holiday features."""
    dep_t1 = df['dep_time_1']
    df['flight_by_hour'] = dep_t1.dt.hour                # 0-23
    df['flight_by_day'] = dep_t1.dt.day                  # 1-31
    df['flight_day_of_month'] = dep_t1.dt.month          # 1-12 (month number)
    df['flight_day_of_week'] = dep_t1.dt.weekday         # 0 = Monday ... 6 = Sunday
    df['flight_day_of_quarter'] = dep_t1.dt.quarter      # 1-4
    df['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)

    df['dep_country'] = df['from_city_code'].map(city_to_country)
    df['arr_country'] = df['to_city_code'].map(city_to_country)

    # Overall departure is the first leg's departure; overall arrival is the
    # second leg's arrival when present, otherwise the first leg's.
    df['global_dep_time'] = df['dep_time_1']
    df['global_arr_time'] = df['arr_time_2'].fillna(df['arr_time_1'])

    df['dep_country_is_holiday'] = _holiday_flags(df['global_dep_time'], df['dep_country'])
    df['arr_country_is_holiday'] = _holiday_flags(df['global_arr_time'], df['arr_country'])
    # Holiday on either side of the trip.
    df['flight_day_is_holiday'] = (
        df[['dep_country_is_holiday', 'arr_country_is_holiday']].max(axis=1)
    )
    df['is_cross_country'] = (df['dep_country'] != df['arr_country']).astype(int)

    # Days from the observation date (update_hour) to the next holiday in the
    # departure country; NaN is kept when there is none.
    df['days_to_holiday'] = _days_to_next_holiday(df['dep_country'], df['update_hour'])
    return _move_columns_before(df, ['days_to_holiday'], 'update_hour')


def _build_gid_targets(df, current_n_hours):
    """Compute per-gid price-drop targets, or return None if the window is empty.

    The reference price of a gid is its last observation at or before
    ``current_n_hours`` hours to departure. A "drop" is the first observation
    inside [lower limit, current_n_hours) whose price is at least
    ``_MIN_PRICE_CHANGE`` below that reference. Only rows with the target
    baggage value take part.
    """
    target_lower_limit = _TARGET_LOWER_LIMIT_HOURS
    target_upper_limit = current_n_hours
    hours = df['hours_until_departure']
    is_target_baggage = df['baggage'] == _TARGET_BAGGAGE

    in_window = (hours >= target_lower_limit) & (hours < target_upper_limit) & is_target_baggage
    # Only these three columns are needed downstream; avoid copying the rest.
    df_targets = df.loc[in_window, ['gid', 'hours_until_departure', 'adult_total_price']]
    targets_amout = df_targets.shape[0]
    print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})")
    if targets_amout == 0:
        print(f">>> n_hours = {current_n_hours} 无有效数据,跳过")
        return None

    print(">>> 计算 price_at_n_hours")
    reference_rows = df.loc[(hours >= current_n_hours) & is_target_baggage, ['gid', 'adult_total_price']]
    # Rows are sorted by descending hours, so last() is the observation closest
    # to (but not inside) the target window.
    df_price_at_n = (
        reference_rows.groupby('gid', observed=True)
        .last()
        .reset_index()
        .rename(columns={'adult_total_price': 'price_at_n_hours'})
    )
    print(">>> price_at_n_hours计算完成,示例:")
    print(df_price_at_n.head(5))

    print(">>> 计算降价信息")
    df_targets = df_targets.merge(df_price_at_n, on='gid', how='left')
    drop_amount = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
    # drop_amount >= threshold already implies the price is below the reference.
    df_price_drops = df_targets[drop_amount >= _MIN_PRICE_CHANGE]
    if df_price_drops.empty:
        print(f">>> n_hours = {current_n_hours} 无降价信息")
        # Typed empty frame so the merge below still works.
        df_price_drop_info = pd.DataFrame({
            'gid': pd.Series(dtype='int64'),
            'first_drop_hours_until_departure': pd.Series(dtype='int64'),
            'price_at_first_drop_hours': pd.Series(dtype='float64'),
        })
    else:
        # First qualifying drop per gid (earliest in time, i.e. largest hours).
        df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index()
        df_price_drop_info = df_price_drop_info[
            ['gid', 'hours_until_departure', 'adult_total_price']
        ].rename(columns={
            'hours_until_departure': 'first_drop_hours_until_departure',
            'adult_total_price': 'price_at_first_drop_hours',
        })
    print(">>> 降价信息计算完成,示例:")
    print(df_price_drop_info.head(5))

    df_gid_info = df_price_at_n.merge(df_price_drop_info, on='gid', how='left')
    df_gid_info['target_will_price_drop'] = (
        df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
    )
    # Amount and time are 0 for gids without a drop.
    df_gid_info['target_amount_of_drop'] = (
        df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
    ).fillna(0)
    df_gid_info['target_time_to_drop'] = (
        df_gid_info['first_drop_hours_until_departure'].fillna(0)
    )
    return df_gid_info[['gid'] + _TARGET_COLUMNS]


def preprocess_data(df_input, features, categorical_features, is_training=True, current_n_hours=28):
    """Engineer features and price-drop targets from raw fare observations.

    Steps: route keys and ``gid`` grouping, trimming to the last 480 hours
    before departure, price-change counters, segment/airport/duration
    features, calendar and holiday features, and finally the per-gid targets
    ``target_will_price_drop``, ``target_amount_of_drop`` and
    ``target_time_to_drop``.

    Args:
        df_input: Raw observations. It is not modified; a new frame is returned.
        features: Accepted for interface compatibility; not used here.
        categorical_features: Accepted for interface compatibility; not used here.
        is_training: Accepted for interface compatibility; not used here.
        current_n_hours: Upper bound (exclusive, hours before departure) of the
            target window and the point at which the reference price is taken.

    Returns:
        The enriched DataFrame, or an empty ``pd.DataFrame()`` when no row
        falls inside the target window.
    """
    print(">>> 开始数据预处理")

    df_input = _prepare_route_columns(df_input)
    df_input = _add_price_change_features(df_input)
    df_input = _add_segment_features(df_input)
    df_input = _add_calendar_features(df_input)

    # Build the targets.
    print(f"\n>>> 开始处理 对应区间: n_hours = {current_n_hours}")
    # Temporaries of the target computation live in the helper's scope and are
    # released when it returns.
    df_gid_targets = _build_gid_targets(df_input, current_n_hours)
    if df_gid_targets is None:
        return pd.DataFrame()

    print(">>> 将目标变量信息合并到 df_input")
    df_input = df_input.merge(df_gid_targets, on='gid', how='left')
    # gids without a reference price get all-zero targets.
    df_input[_TARGET_COLUMNS] = df_input[_TARGET_COLUMNS].fillna(0)

    print(">>> 合并后 df_input 样例:")
    print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))

    return df_input
|