import pandas as pd
import numpy as np
import gc
import os


def preprocess_data_simple(df_input, is_train=False, hourly_time=None):
    """Preprocess raw fare snapshots into per-flight price-change features.

    Rows are grouped into flight instances via an integer ``gid`` over
    (citypair, flight_numbers, from_date) and ordered by hours-to-departure
    descending (i.e. forward in time). For every (gid, baggage_weight)
    series the function derives:

    * ``price_change_amount`` / ``price_change_percent`` — the most recent
      *effective* change (moves below the threshold are ignored),
      forward-filled through flat stretches;
    * ``price_duration_hours`` — how many snapshots the current price level
      has persisted.

    Args:
        df_input: snapshot DataFrame. Must contain at least ``update_hour``
            (datetime64), ``citypair``, ``flight_numbers``, ``from_date``,
            ``baggage_weight``, ``hours_until_departure``, ``price_total``
            (train mode additionally reads ``cabins``, ``days_to_departure``).
        is_train: when True, also mines historical drop/rise node libraries
            and a per-flight-day price envelope from the 48h..384h window.
        hourly_time: cut-off timestamp for prediction mode (required when
            ``is_train`` is False); snapshots after it are discarded.

    Returns:
        ``(df_features, df_drop_nodes, df_rise_nodes, df_envelope)``.
        The last three are ``None`` when ``is_train`` is False.

    Raises:
        ValueError: if ``hourly_time`` is missing in prediction mode.
    """
    print(">>> 开始数据预处理")

    # Day of week of the snapshot timestamp (1 = Monday .. 7 = Sunday).
    df_input['update_week'] = df_input['update_hour'].dt.dayofweek + 1

    # gid: integer label per flight instance. 'baggage_weight' is
    # intentionally NOT part of the grouping key for now.
    df_input['gid'] = (
        df_input
        .groupby(['citypair', 'flight_numbers', 'from_date'], sort=False)
        .ngroup()
    )

    # Within each (gid, baggage_weight) order snapshots from far to near
    # departure so that diff()/shift() walk forward in time.
    df_input = df_input.sort_values(
        by=['gid', 'baggage_weight', 'hours_until_departure'],
        ascending=[True, True, False],
    ).reset_index(drop=True)

    df_input = df_input[df_input['hours_until_departure'] <= 480]
    # Keep only the 0 kg baggage fares for now.
    df_input = df_input[df_input['baggage_weight'] == 0]

    if not is_train:
        # Prediction mode: keep only snapshots up to the current hour.
        if hourly_time is None:
            raise ValueError("hourly_time is required when is_train=False")
        df_input = df_input[df_input['update_hour'] <= hourly_time].copy()
    else:
        df_input = df_input.copy()  # copy for consistency with the branch above
    df_input = df_input.reset_index(drop=True)

    # Minimum absolute move (in price units) that counts as a real change.
    price_change_amount_threshold = 1

    df_input['_raw_price_diff'] = (
        df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].diff()
    )

    # Last effective change amount: zero out sub-threshold moves, then
    # forward-fill the most recent real change through the flat stretch.
    df_input['price_change_amount'] = (
        df_input['_raw_price_diff']
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(2)
    )

    # Same treatment for the relative change vs. the previous snapshot.
    df_input['price_change_percent'] = (
        df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total']
        .pct_change()
        .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
        .replace(0, np.nan)
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .ffill()
        .fillna(0)
        .round(4)
    )

    # Step 1: segment the series at every real re-pricing event so that two
    # consecutive identical moves (e.g. -50, -50) still start new segments.
    _price_change_event = df_input['_raw_price_diff'].abs().ge(price_change_amount_threshold)
    df_input['price_change_segment'] = (
        _price_change_event
        .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
        .cumsum()
    )

    # Step 2: how long (in snapshots) the current price level has lasted.
    df_input['price_duration_hours'] = (
        df_input.groupby(['gid', 'baggage_weight', 'price_change_segment'], group_keys=False)
        .cumcount()
        .add(1)
    )

    # Drop temporary helper columns.
    df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff'])

    if is_train:
        # Mine labelled nodes from the 48h..384h-to-departure window.
        df_target = df_input[
            (df_input['hours_until_departure'] >= 48)
            & (df_input['hours_until_departure'] <= 384)
        ].copy()
        df_target = df_target.sort_values(
            by=['gid', 'baggage_weight', 'hours_until_departure'],
            ascending=[True, True, False],
        ).reset_index(drop=True)

        # Previous-snapshot values within each (gid, baggage_weight) series.
        grp = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)
        prev_pct = grp['price_change_percent'].shift(1)
        prev_amo = grp['price_change_amount'].shift(1)
        prev_dur = grp['price_duration_hours'].shift(1)
        prev_price = grp['price_total'].shift(1)
        prev_cabin = grp['cabins'].shift(1)

        # First row of every price segment = a re-pricing node.
        seg_start_mask = df_target['price_duration_hours'].eq(1)

        # Positive library: price drops whose preceding (higher) price level
        # lasted at most 24 hours.
        prev_pct_num = pd.to_numeric(prev_pct, errors='coerce')
        drop_mask = (
            seg_start_mask
            & prev_pct_num.notna()
            & (df_target['price_change_percent'] < 0)
            & prev_dur.le(24)
        )
        df_drop_nodes = df_target.loc[
            drop_mask,
            ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure',
             'update_hour', 'update_week', 'cabins'],
        ].copy()
        df_drop_nodes.rename(columns={
            'hours_until_departure': 'drop_hours_until_departure',
            'days_to_departure': 'drop_days_to_departure',
            'update_hour': 'drop_update_hour',
            'update_week': 'drop_update_week',
            'cabins': 'drop_cabins',
        }, inplace=True)

        df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
        df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
        df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
        df_drop_nodes['high_price_cabins'] = prev_cabin.loc[drop_mask].astype(str)
        df_drop_nodes = df_drop_nodes.reset_index(drop=True)

        flight_info_cols = [
            'gid', 'baggage_weight', 'citypair', 'flight_numbers',
            'from_time', 'from_date', 'currency',
        ]
        flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
        df_gid_info = (
            df_target[flight_info_cols]
            .drop_duplicates(subset=['gid', 'baggage_weight'])
            .reset_index(drop=True)
        )
        df_drop_nodes = df_drop_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')

        drop_info_cols = [
            'drop_update_hour', 'drop_update_week', 'drop_cabins',
            'drop_days_to_departure', 'drop_hours_until_departure',
            'drop_price_change_percent', 'drop_price_change_amount',
            'high_price_duration_hours', 'high_price_change_percent',
            'high_price_change_amount', 'high_price_amount', 'high_price_cabins',
        ]
        # Flight identity columns first; gid is kept on purpose.
        df_drop_nodes = df_drop_nodes[flight_info_cols + drop_info_cols]
        df_drop_nodes['start_hours_until_departure'] = (
            df_drop_nodes['drop_hours_until_departure']
            + df_drop_nodes['high_price_duration_hours']
        ).round().astype('Int64')
        df_drop_nodes = df_drop_nodes[df_drop_nodes['drop_hours_until_departure'] <= 360]
        df_drop_nodes = df_drop_nodes[df_drop_nodes['start_hours_until_departure'] >= 72]

        # Negative library: valid nodes that did NOT drop within the window
        # (non-negative move, or a drop whose previous level lasted > 24h).
        # NOTE: a first, immediately-overwritten rise_mask assignment existed
        # here historically; it was dead code and has been removed.
        valid_mask = seg_start_mask & prev_pct_num.notna()
        curr_pct = pd.to_numeric(df_target['price_change_percent'], errors='coerce')
        prev_dur_num = pd.to_numeric(prev_dur, errors='coerce')
        pos_case_mask = curr_pct.ge(0)
        neg_case_mask = curr_pct.lt(0) & prev_dur_num.gt(24)
        rise_mask = valid_mask & (pos_case_mask | neg_case_mask)

        df_rise_nodes = df_target.loc[
            rise_mask,
            ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure',
             'update_hour', 'update_week', 'cabins'],
        ].copy()
        df_rise_nodes.rename(columns={
            'hours_until_departure': 'rise_hours_until_departure',
            'days_to_departure': 'rise_days_to_departure',
            'update_hour': 'rise_update_hour',
            'update_week': 'rise_update_week',
            'cabins': 'rise_cabins',
        }, inplace=True)
        df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
        df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy()
        df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy()
        df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy()
        df_rise_nodes['prev_rise_cabins'] = prev_cabin.loc[rise_mask].astype(str)
        df_rise_nodes = df_rise_nodes.reset_index(drop=True)
        df_rise_nodes = df_rise_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')

        rise_info_cols = [
            'rise_update_hour', 'rise_update_week', 'rise_cabins',
            'rise_days_to_departure', 'rise_hours_until_departure',
            'rise_price_change_percent', 'rise_price_change_amount',
            'prev_rise_duration_hours', 'prev_rise_change_percent',
            'prev_rise_change_amount', 'prev_rise_amount', 'prev_rise_cabins',
        ]
        df_rise_nodes = df_rise_nodes[flight_info_cols + rise_info_cols]
        df_rise_nodes['start_hours_until_departure'] = (
            df_rise_nodes['rise_hours_until_departure']
            + df_rise_nodes['prev_rise_duration_hours']
        ).round().astype('Int64')
        df_rise_nodes = df_rise_nodes[df_rise_nodes['rise_hours_until_departure'] <= 360]
        df_rise_nodes = df_rise_nodes[df_rise_nodes['start_hours_until_departure'] >= 72]

        # Historical price envelope: the peak-priced snapshot per flight day.
        envelope_group = ['citypair', 'flight_numbers', 'from_date', 'baggage_weight']
        idx_peak = df_target.groupby(envelope_group)['price_total'].idxmax()
        df_envelope = df_target.loc[idx_peak, envelope_group + [
            'from_time', 'price_total', 'hours_until_departure',
            'days_to_departure', 'update_hour', 'update_week',
        ]].rename(columns={
            'price_total': 'peak_price',
            'hours_until_departure': 'peak_hours',
            'days_to_departure': 'peak_days',
            'update_hour': 'peak_time',
            'update_week': 'peak_week',
        }).reset_index(drop=True)

        del df_gid_info
        del df_target
        return df_input, df_drop_nodes, df_rise_nodes, df_envelope

    return df_input, None, None, None


