| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502 |
- import pandas as pd
- import numpy as np
- import gc
- import os
- def preprocess_data_simple(df_input, is_train=False, hourly_time=None):
- print(">>> 开始数据预处理")
- # 城市码映射成数字(不用)
- # 更新日期是周几
- df_input['update_week'] = df_input['update_hour'].dt.dayofweek + 1
-
- # gid:基于指定字段的分组标记(整数)
- df_input['gid'] = (
- df_input
- .groupby(
- ['citypair', 'flight_numbers', 'from_date'], # 'baggage_weight' 先不进分组
- sort=False
- )
- .ngroup()
- )
- # 在 gid 与 baggage_weight 内按时间降序
- df_input = df_input.sort_values(
- by=['gid', 'baggage_weight', 'hours_until_departure'],
- ascending=[True, True, False]
- ).reset_index(drop=True)
- df_input = df_input[df_input['hours_until_departure'] <= 480]
- df_input = df_input[df_input['baggage_weight'] == 20] # 先保留20公斤行李的
- # 在hours_until_departure 的末尾 保留到当前时刻的数据
- if not is_train:
- df_input = df_input[df_input['update_hour'] <= hourly_time].copy()
- else:
- df_input = df_input.copy() # 训练集也 copy 一下保持一致性
-
- df_input = df_input.reset_index(drop=True)
- # 价格变化最小量阈值
- price_change_amount_threshold = 5
- df_input['_raw_price_diff'] = df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].diff()
- # 计算价格变化量
- df_input['price_change_amount'] = (
- df_input['_raw_price_diff']
- .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
- .replace(0, np.nan)
- .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
- .ffill()
- .fillna(0)
- .round(2)
- )
- # 计算价格变化百分比(相对于上一时间点的变化率)
- df_input['price_change_percent'] = (
- df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total']
- .pct_change()
- .mask(df_input['_raw_price_diff'].abs() < price_change_amount_threshold, 0)
- .replace(0, np.nan)
- .groupby([df_input['gid'], df_input['baggage_weight']], group_keys=False)
- .ffill()
- .fillna(0)
- .round(4)
- )
- # 第一步:标记价格变化段
- df_input['price_change_segment'] = (
- df_input.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_amount']
- .apply(lambda s: (s != s.shift()).cumsum())
- )
- # 第二步:计算每个变化段内的持续时间
- df_input['price_duration_hours'] = (
- df_input.groupby(['gid', 'baggage_weight', 'price_change_segment'], group_keys=False)
- .cumcount()
- .add(1)
- )
- # 可选:删除临时列
- df_input = df_input.drop(columns=['price_change_segment', '_raw_price_diff'])
- # 训练过程
- if is_train:
- df_target = df_input[(df_input['hours_until_departure'] >= 24) & (df_input['hours_until_departure'] <= 360)].copy()
- df_target = df_target.sort_values(
- by=['gid', 'baggage_weight', 'hours_until_departure'],
- ascending=[True, True, False]
- ).reset_index(drop=True)
- # 每条对应的前一条记录
- prev_pct = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_percent'].shift(1)
- prev_amo = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_change_amount'].shift(1)
- prev_dur = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_duration_hours'].shift(1)
- prev_price = df_target.groupby(['gid', 'baggage_weight'], group_keys=False)['price_total'].shift(1)
-
- # 对于先升后降(先降再降)的分析
- seg_start_mask = df_target['price_duration_hours'].eq(1) # 开始变价节点
- drop_mask = seg_start_mask & ((prev_pct > 0) | (prev_pct < 0)) & (df_target['price_change_percent'] < 0)
- df_drop_nodes = df_target.loc[drop_mask, ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week']].copy()
- df_drop_nodes.rename(columns={'hours_until_departure': 'drop_hours_until_departure'}, inplace=True)
- df_drop_nodes.rename(columns={'days_to_departure': 'drop_days_to_departure'}, inplace=True)
- df_drop_nodes.rename(columns={'update_hour': 'drop_update_hour'}, inplace=True)
- df_drop_nodes.rename(columns={'update_week': 'drop_update_week'}, inplace=True)
- df_drop_nodes['drop_price_change_percent'] = df_target.loc[drop_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
- df_drop_nodes['drop_price_change_amount'] = df_target.loc[drop_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
- df_drop_nodes['high_price_duration_hours'] = prev_dur.loc[drop_mask].astype(float).to_numpy()
- df_drop_nodes['high_price_change_percent'] = prev_pct.loc[drop_mask].astype(float).round(4).to_numpy()
- df_drop_nodes['high_price_change_amount'] = prev_amo.loc[drop_mask].astype(float).round(2).to_numpy()
- df_drop_nodes['high_price_amount'] = prev_price.loc[drop_mask].astype(float).round(2).to_numpy()
- df_drop_nodes = df_drop_nodes.reset_index(drop=True)
- flight_info_cols = [
- 'citypair', 'flight_numbers', 'from_time', 'from_date', 'currency',
- ]
- flight_info_cols = [c for c in flight_info_cols if c in df_target.columns]
- df_gid_info = df_target[['gid', 'baggage_weight'] + flight_info_cols].drop_duplicates(subset=['gid', 'baggage_weight']).reset_index(drop=True)
- df_drop_nodes = df_drop_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
- drop_info_cols = [
- 'drop_update_hour', 'drop_update_week',
- 'drop_days_to_departure', 'drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
- 'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount',
- ]
- # 按顺序排列 去掉gid
- df_drop_nodes = df_drop_nodes[flight_info_cols + ['baggage_weight'] + drop_info_cols]
-
- # 对于先升再升(先降再升)的分析
- # seg_start_mask = df_target['price_duration_hours'].eq(1)
- rise_mask = seg_start_mask & ((prev_pct > 0) | (prev_pct < 0)) & (df_target['price_change_percent'] > 0)
- df_rise_nodes = df_target.loc[rise_mask, ['gid', 'baggage_weight', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week']].copy()
- df_rise_nodes.rename(columns={'hours_until_departure': 'rise_hours_until_departure'}, inplace=True)
- df_rise_nodes.rename(columns={'days_to_departure': 'rise_days_to_departure'}, inplace=True)
- df_rise_nodes.rename(columns={'update_hour': 'rise_update_hour'}, inplace=True)
- df_rise_nodes.rename(columns={'update_week': 'rise_update_week'}, inplace=True)
- df_rise_nodes['rise_price_change_percent'] = df_target.loc[rise_mask, 'price_change_percent'].astype(float).round(4).to_numpy()
- df_rise_nodes['rise_price_change_amount'] = df_target.loc[rise_mask, 'price_change_amount'].astype(float).round(2).to_numpy()
- df_rise_nodes['prev_rise_duration_hours'] = prev_dur.loc[rise_mask].astype(float).to_numpy()
- df_rise_nodes['prev_rise_change_percent'] = prev_pct.loc[rise_mask].astype(float).round(4).to_numpy()
- df_rise_nodes['prev_rise_change_amount'] = prev_amo.loc[rise_mask].astype(float).round(2).to_numpy()
- df_rise_nodes['prev_rise_amount'] = prev_price.loc[rise_mask].astype(float).round(2).to_numpy()
- df_rise_nodes = df_rise_nodes.reset_index(drop=True)
- df_rise_nodes = df_rise_nodes.merge(df_gid_info, on=['gid', 'baggage_weight'], how='left')
-
- rise_info_cols = [
- 'rise_update_hour', 'rise_update_week',
- 'rise_days_to_departure', 'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
- 'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount',
- ]
- df_rise_nodes = df_rise_nodes[flight_info_cols + ['baggage_weight'] + rise_info_cols]
-
- # 制作历史包络线
- envelope_group = ['citypair', 'flight_numbers', 'from_date', 'baggage_weight']
- idx_peak = df_input.groupby(envelope_group)['price_total'].idxmax()
- df_envelope = df_input.loc[idx_peak, envelope_group + [
- 'from_time', 'price_total', 'hours_until_departure', 'days_to_departure', 'update_hour', 'update_week',
- ]].rename(columns={
- 'price_total': 'peak_price',
- 'hours_until_departure': 'peak_hours',
- 'days_to_departure': 'peak_days',
- 'update_hour': 'peak_time',
- 'update_week': 'peak_week',
- }).reset_index(drop=True)
-
- del df_gid_info
- del df_target
- return df_input, df_drop_nodes, df_rise_nodes, df_envelope
-
- return df_input, None, None, None
-
- def predict_data_simple(df_input, city_pair, output_dir, predict_dir=".", pred_time_str=""):
- if df_input is None or df_input.empty:
- return pd.DataFrame()
- df_sorted = df_input.sort_values(
- by=['gid', 'baggage_weight', 'hours_until_departure'],
- ascending=[True, True, False],
- ).reset_index(drop=True)
- df_sorted = df_sorted[
- df_sorted['hours_until_departure'].between(24, 360)
- ].reset_index(drop=True)
- # 每个 gid baggage_weight 取 hours_until_departure 最小的一条 (当前小时)
- df_min_hours = (
- df_sorted.drop_duplicates(subset=['gid', 'baggage_weight'], keep='last')
- .reset_index(drop=True)
- )
- # 读历史降价场景
- drop_info_csv_path = os.path.join(output_dir, f'{city_pair}_drop_info.csv')
- if os.path.exists(drop_info_csv_path):
- df_drop_nodes = pd.read_csv(drop_info_csv_path)
- else:
- df_drop_nodes = pd.DataFrame()
- # 读历史升价场景
- rise_info_csv_path = os.path.join(output_dir, f'{city_pair}_rise_info.csv')
- if os.path.exists(rise_info_csv_path):
- df_rise_nodes = pd.read_csv(rise_info_csv_path)
- else:
- df_rise_nodes = pd.DataFrame()
-
- # 联合价格分布
- # 统一初始化
- df_min_hours['relative_position'] = np.nan
- if not df_drop_nodes.empty:
- df_drop_nodes['relative_position'] = np.nan
- if not df_rise_nodes.empty:
- df_rise_nodes['relative_position'] = np.nan
-
- parts = []
- # 当前待预测
- if not df_min_hours.empty and 'price_total' in df_min_hours.columns:
- cur = df_min_hours[['price_total']].copy()
- cur['price'] = pd.to_numeric(cur['price_total'], errors='coerce')
- cur['source'] = 'min'
- cur['row_id'] = cur.index
- parts.append(cur[['price', 'source', 'row_id']])
-
- # 历史降价
- if not df_drop_nodes.empty and 'high_price_amount' in df_drop_nodes.columns:
- drop = df_drop_nodes[['high_price_amount']].copy()
- drop['price'] = pd.to_numeric(drop['high_price_amount'], errors='coerce')
- drop['source'] = 'drop'
- drop['row_id'] = drop.index
- parts.append(drop[['price', 'source', 'row_id']])
-
- # 历史升价
- if not df_rise_nodes.empty and 'prev_rise_amount' in df_rise_nodes.columns:
- rise = df_rise_nodes[['prev_rise_amount']].copy()
- rise['price'] = pd.to_numeric(rise['prev_rise_amount'], errors='coerce')
- rise['source'] = 'rise'
- rise['row_id'] = rise.index
- parts.append(rise[['price', 'source', 'row_id']])
-
- if parts:
- all_prices = pd.concat(parts, ignore_index=True)
- all_prices = all_prices.dropna(subset=['price']).reset_index(drop=True)
- # 计算价格百分位
- dense_rank = all_prices['price'].rank(method='dense')
- max_rank = dense_rank.max()
- if pd.notna(max_rank) and max_rank > 1:
- all_prices['relative_position'] = (dense_rank - 1) / (max_rank - 1)
- else:
- all_prices['relative_position'] = 1.0
- all_prices['relative_position'] = all_prices['relative_position'].round(4)
- # 回填到三个表
- m = all_prices['source'] == 'min'
- df_min_hours.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
- if not df_drop_nodes.empty:
- m = all_prices['source'] == 'drop'
- df_drop_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
- if not df_rise_nodes.empty:
- m = all_prices['source'] == 'rise'
- df_rise_nodes.loc[all_prices.loc[m, 'row_id'], 'relative_position'] = all_prices.loc[m, 'relative_position'].values
-
- pass
- # =====================================================================
- df_min_hours['simple_will_price_drop'] = 0
- df_min_hours['simple_drop_in_hours'] = 0
- df_min_hours['simple_drop_in_hours_prob'] = 0.0
- df_min_hours['simple_drop_in_hours_dist'] = '' # 空串 表示未知
- df_min_hours['flag_dist'] = ''
- df_min_hours['drop_price_change_upper'] = 0.0
- df_min_hours['drop_price_change_lower'] = 0.0
- df_min_hours['drop_price_sample_size'] = 0
- df_min_hours['rise_price_change_upper'] = 0.0
- df_min_hours['rise_price_change_lower'] = 0.0
- df_min_hours['rise_price_sample_size'] = 0
- # 这个阈值取多少?
- # pct_threshold = 0.01
- # pct_threshold_1 = 0.01
- for idx, row in df_min_hours.iterrows():
- city_pair = row['citypair']
- flight_numbers = row['flight_numbers']
- baggage_weight = row['baggage_weight']
- days_to_departure = row['days_to_departure']
- hours_until_departure = row['hours_until_departure']
- price_change_percent = row['price_change_percent']
- price_change_amount = row['price_change_amount']
- price_duration_hours = row['price_duration_hours']
- price_amount = row['price_total']
- length_drop = 0
- length_rise = 0
- # 针对历史上发生的 >降价
- if not df_drop_nodes.empty:
- # 对准航线 航班号 行李配额
- df_drop_nodes_part = df_drop_nodes[
- (df_drop_nodes['citypair'] == city_pair) &
- (df_drop_nodes['flight_numbers'] == flight_numbers) &
- (df_drop_nodes['baggage_weight'] == baggage_weight)
- ]
- # 降价前 增量阈值、当前阈值 的匹配
- if not df_drop_nodes_part.empty and pd.notna(price_change_amount):
-
- pca_base = float(price_change_amount)
- pca_vals = pd.to_numeric(df_drop_nodes_part['high_price_change_amount'], errors='coerce')
- df_drop_gap = df_drop_nodes_part.loc[
- pca_vals.notna(),
- ['drop_days_to_departure', 'drop_hours_until_departure', 'drop_price_change_percent', 'drop_price_change_amount',
- 'high_price_duration_hours', 'high_price_change_percent', 'high_price_change_amount', 'high_price_amount', 'relative_position'
- ]
- ].copy()
- df_drop_gap['pca_gap'] = (pca_vals.loc[pca_vals.notna()] - pca_base)
- df_drop_gap['pca_abs_gap'] = df_drop_gap['pca_gap'].abs()
- price_base = pd.to_numeric(price_amount, errors='coerce')
- high_price_vals = pd.to_numeric(df_drop_gap['high_price_amount'], errors='coerce')
- df_drop_gap['price_gap'] = high_price_vals - price_base
- df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
- df_drop_gap = df_drop_gap.sort_values(['price_abs_gap', 'pca_abs_gap'], ascending=[True, True])
- df_match = df_drop_gap[(df_drop_gap['price_abs_gap'] <= 5.0) & (df_drop_gap['pca_abs_gap'] <= 10.0)].copy()
- # 历史上出现的极近似的增长(下降)幅度后的降价场景
- if not df_match.empty:
- dur_base = pd.to_numeric(price_duration_hours, errors='coerce')
- # hud_base = pd.to_numeric(hours_until_departure, errors='coerce')
- dtd_base = pd.to_numeric(days_to_departure, errors='coerce')
- if pd.notna(dur_base) and pd.notna(dtd_base):
- df_match_chk = df_match.copy()
- drop_dtd_vals = pd.to_numeric(df_match_chk['drop_days_to_departure'], errors='coerce')
- df_match_chk = df_match_chk.loc[drop_dtd_vals.notna()].copy()
- df_match_chk = df_match_chk.loc[(drop_dtd_vals.loc[drop_dtd_vals.notna()] - float(dtd_base)).abs() <= 3].copy()
- # 距离起飞天数也对的上
- if not df_match_chk.empty:
- length_drop = df_match_chk.shape[0]
- df_min_hours.loc[idx, 'drop_price_sample_size'] = length_drop
- drop_price_change_upper = df_match_chk['drop_price_change_amount'].max() # 降价上限
- drop_price_change_lower = df_match_chk['drop_price_change_amount'].min() # 降价下限
- df_min_hours.loc[idx, 'drop_price_change_upper'] = round(drop_price_change_upper, 2)
- df_min_hours.loc[idx, 'drop_price_change_lower'] = round(drop_price_change_lower, 2)
- remaining_hours = (
- pd.to_numeric(df_match_chk['high_price_duration_hours'], errors='coerce') - float(dur_base)
- ).clip(lower=0)
- remaining_hours = remaining_hours.round().astype(int)
- counts = remaining_hours.value_counts().sort_index()
- probs = (counts / counts.sum()).round(4)
- top_hours = int(probs.idxmax())
- top_prob = float(probs.max())
- dist_items = list(zip(probs.index.tolist(), probs.tolist()))
- dist_items = dist_items[:10]
- dist_str = ' '.join([f"{int(h)}h->{float(p)}" for h, p in dist_items])
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- df_min_hours.loc[idx, 'simple_drop_in_hours'] = top_hours
- df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1
- df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = dist_str
- df_min_hours.loc[idx, 'flag_dist'] = 'd0'
- pass
- pass
-
- # 针对历史上发生的 <升价
- if not df_rise_nodes.empty:
- # 对准航线 航班号 行李配额
- df_rise_nodes_part = df_rise_nodes[
- (df_rise_nodes['citypair'] == city_pair) &
- (df_rise_nodes['flight_numbers'] == flight_numbers) &
- (df_rise_nodes['baggage_weight'] == baggage_weight)
- ]
- # 升价前 增量阈值、当前阈值 的匹配
- if not df_rise_nodes_part.empty and pd.notna(price_change_amount):
- pca_base_1 = float(price_change_amount)
- pca_vals_1 = pd.to_numeric(df_rise_nodes_part['prev_rise_change_amount'], errors='coerce')
- df_rise_gap_1 = df_rise_nodes_part.loc[
- pca_vals_1.notna(),
- ['rise_days_to_departure', 'rise_hours_until_departure', 'rise_price_change_percent', 'rise_price_change_amount',
- 'prev_rise_duration_hours', 'prev_rise_change_percent', 'prev_rise_change_amount', 'prev_rise_amount', 'relative_position']
- ].copy()
- df_rise_gap_1['pca_gap'] = (pca_vals_1.loc[pca_vals_1.notna()] - pca_base_1)
- df_rise_gap_1['pca_abs_gap'] = df_rise_gap_1['pca_gap'].abs()
- price_base_1 = pd.to_numeric(price_amount, errors='coerce')
- rise_price_vals_1 = pd.to_numeric(df_rise_gap_1['prev_rise_amount'], errors='coerce')
- df_rise_gap_1['price_gap'] = rise_price_vals_1 - price_base_1
- df_rise_gap_1['price_abs_gap'] = df_rise_gap_1['price_gap'].abs()
- df_rise_gap_1 = df_rise_gap_1.sort_values(['price_abs_gap', 'pca_abs_gap'], ascending=[True, True])
- df_match_1 = df_rise_gap_1.loc[(df_rise_gap_1['price_abs_gap'] <= 5.0) & (df_rise_gap_1['pca_abs_gap'] <= 10.0)].copy()
- # 历史上出现的极近似的增长(下降)幅度后的升价场景
- if not df_match_1.empty:
- dur_base_1 = pd.to_numeric(price_duration_hours, errors='coerce')
- # hud_base_1 = pd.to_numeric(hours_until_departure, errors='coerce')
- dtd_base_1 = pd.to_numeric(days_to_departure, errors='coerce')
- if pd.notna(dur_base_1) and pd.notna(dtd_base_1):
- df_match_chk_1 = df_match_1.copy()
-
- drop_dtd_vals_1 = pd.to_numeric(df_match_chk_1['rise_days_to_departure'], errors='coerce')
- df_match_chk_1 = df_match_chk_1.loc[drop_dtd_vals_1.notna()].copy()
- df_match_chk_1 = df_match_chk_1.loc[(drop_dtd_vals_1.loc[drop_dtd_vals_1.notna()] - float(dtd_base_1)).abs() <= 3].copy()
- # 距离起飞天数也对的上
- if not df_match_chk_1.empty:
- length_rise = df_match_chk_1.shape[0]
- df_min_hours.loc[idx, 'rise_price_sample_size'] = length_rise
- rise_price_change_upper = df_match_chk_1['rise_price_change_amount'].max() # 涨价上限
- rise_price_change_lower = df_match_chk_1['rise_price_change_amount'].min() # 涨价下限
- df_min_hours.loc[idx, 'rise_price_change_upper'] = round(rise_price_change_upper, 2)
- df_min_hours.loc[idx, 'rise_price_change_lower'] = round(rise_price_change_lower, 2)
- # 可以明确的判定不降价
- if length_drop == 0:
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
- df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r0'
- df_min_hours.loc[idx, 'flag_dist'] = 'r0'
- # 分歧判定
- else:
- drop_prob = round(length_drop / (length_rise + length_drop), 2)
- # 依旧保持之前的降价判定,概率修改
- if drop_prob >= 0.4:
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'd1'
- df_min_hours.loc[idx, 'flag_dist'] = 'd1'
- # 改判不降价,概率修改
- else:
- df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
- # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'r1'
- df_min_hours.loc[idx, 'flag_dist'] = 'r1'
- df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob
- print("判定循环结束")
- _dep_hour = pd.to_datetime(df_min_hours["from_time"], errors="coerce").dt.floor("h")
- df_min_hours["valid_begin_hour"] = (_dep_hour - pd.to_timedelta(360, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
- df_min_hours["valid_end_hour"] = (_dep_hour - pd.to_timedelta(24, unit="h")).dt.strftime("%Y-%m-%d %H:%M:%S")
- # 要展示在预测表里的字段
- order_cols = [
- "citypair", "flight_numbers", "baggage_weight", "from_date", "from_time",
- "cabins", "ticket_amount", "currency",
- "price_total", 'relative_position', 'days_to_departure', 'hours_until_departure',
- 'price_change_amount', 'price_change_percent', 'price_duration_hours',
- "update_hour", "update_week",
- 'valid_begin_hour', 'valid_end_hour',
- 'simple_will_price_drop', 'simple_drop_in_hours', 'simple_drop_in_hours_prob', 'simple_drop_in_hours_dist',
- 'flag_dist',
- 'drop_price_change_upper', 'drop_price_change_lower', 'drop_price_sample_size',
- 'rise_price_change_upper', 'rise_price_change_lower', 'rise_price_sample_size',
- ]
- df_predict = df_min_hours[order_cols]
- df_predict = df_predict.rename(columns={
- 'simple_will_price_drop': 'will_price_drop',
- 'simple_drop_in_hours': 'drop_in_hours',
- 'simple_drop_in_hours_prob': 'drop_in_hours_prob',
- 'simple_drop_in_hours_dist': 'drop_in_hours_dist',
- }
- )
- # 排序
- df_predict = df_predict.sort_values(
- by=['citypair', 'flight_numbers', 'baggage_weight', 'from_date'],
- kind='mergesort',
- na_position='last',
- ).reset_index(drop=True)
- total_cnt = len(df_predict)
- if "will_price_drop" in df_predict.columns:
- _wpd = pd.to_numeric(df_predict["will_price_drop"], errors="coerce")
- drop_1_cnt = int((_wpd == 1).sum())
- drop_0_cnt = int((_wpd == 0).sum())
- else:
- drop_1_cnt = 0
- drop_0_cnt = 0
- print(f"will_price_drop 分类数量统计: 1(会降)={drop_1_cnt}, 0(不降)={drop_0_cnt}, 总数={total_cnt}")
- csv_path1 = os.path.join(predict_dir, f'future_predictions_{pred_time_str}.csv')
- df_predict.to_csv(csv_path1, mode='a', index=False, header=not os.path.exists(csv_path1), encoding='utf-8-sig')
- print("预测结果已追加")
- return df_predict
|