import pandas as pd
import numpy as np
import gc
import os


def preprocess_data_simple(df_input, is_train=False, hourly_time=None):
    """Clean raw fare snapshots and derive price-change features.

    Expected input columns: ``update_hour`` (datetime-like), ``citypair``,
    ``flight_numbers``, ``from_date``, ``baggage_weight``,
    ``hours_until_departure``, ``price_total``; the training branch also
    reads ``days_to_departure`` and ``from_time`` (``currency`` is included
    only when present).

    NOTE: the caller's frame is mutated in place for the first derived
    columns (``update_week``, ``gid``) before filtering re-assigns it.

    Parameters
    ----------
    df_input : pandas.DataFrame
        Raw snapshot rows, one row per (flight, baggage option, update hour).
    is_train : bool, default False
        When True, additionally extract price-drop / price-rise transition
        nodes and the per-flight peak-price envelope for training.
    hourly_time : pandas.Timestamp, optional
        Prediction cut-off. Required when ``is_train`` is False; rows whose
        ``update_hour`` is later than this are discarded.

    Returns
    -------
    tuple
        ``(df_input, df_drop_nodes, df_rise_nodes, df_envelope)`` when
        ``is_train`` is True, otherwise ``(df_input, None, None, None)``.

    Raises
    ------
    ValueError
        If ``hourly_time`` is None while ``is_train`` is False.
    """
    print(">>> 开始数据预处理")
    # Day of week of the snapshot timestamp (Monday=1 .. Sunday=7).
    df_input['update_week'] = df_input['update_hour'].dt.dayofweek + 1

    # gid: integer label per (citypair, flight, departure-date) group.
    df_input['gid'] = (
        df_input
        .groupby(
            ['citypair', 'flight_numbers', 'from_date'],  # 'baggage_weight' intentionally excluded
            sort=False
        )
        .ngroup()
    )

    # Within each (gid, baggage_weight), order by time-to-departure descending
    # (chronological order of observation).
    df_input = df_input.sort_values(
        by=['gid', 'baggage_weight', 'hours_until_departure'],
        ascending=[True, True, False]
    ).reset_index(drop=True)
    df_input = df_input[df_input['hours_until_departure'] <= 480]
    df_input = df_input[df_input['baggage_weight'] == 20]  # keep only the 20kg-baggage rows for now

    # Keep data only up to the prediction cut-off at the tail of the series.
    if not is_train:
        if hourly_time is None:
            # Without this guard the comparison below raises an opaque TypeError.
            raise ValueError("hourly_time is required when is_train is False")
        df_input = df_input[df_input['update_hour'] <= hourly_time].copy()
    else:
        df_input = df_input.copy()  # copy in training mode too, for consistency
    df_input = df_input.reset_index(drop=True)

    # Minimum absolute price move that counts as a real change.
    price_change_amount_threshold = 5
    df_input['_raw_price_diff'] = df_input.groupby(
        ['gid', 'baggage_weight'], group_keys=False
    )['price_total'].diff()

    # Price change amount: sub-threshold moves are zeroed, then the last real
    # change is forward-filled through the flat stretch that follows it.
    df_input['price_change_amount'] = (
        df_input['_raw_price_diff']
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(2)
    )

    # Price change percent (relative to the previous observation), with the
    # same threshold / forward-fill treatment as the amount.
    df_input['price_change_percent'] = (
        df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total']
        .pct_change()
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(4)
    )

    # Step 1: label contiguous segments of constant price_change_amount.
    df_input['price_change_segment'] = (
        df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_amount']
        .apply(lambda s: (s != s.shift()).cumsum())
    )
    # Step 2: running duration (in observations) inside each segment.
    df_input['price_duration_hours'] = (
        df_input.groupby(['gid', 'baggage_weight', 'price_change_segment'], group_keys=False)
        .cumcount()
        .add(1)
    )
    # Drop the temporary helper columns.
    df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff'])

    if is_train:
        # Label window: 24..360 hours before departure.
        df_target = df_input[
            (df_input['hours_until_departure'] >= 24)
            & (df_input['hours_until_departure'] <= 360)
        ].copy()
        df_target = df_target.sort_values(
            by=['gid', 'baggage_weight', 'hours_until_departure'],
            ascending=[True, True, False]
        ).reset_index(drop=True)

        # Previous record of each row within its (gid, baggage_weight) series.
        grp = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)
        prev_pct = grp['price_change_percent'].shift(1)
        prev_amo = grp['price_change_amount'].shift(1)
        prev_dur = grp['price_duration_hours'].shift(1)
        prev_price = grp['price_total'].shift(1)

        # Transition nodes: first row of a new price segment.
        seg_start_mask = df_target['price_duration_hours'].eq(1)

        # Drop nodes: a drop that follows a prior non-zero move
        # (rise-then-drop or drop-then-drop).
        drop_mask = (
            seg_start_mask
            & ((prev_pct > 0) | (prev_pct < 0))
            & (df_target['price_change_percent'] < 0)
        )
        df_drop_nodes = df_target.loc[
            drop_mask,
            ['gid', 'baggage_weight', 'hours_until_departure',
             'days_to_departure', 'update_hour', 'update_week']
        ].copy()
        df_drop_nodes = df_drop_nodes.rename(columns={
            'hours_until_departure': 'drop_hours_until_departure',
            'days_to_departure': 'drop_days_to_departure',
            'update_hour': 'drop_update_hour',
            'update_week': 'drop_update_week',
        })
        df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        # "high_*" columns describe the preceding (higher-priced) segment.
        df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
        df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
        df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes = df_drop_nodes.reset_index(drop=True)

        # Static flight descriptors to re-attach after the node extraction.
        flight_info_cols = [
            'citypair', 'flight_numbers', 'from_time', 'from_date', 'currency',
        ]
        flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
        df_gid_info = (
            df_target[['gid', 'baggage_weight'] + flight_info_cols]
            .drop_duplicates(subset=['gid', 'baggage_weight'])
            .reset_index(drop=True)
        )
        df_drop_nodes = df_drop_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
        drop_info_cols = [
            'drop_update_hour', 'drop_update_week',
            'drop_days_to_departure', 'drop_hours_until_departure',
            'drop_price_change_percent', 'drop_price_change_amount',
            'high_price_duration_hours', 'high_price_change_percent',
            'high_price_change_amount', 'high_price_amount',
        ]
        # Fixed column order; gid itself is not exported.
        df_drop_nodes = df_drop_nodes[flight_info_cols + ['baggage_weight'] + drop_info_cols]

        # Rise nodes: a rise that follows a prior non-zero move
        # (rise-then-rise or drop-then-rise).
        rise_mask = (
            seg_start_mask
            & ((prev_pct > 0) | (prev_pct < 0))
            & (df_target['price_change_percent'] > 0)
        )
        df_rise_nodes = df_target.loc[
            rise_mask,
            ['gid', 'baggage_weight', 'hours_until_departure',
             'days_to_departure', 'update_hour', 'update_week']
        ].copy()
        df_rise_nodes = df_rise_nodes.rename(columns={
            'hours_until_departure': 'rise_hours_until_departure',
            'days_to_departure': 'rise_days_to_departure',
            'update_hour': 'rise_update_hour',
            'update_week': 'rise_update_week',
        })
        df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        # "prev_rise_*" columns describe the segment that preceded the rise.
        df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy()
        df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy()
        df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes = df_rise_nodes.reset_index(drop=True)
        df_rise_nodes = df_rise_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
        rise_info_cols = [
            'rise_update_hour', 'rise_update_week',
            'rise_days_to_departure', 'rise_hours_until_departure',
            'rise_price_change_percent', 'rise_price_change_amount',
            'prev_rise_duration_hours', 'prev_rise_change_percent',
            'prev_rise_change_amount', 'prev_rise_amount',
        ]
        df_rise_nodes = df_rise_nodes[flight_info_cols + ['baggage_weight'] + rise_info_cols]

        # Historical envelope: for each flight, the snapshot at its peak price.
        envelope_group = ['citypair', 'flight_numbers', 'from_date', 'baggage_weight']
        idx_peak = df_input.groupby(envelope_group)['price_total'].idxmax()
        df_envelope = df_input.loc[idx_peak, envelope_group + [
            'from_time', 'price_total', 'hours_until_departure',
            'days_to_departure', 'update_hour', 'update_week',
        ]].rename(columns={
            'price_total': 'peak_price',
            'hours_until_departure': 'peak_hours',
            'days_to_departure': 'peak_days',
            'update_hour': 'peak_time',
            'update_week': 'peak_week',
        }).reset_index(drop=True)

        del df_gid_info
        del df_target
        return df_input, df_drop_nodes, df_rise_nodes, df_envelope

    return df_input, None, None, None