def predict_data_simple(df_input, city_pair, object_dir, predict_dir=".", pred_time_str=""):
    """Score the latest snapshot of each flight against the historical
    drop/rise node libraries and append the predictions to a CSV.

    Args:
        df_input: preprocessed snapshots (output of ``preprocess_data_simple``).
        city_pair: route key used to locate the history CSVs.
        object_dir: directory holding ``{city_pair}_drop_info.csv`` and
            ``{city_pair}_rise_info.csv``.
        predict_dir: directory the prediction CSV is appended into.
        pred_time_str: timestamp suffix for the prediction file name.

    Returns:
        The prediction DataFrame (empty when ``df_input`` is None/empty).
    """
    if df_input is None or df_input.empty:
        return pd.DataFrame()

    df_sorted = df_input.sort_values(
        by=['gid', 'baggage_weight', 'hours_until_departure'],
        ascending=[True, True, False],
    ).reset_index(drop=True)
    df_sorted = df_sorted[
        df_sorted['hours_until_departure'].between(72, 360)
    ].reset_index(drop=True)

    # Latest snapshot (smallest hours_until_departure, i.e. the current hour)
    # per (gid, baggage_weight).
    df_min_hours = (
        df_sorted.drop_duplicates(subset=['gid', 'baggage_weight'], keep='last')
        .reset_index(drop=True)
    )
    # Require at least 2 remaining tickets.
    df_min_hours = df_min_hours[(df_min_hours['ticket_amount'] >= 2)].reset_index(drop=True)

    # Historical drop scenarios.
    drop_info_csv_path = os.path.join(object_dir, f'{city_pair}_drop_info.csv')
    if os.path.exists(drop_info_csv_path):
        df_drop_nodes = pd.read_csv(drop_info_csv_path)
    else:
        df_drop_nodes = pd.DataFrame()

    # Historical rise scenarios.
    rise_info_csv_path = os.path.join(object_dir, f'{city_pair}_rise_info.csv')
    if os.path.exists(rise_info_csv_path):
        df_rise_nodes = pd.read_csv(rise_info_csv_path)
    else:
        df_rise_nodes = pd.DataFrame()

    # Joint price distribution ==============================================
    df_min_hours['relative_position'] = np.nan
    if not df_drop_nodes.empty:
        df_drop_nodes['relative_position'] = np.nan
    if not df_rise_nodes.empty:
        df_rise_nodes['relative_position'] = np.nan

    parts = []
    # Current rows to predict.
    if not df_min_hours.empty and 'price_total' in df_min_hours.columns:
        cur = df_min_hours[['price_total']].copy()
        cur['price'] = pd.to_numeric(cur['price_total'], errors='coerce')
        cur['source'] = 'min'
        cur['row_id'] = cur.index
        parts.append(cur[['price', 'source', 'row_id']])
    # Historical drops (price level right before the drop).
    if not df_drop_nodes.empty and 'high_price_amount' in df_drop_nodes.columns:
        drop = df_drop_nodes[['high_price_amount']].copy()
        drop['price'] = pd.to_numeric(drop['high_price_amount'], errors='coerce')
        drop['source'] = 'drop'
        drop['row_id'] = drop.index
        parts.append(drop[['price', 'source', 'row_id']])
    # Historical rises (price level right before the rise).
    if not df_rise_nodes.empty and 'prev_rise_amount' in df_rise_nodes.columns:
        rise = df_rise_nodes[['prev_rise_amount']].copy()
        rise['price'] = pd.to_numeric(rise['prev_rise_amount'], errors='coerce')
        rise['source'] = 'rise'
        rise['row_id'] = rise.index
        parts.append(rise[['price', 'source', 'row_id']])

    if parts:
        all_prices = pd.concat(parts, ignore_index=True)
        all_prices = all_prices.dropna(subset=['price']).reset_index(drop=True)
        # Percentile of each price within the pooled distribution.
        dense_rank = all_prices['price'].rank(method='dense')
        max_rank = dense_rank.max()
        if pd.notna(max_rank) and max_rank > 1:
            all_prices['relative_position'] = (dense_rank - 1) / (max_rank - 1)
        else:
            all_prices['relative_position'] = 1.0
        all_prices['relative_position'] = all_prices['relative_position'].round(4)

        # Write the percentile back into the three tables via row_id.
        m = all_prices['source'] == 'min'
        df_min_hours.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
        if not df_drop_nodes.empty:
            m = all_prices['source'] == 'drop'
            df_drop_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
        if not df_rise_nodes.empty:
            m = all_prices['source'] == 'rise'
            df_rise_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values

    # NOTE(review): a cross-flight-day price-envelope feature (built from
    # {city_pair}_envelope_info.csv) used to live here; it is intentionally
    # disabled pending validation — see version history for the full code.

    # Score threshold: anything at or above it is worth targeting.
    relative_position_threshold = 0.4
    df_min_hours['is_good_target'] = (df_min_hours['relative_position'] >= relative_position_threshold).astype(int)
    total_cnt_before = len(df_min_hours)  # row count before filtering
    df_min_hours = df_min_hours[(df_min_hours['is_good_target'] == 1)].reset_index(drop=True)

    # =====================================================================
    df_min_hours['simple_will_price_drop'] = 0
    df_min_hours['simple_drop_in_hours_prob'] = 0.0
    df_min_hours['simple_drop_in_hours_dist'] = ''  # empty string = unknown
    df_min_hours['flag_dist'] = ''
    df_min_hours['drop_price_change_upper'] = 0.0
    df_min_hours['drop_price_change_lower'] = 0.0
    df_min_hours['drop_price_sample_size'] = 0
    df_min_hours['rise_price_change_upper'] = 0.0
    df_min_hours['rise_price_change_lower'] = 0.0
    df_min_hours['rise_price_sample_size'] = 0

    # Max allowed gap between the current and a historical change percentage.
    pct_threshold = 0.1
    pct_threshold_1 = 0.1

    for idx, row in df_min_hours.iterrows():
        # Local name deliberately distinct from the city_pair parameter,
        # which must stay intact (it was shadowed in the original loop).
        row_citypair = row['citypair']
        flight_numbers = row['flight_numbers']
        baggage_weight = row['baggage_weight']
        days_to_departure = row['days_to_departure']
        hours_until_departure = row['hours_until_departure']
        price_change_percent = row['price_change_percent']
        price_duration_hours = row['price_duration_hours']
        price_amount = row['price_total']
        cabins = row['cabins']
        length_drop = 0
        length_rise = 0

        # ---- Match against historical DROP scenarios ----
        if not df_drop_nodes.empty:
            # Same route / flight number / baggage allowance.
            df_drop_nodes_part = df_drop_nodes[
                (df_drop_nodes['citypair'] == row_citypair)
                & (df_drop_nodes['flight_numbers'] == flight_numbers)
                & (df_drop_nodes['baggage_weight'] == baggage_weight)
            ]
            # Match on pre-drop change percentage and absolute price level.
            if not df_drop_nodes_part.empty and pd.notna(price_change_percent):
                pct_base = float(price_change_percent)
                pct_vals = pd.to_numeric(df_drop_nodes_part['high_price_change_percent'], errors='coerce')
                df_drop_gap = df_drop_nodes_part.loc[
                    pct_vals.notna(),
                    ['drop_days_to_departure', 'drop_hours_until_departure',
                     'drop_price_change_percent', 'drop_price_change_amount',
                     'high_price_duration_hours', 'high_price_change_percent',
                     'high_price_change_amount', 'high_price_amount',
                     'high_price_cabins', 'relative_position'],
                ].copy()
                df_drop_gap['pct_gap'] = (pct_vals.loc[pct_vals.notna()] - pct_base)
                df_drop_gap['pct_abs_gap'] = df_drop_gap['pct_gap'].abs()
                price_base = pd.to_numeric(price_amount, errors='coerce')
                high_price_vals = pd.to_numeric(df_drop_gap['high_price_amount'], errors='coerce')
                df_drop_gap['price_gap'] = high_price_vals - price_base
                df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
                df_drop_gap = df_drop_gap.sort_values(['price_abs_gap', 'pct_abs_gap'], ascending=[True, True])
                same_sign_mask = (
                    np.sign(pd.to_numeric(df_drop_gap['high_price_change_percent'], errors='coerce'))
                    == np.sign(pct_base)
                )
                df_match = df_drop_gap[
                    (df_drop_gap['pct_abs_gap'] <= pct_threshold)
                    & (df_drop_gap['price_abs_gap'] <= 5.0)
                    & same_sign_mask
                    & (df_drop_gap['high_price_cabins'] == cabins)
                ].copy()
                # Historical drops that followed a very similar move.
                if not df_match.empty:
                    dur_base = pd.to_numeric(price_duration_hours, errors='coerce')
                    hud_base = pd.to_numeric(hours_until_departure, errors='coerce')
                    dtd_base = pd.to_numeric(days_to_departure, errors='coerce')
                    if pd.notna(dur_base) and pd.notna(dtd_base) and pd.notna(hud_base):
                        df_match_chk = df_match.copy()
                        # Tighten positives: the pre-drop level's age must be
                        # within +-72h of the current level's age.
                        dur_num_chk = pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce')
                        dur_delta = dur_num_chk - float(dur_base)
                        df_match_chk = df_match_chk.assign(dur_delta=dur_delta)
                        df_match_chk = df_match_chk.loc[df_match_chk['dur_delta'].notna()].copy()
                        df_match_chk = df_match_chk.loc[df_match_chk['dur_delta'].abs() <= 72].copy()
                        # Every condition satisfied -> predict a drop.
                        if not df_match_chk.empty:
                            length_drop = df_match_chk.shape[0]
                            df_min_hours.loc[idx, 'drop_price_sample_size'] = length_drop
                            drop_price_change_upper = df_match_chk['drop_price_change_amount'].max()  # drop upper bound
                            drop_price_change_lower = df_match_chk['drop_price_change_amount'].min()  # drop lower bound
                            df_min_hours.loc[idx, 'drop_price_change_upper'] = round(drop_price_change_upper, 2)
                            df_min_hours.loc[idx, 'drop_price_change_lower'] = round(drop_price_change_lower, 2)
                            dur_delta_list = df_match_chk['dur_delta'].tolist()
                            # Leading apostrophe keeps spreadsheets from
                            # reinterpreting the value.
                            dist_str = "'" + ' '.join([f"{ddl:g}" for ddl in dur_delta_list])
                            df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
                            df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1
                            df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = dist_str
                            df_min_hours.loc[idx, 'flag_dist'] = 'd0'

        # ---- Match against historical RISE scenarios ----
        if not df_rise_nodes.empty:
            # Same route / flight number / baggage allowance.
            df_rise_nodes_part = df_rise_nodes[
                (df_rise_nodes['citypair'] == row_citypair)
                & (df_rise_nodes['flight_numbers'] == flight_numbers)
                & (df_rise_nodes['baggage_weight'] == baggage_weight)
            ]
            # Match on pre-rise change percentage and absolute price level.
            if not df_rise_nodes_part.empty and pd.notna(price_change_percent):
                pct_base_1 = float(price_change_percent)
                pct_vals_1 = pd.to_numeric(df_rise_nodes_part['prev_rise_change_percent'], errors='coerce')
                df_rise_gap_1 = df_rise_nodes_part.loc[
                    pct_vals_1.notna(),
                    ['rise_days_to_departure', 'rise_hours_until_departure',
                     'rise_price_change_percent', 'rise_price_change_amount',
                     'prev_rise_duration_hours', 'prev_rise_change_percent',
                     'prev_rise_change_amount', 'prev_rise_amount',
                     'prev_rise_cabins', 'relative_position'],
                ].copy()
                df_rise_gap_1['pct_gap'] = (pct_vals_1.loc[pct_vals_1.notna()] - pct_base_1)
                df_rise_gap_1['pct_abs_gap'] = df_rise_gap_1['pct_gap'].abs()
                price_base_1 = pd.to_numeric(price_amount, errors='coerce')
                rise_price_vals_1 = pd.to_numeric(df_rise_gap_1['prev_rise_amount'], errors='coerce')
                df_rise_gap_1['price_gap'] = rise_price_vals_1 - price_base_1
                df_rise_gap_1['price_abs_gap'] = df_rise_gap_1['price_gap'].abs()
                df_rise_gap_1 = df_rise_gap_1.sort_values(['price_abs_gap', 'pct_abs_gap'], ascending=[True, True])
                same_sign_mask_1 = (
                    np.sign(pd.to_numeric(df_rise_gap_1['prev_rise_change_percent'], errors='coerce'))
                    == np.sign(pct_base_1)
                )
                df_match_1 = df_rise_gap_1.loc[
                    (df_rise_gap_1['pct_abs_gap'] <= pct_threshold_1)
                    & (df_rise_gap_1['price_abs_gap'] <= 5.0)
                    & same_sign_mask_1
                    & (df_rise_gap_1['prev_rise_cabins'] == cabins)
                ].copy()
                # Historical rises that followed a very similar move.
                if not df_match_1.empty:
                    dur_base_1 = pd.to_numeric(price_duration_hours, errors='coerce')
                    hud_base_1 = pd.to_numeric(hours_until_departure, errors='coerce')
                    dtd_base_1 = pd.to_numeric(days_to_departure, errors='coerce')
                    if pd.notna(dur_base_1) and pd.notna(dtd_base_1) and pd.notna(hud_base_1):
                        df_match_chk_1 = df_match_1.copy()
                        # Tighten negatives: a drop within 48h is not a clear
                        # counter-example, so exclude those rows.
                        _rise_pct_chk = pd.to_numeric(df_match_chk_1['rise_price_change_percent'], errors='coerce')
                        _prev_dur_chk = pd.to_numeric(df_match_chk_1['prev_rise_duration_hours'], errors='coerce')
                        _exclude_mask = _rise_pct_chk.lt(0) & _prev_dur_chk.lt(48)
                        df_match_chk_1 = df_match_chk_1.loc[~_exclude_mask.fillna(False)].copy()
                        # Every condition satisfied -> rise evidence found.
                        if not df_match_chk_1.empty:
                            length_rise = df_match_chk_1.shape[0]
                            df_min_hours.loc[idx, 'rise_price_sample_size'] = length_rise
                            rise_price_change_upper = df_match_chk_1['rise_price_change_amount'].max()  # rise upper bound
                            rise_price_change_lower = df_match_chk_1['rise_price_change_amount'].min()  # rise lower bound
                            df_min_hours.loc[idx, 'rise_price_change_upper'] = round(rise_price_change_upper, 2)
                            df_min_hours.loc[idx, 'rise_price_change_lower'] = round(rise_price_change_lower, 2)
                            if length_drop == 0:
                                # No drop evidence at all -> confidently "no drop".
                                df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
                                df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
                                df_min_hours.loc[idx, 'flag_dist'] = 'r0'
                            else:
                                # Conflicting evidence: vote by sample counts.
                                drop_prob = round(length_drop / (length_rise + length_drop), 2)
                                if drop_prob > 0.5:
                                    # Keep the drop verdict, adjust probability.
                                    df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
                                    df_min_hours.loc[idx, 'flag_dist'] = 'd1'
                                else:
                                    # Overturn to "no drop", adjust probability.
                                    df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
                                    df_min_hours.loc[idx, 'flag_dist'] = 'r1'
                                df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob

    print("判定循环结束")

    # Validity window: 360h..72h before the (floored) departure hour.
    _dep_hour = pd.to_datetime(df_min_hours["from_time"], errors="coerce").dt.floor("h")
    df_min_hours["valid_begin_hour"] = (_dep_hour - pd.to_timedelta(360, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
    df_min_hours["valid_end_hour"] = (_dep_hour - pd.to_timedelta(72, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")

    # Columns exposed in the prediction table.
    order_cols = [
        "citypair", "flight_numbers", "baggage_weight", "from_date", "from_time",
        "cabins", "ticket_amount", "currency", "price_base", "price_tax", "price_total",
        'relative_position', 'is_good_target',
        'days_to_departure', 'hours_until_departure',
        'price_change_amount', 'price_change_percent', 'price_duration_hours',
        "update_hour", "create_time",
        'valid_begin_hour', 'valid_end_hour',
        'simple_will_price_drop', 'simple_drop_in_hours_prob',
        'simple_drop_in_hours_dist', 'flag_dist',
        'drop_price_change_upper', 'drop_price_change_lower', 'drop_price_sample_size',
        'rise_price_change_upper', 'rise_price_change_lower', 'rise_price_sample_size',
    ]
    df_predict = df_min_hours[order_cols]
    df_predict = df_predict.rename(columns={
        'simple_will_price_drop': 'will_price_drop',
        'simple_drop_in_hours_prob': 'drop_in_hours_prob',
        'simple_drop_in_hours_dist': 'drop_in_hours_dist',
    })

    # Stable sort so repeated runs append rows in a deterministic order.
    df_predict = df_predict.sort_values(
        by=['citypair', 'flight_numbers', 'baggage_weight', 'from_date'],
        kind='mergesort',
        na_position='last',
    ).reset_index(drop=True)

    total_cnt = len(df_predict)
    if "will_price_drop" in df_predict.columns:
        _wpd = pd.to_numeric(df_predict["will_price_drop"], errors="coerce")
        drop_1_cnt = int((_wpd == 1).sum())
        drop_0_cnt = int((_wpd == 0).sum())
    else:
        drop_1_cnt = 0
        drop_0_cnt = 0
    print(f"will_price_drop 分类数量统计: 1(会降)={drop_1_cnt}, 0(不降)={drop_0_cnt}, 总数={total_cnt}, 过滤前总数={total_cnt_before}")

    csv_path1 = os.path.join(predict_dir, f'future_predictions_{pred_time_str}.csv')
    df_predict.to_csv(csv_path1, mode='a', index=False, header=not os.path.exists(csv_path1), encoding='utf-8-sig')
    print("预测结果已追加")
    return df_predict