def predict_data_simple(df_input, city_pair, output_dir, predict_dir=".", pred_time_str=""):
    """Annotate the latest snapshot per flight with its relative price position.

    Combines the current price with historical drop/rise node prices (read
    from ``<output_dir>/<city_pair>_drop_info.csv`` / ``_rise_info.csv`` when
    present) and dense-ranks them into a 0..1 ``relative_position``.

    ``predict_dir`` and ``pred_time_str`` are unused in the visible portion
    of this function — presumably consumed by code past this chunk; confirm
    against the full file.

    Parameters
    ----------
    df_input : pandas.DataFrame or None
        Preprocessed frame (must carry ``gid``, ``baggage_weight``,
        ``hours_until_departure``, ``price_total``).
    city_pair : str
        Route key used to locate the historical-node CSV files.
    output_dir : str
        Directory holding the historical drop/rise CSVs.

    Returns
    -------
    pandas.DataFrame or None
        An empty frame when ``df_input`` is None/empty; otherwise the visible
        code path ends without an explicit return (returns None).
    """
    if df_input is None or df_input.empty:
        return pd.DataFrame()

    df_sorted = df_input.sort_values(
        by=['gid', 'baggage_weight', 'hours_until_departure'],
        ascending=[True, True, False],
    ).reset_index(drop=True)
    df_sorted = df_sorted[
        df_sorted['hours_until_departure'].between(24, 360)
    ].reset_index(drop=True)

    # For each (gid, baggage_weight) keep the row with the smallest
    # hours_until_departure, i.e. the most recent observation.
    df_min_hours = (
        df_sorted.drop_duplicates(subset=['gid', 'baggage_weight'], keep='last')
        .reset_index(drop=True)
    )

    # Historical price-drop scenarios (optional file).
    drop_info_csv_path = os.path.join(output_dir, f'{city_pair}_drop_info.csv')
    if os.path.exists(drop_info_csv_path):
        df_drop_nodes = pd.read_csv(drop_info_csv_path)
    else:
        df_drop_nodes = pd.DataFrame()

    # Historical price-rise scenarios (optional file).
    rise_info_csv_path = os.path.join(output_dir, f'{city_pair}_rise_info.csv')
    if os.path.exists(rise_info_csv_path):
        df_rise_nodes = pd.read_csv(rise_info_csv_path)
    else:
        df_rise_nodes = pd.DataFrame()

    # Joint price distribution: initialize relative_position on every table.
    df_min_hours['relative_position'] = np.nan
    if not df_drop_nodes.empty:
        df_drop_nodes['relative_position'] = np.nan
    if not df_rise_nodes.empty:
        df_rise_nodes['relative_position'] = np.nan

    parts = []
    # Current rows awaiting prediction.
    if not df_min_hours.empty and 'price_total' in df_min_hours.columns:
        cur = df_min_hours[['price_total']].copy()
        cur['price'] = pd.to_numeric(cur['price_total'], errors='coerce')
        cur['source'] = 'min'
        cur['row_id'] = cur.index
        parts.append(cur[['price', 'source', 'row_id']])
    # Historical pre-drop (high) prices.
    if not df_drop_nodes.empty and 'high_price_amount' in df_drop_nodes.columns:
        drop = df_drop_nodes[['high_price_amount']].copy()
        drop['price'] = pd.to_numeric(drop['high_price_amount'], errors='coerce')
        drop['source'] = 'drop'
        drop['row_id'] = drop.index
        parts.append(drop[['price', 'source', 'row_id']])
    # Historical pre-rise prices.
    if not df_rise_nodes.empty and 'prev_rise_amount' in df_rise_nodes.columns:
        rise = df_rise_nodes[['prev_rise_amount']].copy()
        rise['price'] = pd.to_numeric(rise['prev_rise_amount'], errors='coerce')
        rise['source'] = 'rise'
        rise['row_id'] = rise.index
        parts.append(rise[['price', 'source', 'row_id']])

    if parts:
        all_prices = pd.concat(parts, ignore_index=True)
        all_prices = all_prices.dropna(subset=['price']).reset_index(drop=True)
        # Percentile of each price within the pooled distribution (dense rank
        # scaled to [0, 1]; a single distinct price maps to 1.0).
        dense_rank = all_prices['price'].rank(method='dense')
        max_rank = dense_rank.max()
        if pd.notna(max_rank) and max_rank > 1:
            all_prices['relative_position'] = (dense_rank - 1) / (max_rank - 1)
        else:
            all_prices['relative_position'] = 1.0
        all_prices['relative_position'] = all_prices['relative_position'].round(4)

        # Write the positions back to the three source tables by row_id.
        m = all_prices['source'] == 'min'
        df_min_hours.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
        if not df_drop_nodes.empty:
            m = all_prices['source'] == 'drop'
            df_drop_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
        if not df_rise_nodes.empty:
            m = all_prices['source'] == 'rise'
            df_rise_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values

    # NOTE(review): the visible portion ends here without returning the
    # annotated frames — confirm against the full file before relying on a
    # return value from this path.
    pass