Просмотр исходного кода

提交近期修改 特征提取

node04 3 дня назад
Родитель
Коммит
47b17fdfa6
4 изменённых файла: 299 строк добавлено и 82 удалено
  1. 8 2
      config.py
  2. 8 1
      data_loader.py
  3. 256 65
      data_preprocess.py
  4. 27 14
      main_tr.py

+ 8 - 2
config.py

@@ -57,6 +57,10 @@ city_to_country = {
     "AMD": "IN",  # 艾哈迈达巴德,印度
     "AMD": "IN",  # 艾哈迈达巴德,印度
 }
 }
 
 
+# 城市码-数字的映射
+vj_city_code_map = {k: i for i, k in enumerate(city_to_country.keys())}
+
+
 # 生成各个国家(地区)的节假日
 # 生成各个国家(地区)的节假日
 def build_country_holidays(city_to_country):
 def build_country_holidays(city_to_country):
     countries = sorted(set(city_to_country.values()))
     countries = sorted(set(city_to_country.values()))
@@ -142,5 +146,7 @@ if __name__ == '__main__':
     # else:
     # else:
     #     print("没有发现重复航线")
     #     print("没有发现重复航线")
 
 
-    COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
-    print(COUNTRY_HOLIDAYS)
+    # COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
+    # print(COUNTRY_HOLIDAYS)
+    
+    print(vj_city_code_map)

+ 8 - 1
data_loader.py

@@ -334,6 +334,9 @@ def fill_hourly_crawl_date(df, head_fill=0, rear_fill=0):
                 (df['seg1_dep_hour'] - df.index) / pd.Timedelta(hours=1)
                 (df['seg1_dep_hour'] - df.index) / pd.Timedelta(hours=1)
         ).astype('int64')
         ).astype('int64')
 
 
+        # 新增:距离起飞还有多少天
+        df['days_to_departure'] = (df['hours_until_departure'] // 24).astype('int64')
+
         # 删除临时字段
         # 删除临时字段
         df = df.drop(columns=['seg1_dep_hour'])
         df = df.drop(columns=['seg1_dep_hour'])
 
 
@@ -703,7 +706,11 @@ def load_train_data(db, flight_route_list, table_name, date_begin, date_end, out
 
 
         print(f"结束处理航线: {from_city}-{to_city}")
         print(f"结束处理航线: {from_city}-{to_city}")
 
 
-    df_all = pd.concat(list_all, ignore_index=True)
+    if list_all:
+        df_all = pd.concat(list_all, ignore_index=True)
+    else:
+        df_all = pd.DataFrame()
+
     print(f"本批次数据加载完毕, 总形状: {df_all.shape}")
     print(f"本批次数据加载完毕, 总形状: {df_all.shape}")
     del list_all
     del list_all
     gc.collect()
     gc.collect()

+ 256 - 65
data_preprocess.py

@@ -1,39 +1,43 @@
 import pandas as pd
 import pandas as pd
 import numpy as np
 import numpy as np
 import bisect
 import bisect
+import gc
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 from sklearn.preprocessing import StandardScaler
 from sklearn.preprocessing import StandardScaler
-from config import city_to_country, build_country_holidays
+from config import city_to_country, vj_city_code_map, build_country_holidays
 
 
 COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
 COUNTRY_HOLIDAYS = build_country_holidays(city_to_country)
 
 
 
 
-def preprocess_data(df_train, features, categorical_features, is_training=True):
+def preprocess_data(df_input, features, categorical_features, is_training=True, current_n_hours=28):
     print(">>> 开始数据预处理") 
     print(">>> 开始数据预处理") 
 
 
     # 生成 城市对
     # 生成 城市对
-    df_train['city_pair'] = (
-        df_train['from_city_code'].astype(str) + "-" + df_train['to_city_code'].astype(str)
+    df_input['city_pair'] = (
+        df_input['from_city_code'].astype(str) + "-" + df_input['to_city_code'].astype(str)
     )
     )
-    # 把 city_pair、from_city_code、to_city_code 放到前三列
-    cols = df_train.columns.tolist()
-    # 删除已存在的三列(保证顺序正确)
-    for c in ['city_pair', 'from_city_code', 'to_city_code']:
+    df_input['from_city_num'] = df_input['from_city_code'].map(vj_city_code_map)
+    df_input['to_city_num'] = df_input['to_city_code'].map(vj_city_code_map)
+
+    # 把 city_pair、from_city_code、from_city_num, to_city_code, to_city_num 放到前几列
+    cols = df_input.columns.tolist()
+    # 删除已存在的几列(保证顺序正确)
+    for c in ['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num']:
         cols.remove(c)
         cols.remove(c)
-    # 这三列插入到最前面
-    df_train = df_train[['city_pair', 'from_city_code', 'to_city_code'] + cols]
+    # 这列插入到最前面
+    df_input = df_input[['city_pair', 'from_city_code', 'from_city_num', 'to_city_code', 'to_city_num'] + cols]
 
 
     # 转格式
     # 转格式
-    df_train['search_dep_time'] = pd.to_datetime(
-        df_train['search_dep_time'],
+    df_input['search_dep_time'] = pd.to_datetime(
+        df_input['search_dep_time'],
         format='%Y%m%d',
         format='%Y%m%d',
         errors='coerce'
         errors='coerce'
     ).dt.strftime('%Y-%m-%d')
     ).dt.strftime('%Y-%m-%d')
     # 重命名起飞日期
     # 重命名起飞日期
-    df_train.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
+    df_input.rename(columns={'search_dep_time': 'flight_day'}, inplace=True)
     
     
     # 重命名航班号
     # 重命名航班号
-    df_train.rename(
+    df_input.rename(
         columns={
         columns={
             'seg1_flight_number': 'flight_number_1',
             'seg1_flight_number': 'flight_number_1',
             'seg2_flight_number': 'flight_number_2'
             'seg2_flight_number': 'flight_number_2'
@@ -41,42 +45,103 @@ def preprocess_data(df_train, features, categorical_features, is_training=True):
         inplace=True
         inplace=True
     )
     )
     # 分开填充
     # 分开填充
-    df_train['flight_number_1'] = df_train['flight_number_1'].fillna('VJ')
-    df_train['flight_number_2'] = df_train['flight_number_2'].fillna('VJ')
+    df_input['flight_number_1'] = df_input['flight_number_1'].fillna('VJ')
+    df_input['flight_number_2'] = df_input['flight_number_2'].fillna('VJ')
+
+    # gid:基于指定字段的分组标记(整数)
+    df_input['gid'] = (
+        df_input
+        .groupby(
+            ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2'],    # 'baggage' 先不进分组
+            sort=False
+        )
+        .ngroup()
+    )
+
+    # 做一下时间段裁剪, 保留起飞前480小时之内的
+    df_input = df_input[df_input['hours_until_departure'] < 480].reset_index(drop=True)
+    pass
+
+    # 在 gid 与 baggage 内按时间降序
+    df_input = df_input.sort_values(
+        by=['gid', 'baggage', 'hours_until_departure'],
+        ascending=[True, True, False]
+    ).reset_index(drop=True)
+
+    # 价格变化掩码
+    g = df_input.groupby(['gid', 'baggage'])
+    diff = g['adult_total_price'].transform('diff')
+    change_mask = diff.abs() >= 5   # 变化太小的不计入
+
+    # 价格变化次数
+    df_input['price_change_times_total'] = (
+        change_mask.groupby([df_input['gid'], df_input['baggage']]).cumsum()
+    )
+
+    # 上次发生变价的小时数
+    last_change_hour = (
+        df_input['hours_until_departure']
+        .where(change_mask)
+        .groupby([df_input['gid'], df_input['baggage']])
+        .ffill()  # 前向填充
+    )
+
+    # 当前距离上一次变价过去多少小时
+    df_input['price_last_change_hours'] = (
+        last_change_hour - df_input['hours_until_departure']
+    ).fillna(0)
+    pass
+
+    # 想插入到 seats_remaining 前面的新列
+    new_cols = [
+        'price_change_times_total',
+        'price_last_change_hours'
+    ]
+    # 当前所有列
+    cols = df_input.columns.tolist()
+    # 找到 seats_remaining 的位置
+    idx = cols.index('seats_remaining')
+    # 重新拼列顺序
+    new_order = cols[:idx] + new_cols + cols[idx:]
+    # 去重(防止列已经在原位置)
+    new_order = list(dict.fromkeys(new_order))
+    # 重新排列 DataFrame
+    df_input = df_input[new_order]
+    pass
 
 
     # 生成第一机场对
     # 生成第一机场对
-    df_train['airport_pair_1'] = (
-        df_train['seg1_dep_air_port'].astype(str) + "-" + df_train['seg1_arr_air_port'].astype(str)
+    df_input['airport_pair_1'] = (
+        df_input['seg1_dep_air_port'].astype(str) + "-" + df_input['seg1_arr_air_port'].astype(str)
     )
     )
     # 删除原始第一机场码
     # 删除原始第一机场码
-    df_train.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
+    df_input.drop(columns=['seg1_dep_air_port', 'seg1_arr_air_port'], inplace=True)
     # 第一机场对 放到 seg1_dep_time 列的前面
     # 第一机场对 放到 seg1_dep_time 列的前面
-    insert_idx = df_train.columns.get_loc('seg1_dep_time')
-    airport_pair_1 = df_train.pop('airport_pair_1')
-    df_train.insert(insert_idx, 'airport_pair_1', airport_pair_1)
+    insert_idx = df_input.columns.get_loc('seg1_dep_time')
+    airport_pair_1 = df_input.pop('airport_pair_1')
+    df_input.insert(insert_idx, 'airport_pair_1', airport_pair_1)
 
 
     # 生成第二机场对(带缺失兜底)
     # 生成第二机场对(带缺失兜底)
-    df_train['airport_pair_2'] = np.where(
-        df_train['seg2_dep_air_port'].isna() | df_train['seg2_arr_air_port'].isna(),
+    df_input['airport_pair_2'] = np.where(
+        df_input['seg2_dep_air_port'].isna() | df_input['seg2_arr_air_port'].isna(),
         'NA',
         'NA',
-        df_train['seg2_dep_air_port'].astype(str) + "-" +
-        df_train['seg2_arr_air_port'].astype(str)
+        df_input['seg2_dep_air_port'].astype(str) + "-" +
+        df_input['seg2_arr_air_port'].astype(str)
     )
     )
     # 删除原始第二机场码
     # 删除原始第二机场码
-    df_train.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
+    df_input.drop(columns=['seg2_dep_air_port', 'seg2_arr_air_port'], inplace=True)
     # 第二机场对 放到 seg2_dep_time 列的前面
     # 第二机场对 放到 seg2_dep_time 列的前面
-    insert_idx = df_train.columns.get_loc('seg2_dep_time')
-    airport_pair_2 = df_train.pop('airport_pair_2')
-    df_train.insert(insert_idx, 'airport_pair_2', airport_pair_2)
+    insert_idx = df_input.columns.get_loc('seg2_dep_time')
+    airport_pair_2 = df_input.pop('airport_pair_2')
+    df_input.insert(insert_idx, 'airport_pair_2', airport_pair_2)
     
     
     # 是否转乘
     # 是否转乘
-    df_train['is_transfer'] = np.where(df_train['flight_number_2'] == 'VJ', 0, 1)
-    insert_idx = df_train.columns.get_loc('flight_number_2')
-    is_transfer = df_train.pop('is_transfer')
-    df_train.insert(insert_idx, 'is_transfer', is_transfer)
+    df_input['is_transfer'] = np.where(df_input['flight_number_2'] == 'VJ', 0, 1)
+    insert_idx = df_input.columns.get_loc('flight_number_2')
+    is_transfer = df_input.pop('is_transfer')
+    df_input.insert(insert_idx, 'is_transfer', is_transfer)
 
 
     # 重命名起飞时刻与到达时刻
     # 重命名起飞时刻与到达时刻
-    df_train.rename(
+    df_input.rename(
         columns={
         columns={
             'seg1_dep_time': 'dep_time_1',
             'seg1_dep_time': 'dep_time_1',
             'seg1_arr_time': 'arr_time_1',
             'seg1_arr_time': 'arr_time_1',
@@ -87,34 +152,34 @@ def preprocess_data(df_train, features, categorical_features, is_training=True):
     )
     )
     
     
     # 第一段飞行时长
     # 第一段飞行时长
-    df_train['fly_duration_1'] = (
-        (df_train['arr_time_1'] - df_train['dep_time_1'])
+    df_input['fly_duration_1'] = (
+        (df_input['arr_time_1'] - df_input['dep_time_1'])
         .dt.total_seconds() / 3600
         .dt.total_seconds() / 3600
     ).round(2)
     ).round(2)
 
 
     # 第二段飞行时长(无转乘为 0)
     # 第二段飞行时长(无转乘为 0)
-    df_train['fly_duration_2'] = (
-        (df_train['arr_time_2'] - df_train['dep_time_2'])
+    df_input['fly_duration_2'] = (
+        (df_input['arr_time_2'] - df_input['dep_time_2'])
         .dt.total_seconds() / 3600
         .dt.total_seconds() / 3600
     ).fillna(0).round(2)
     ).fillna(0).round(2)
 
 
     # 总飞行时长
     # 总飞行时长
-    df_train['fly_duration'] = (
-        df_train['fly_duration_1'] + df_train['fly_duration_2']
+    df_input['fly_duration'] = (
+        df_input['fly_duration_1'] + df_input['fly_duration_2']
     ).round(2)
     ).round(2)
 
 
     # 中转停留时长(无转乘为 0)
     # 中转停留时长(无转乘为 0)
-    df_train['stop_duration'] = (
-        (df_train['dep_time_2'] - df_train['arr_time_1'])
+    df_input['stop_duration'] = (
+        (df_input['dep_time_2'] - df_input['arr_time_1'])
         .dt.total_seconds() / 3600
         .dt.total_seconds() / 3600
     ).fillna(0).round(2)
     ).fillna(0).round(2)
 
 
     # 裁剪,防止负数
     # 裁剪,防止负数
     # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
     # for c in ['fly_duration_1', 'fly_duration_2', 'fly_duration', 'stop_duration']:
-    #     df_train[c] = df_train[c].clip(lower=0)
+    #     df_input[c] = df_input[c].clip(lower=0)
 
 
     # 和 is_transfer 逻辑保持一致
     # 和 is_transfer 逻辑保持一致
-    # df_train.loc[df_train['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
+    # df_input.loc[df_input['is_transfer'] == 0, ['fly_duration_2', 'stop_duration']] = 0
     
     
     # 一次性插到 is_filled 前面
     # 一次性插到 is_filled 前面
     insert_before = 'is_filled'
     insert_before = 'is_filled'
@@ -124,62 +189,188 @@ def preprocess_data(df_train, features, categorical_features, is_training=True):
         'fly_duration',
         'fly_duration',
         'stop_duration'
         'stop_duration'
     ]
     ]
-    cols = df_train.columns.tolist()
+    cols = df_input.columns.tolist()
     idx = cols.index(insert_before)
     idx = cols.index(insert_before)
     # 删除旧位置
     # 删除旧位置
     cols = [c for c in cols if c not in new_cols]
     cols = [c for c in cols if c not in new_cols]
     # 插入新位置(顺序保持)
     # 插入新位置(顺序保持)
     cols[idx:idx] = new_cols    # python独有空切片插入法
     cols[idx:idx] = new_cols    # python独有空切片插入法
-    df_train = df_train[cols]
+    df_input = df_input[cols]
 
 
     # 一次生成多个字段
     # 一次生成多个字段
-    dep_t1 = df_train['dep_time_1']
+    dep_t1 = df_input['dep_time_1']
     # 几点起飞(0–23)
     # 几点起飞(0–23)
-    df_train['flight_by_hour'] = dep_t1.dt.hour
+    df_input['flight_by_hour'] = dep_t1.dt.hour
     # 起飞日期几号(1–31)
     # 起飞日期几号(1–31)
-    df_train['flight_by_day'] = dep_t1.dt.day
+    df_input['flight_by_day'] = dep_t1.dt.day
     # 起飞日期几月(1–12)
     # 起飞日期几月(1–12)
-    df_train['flight_day_of_month'] = dep_t1.dt.month
+    df_input['flight_day_of_month'] = dep_t1.dt.month
     # 起飞日期周几(0=周一, 6=周日)
     # 起飞日期周几(0=周一, 6=周日)
-    df_train['flight_day_of_week'] = dep_t1.dt.weekday
+    df_input['flight_day_of_week'] = dep_t1.dt.weekday
     # 起飞日期季度(1–4)
     # 起飞日期季度(1–4)
-    df_train['flight_day_of_quarter'] = dep_t1.dt.quarter
+    df_input['flight_day_of_quarter'] = dep_t1.dt.quarter
     # 是否周末(周六 / 周日)
     # 是否周末(周六 / 周日)
-    df_train['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
+    df_input['flight_day_is_weekend'] = dep_t1.dt.weekday.isin([5, 6]).astype(int)
 
 
     # 找到对应的国家码
     # 找到对应的国家码
-    df_train['dep_country'] = df_train['from_city_code'].map(city_to_country)
-    df_train['arr_country'] = df_train['to_city_code'].map(city_to_country) 
+    df_input['dep_country'] = df_input['from_city_code'].map(city_to_country)
+    df_input['arr_country'] = df_input['to_city_code'].map(city_to_country) 
 
 
     # 整体出发时间 就是 dep_time_1
     # 整体出发时间 就是 dep_time_1
-    df_train['global_dep_time'] = df_train['dep_time_1']
+    df_input['global_dep_time'] = df_input['dep_time_1']
     # 整体到达时间:有转乘用 arr_time_2,否则用 arr_time_1
     # 整体到达时间:有转乘用 arr_time_2,否则用 arr_time_1
-    df_train['global_arr_time'] = df_train['arr_time_2'].fillna(df_train['arr_time_1'])
+    df_input['global_arr_time'] = df_input['arr_time_2'].fillna(df_input['arr_time_1'])
 
 
     # 出发日期在出发国家是否节假日
     # 出发日期在出发国家是否节假日
-    df_train['dep_country_is_holiday'] = df_train.apply(
+    df_input['dep_country_is_holiday'] = df_input.apply(
         lambda r: r['global_dep_time'].date()
         lambda r: r['global_dep_time'].date()
         in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
         in COUNTRY_HOLIDAYS.get(r['dep_country'], set()),
         axis=1
         axis=1
     ).astype(int)
     ).astype(int)
 
 
     # 到达日期在到达国家是否节假日
     # 到达日期在到达国家是否节假日
-    df_train['arr_country_is_holiday'] = df_train.apply(
+    df_input['arr_country_is_holiday'] = df_input.apply(
         lambda r: r['global_arr_time'].date()
         lambda r: r['global_arr_time'].date()
         in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
         in COUNTRY_HOLIDAYS.get(r['arr_country'], set()),
         axis=1
         axis=1
     ).astype(int)
     ).astype(int)
 
 
     # 在任一侧是否节假日
     # 在任一侧是否节假日
-    df_train['flight_day_is_holiday'] = (
-        df_train[['dep_country_is_holiday', 'arr_country_is_holiday']]
+    df_input['flight_day_is_holiday'] = (
+        df_input[['dep_country_is_holiday', 'arr_country_is_holiday']]
         .max(axis=1)
         .max(axis=1)
     )
     )
 
 
     # 是否跨国航线
     # 是否跨国航线
-    df_train['is_cross_country'] = (
-        df_train['dep_country'] != df_train['arr_country']
+    df_input['is_cross_country'] = (
+        df_input['dep_country'] != df_input['arr_country']
     ).astype(int)
     ).astype(int)
 
 
-    pass
+    def days_to_next_holiday(country, cur_date):
+        if pd.isna(country) or pd.isna(cur_date):
+            return np.nan
+
+        holidays = COUNTRY_HOLIDAYS.get(country)
+        if not holidays:
+            return np.nan
+
+        # 找未来(含当天)的节假日,并排序
+        future_holidays = sorted([d for d in holidays if d >= cur_date])
+        if not future_holidays:
+            return np.nan
+
+        next_holiday = future_holidays[0]  # 第一个未来节假日
+        delta_days = (next_holiday - cur_date).days
+        return delta_days
 
 
+    df_input['days_to_holiday'] = df_input.apply(
+        lambda r: days_to_next_holiday(
+            r['dep_country'],
+            r['update_hour'].date()
+        ),
+        axis=1
+    )
+    
+    # 没有未来节假日的统一兜底
+    # df_input['days_to_holiday'] = df_input['days_to_holiday'].fillna(999)
+
+    # days_to_holiday 插在 update_hour 前面
+    insert_idx = df_input.columns.get_loc('update_hour')
+    days_to_holiday = df_input.pop('days_to_holiday')
+    df_input.insert(insert_idx, 'days_to_holiday', days_to_holiday)
+
+    # 制作targets
+    print(f"\n>>> 开始处理 对应区间: n_hours = {current_n_hours}")
+    target_lower_limit = 4
+    target_upper_limit = current_n_hours
+    mask_targets = (df_input['hours_until_departure'] >= target_lower_limit) & (df_input['hours_until_departure'] < target_upper_limit) & (df_input['baggage'] == 30)
+    df_targets = df_input.loc[mask_targets].copy()
+
+    targets_amout = df_targets.shape[0]
+    print(f"当前 目标区间数据量: {targets_amout}, 区间: [{target_lower_limit}, {target_upper_limit})")
+
+    if targets_amout == 0:
+        print(f">>> n_hours = {current_n_hours} 无有效数据,跳过")
+        return pd.DataFrame()
+
+    print(">>> 计算 price_at_n_hours")
+    df_input_object = df_input[(df_input['hours_until_departure'] >= current_n_hours) & (df_input['baggage'] == 30)].copy()
+    df_last = df_input_object.groupby('gid', observed=True).last().reset_index()   # 一般落在起飞前28小时
+    
+    # 提取并重命名 price 列
+    df_last_price_at_n_hours = df_last[['gid', 'adult_total_price']].rename(columns={'adult_total_price': 'price_at_n_hours'})
+    print(">>> price_at_n_hours计算完成,示例:")
+    print(df_last_price_at_n_hours.head(5))
+    
+    # 计算降价信息
+    print(">>> 计算降价信息")
+    df_targets = df_targets.merge(df_last_price_at_n_hours, on='gid', how='left')
+    df_targets['price_drop_amount'] = df_targets['price_at_n_hours'] - df_targets['adult_total_price']
+    df_targets['price_dropped'] = (
+        (df_targets['adult_total_price'] < df_targets['price_at_n_hours']) &
+        (df_targets['price_drop_amount'] >= 5)  # 降幅不能太小
+    )
+    df_price_drops = df_targets[df_targets['price_dropped']].copy()
+
+    price_drops_len = df_price_drops.shape[0]
+    if price_drops_len == 0:
+        print(f">>> n_hours = {current_n_hours} 无降价信息")
+        # 创建包含指定列的空 DataFrame
+        df_price_drop_info = pd.DataFrame({
+            'gid': pd.Series(dtype='int64'),
+            'first_drop_hours_until_departure': pd.Series(dtype='int64'),
+            'price_at_first_drop_hours': pd.Series(dtype='float64')
+        })
+    else:
+        df_price_drop_info = df_price_drops.groupby('gid', observed=True).first().reset_index()   # 第一次发生的降价
+        df_price_drop_info = df_price_drop_info[['gid', 'hours_until_departure', 'adult_total_price']].rename(columns={
+                'hours_until_departure': 'first_drop_hours_until_departure',
+                'adult_total_price': 'price_at_first_drop_hours'
+        })
+        print(">>> 降价信息计算完成,示例:")
+        print(df_price_drop_info.head(5))
+    
+    # 合并信息
+    df_gid_info = df_last_price_at_n_hours.merge(df_price_drop_info, on='gid', how='left')
+    df_gid_info['will_price_drop'] = df_gid_info['price_at_first_drop_hours'].notnull().astype(int)
+    df_gid_info['amount_of_price_drop'] = df_gid_info['price_at_n_hours'] - df_gid_info['price_at_first_drop_hours']
+    df_gid_info['amount_of_price_drop'] = df_gid_info['amount_of_price_drop'].fillna(0)  # 区别    
+    df_gid_info['time_to_price_drop'] = df_gid_info['first_drop_hours_until_departure']
+    df_gid_info['time_to_price_drop'] = df_gid_info['time_to_price_drop'].fillna(0)  # 区别
+
+    del df_input_object
+    del df_last
+    del df_last_price_at_n_hours
+    del df_targets
+    del df_price_drops
+    del df_price_drop_info
+    gc.collect()
+    
+    # 将目标变量合并到输入数据中
+    print(">>> 将目标变量信息合并到 df_input")
+    df_input = df_input.merge(df_gid_info[['gid', 'will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']], on='gid', how='left')
+    # 使用 0 填充 NaN 值
+    df_input[['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']] = df_input[
+        ['will_price_drop', 'amount_of_price_drop', 'time_to_price_drop']].fillna(0)
+    df_input = df_input.rename(columns={
+        'will_price_drop': 'target_will_price_drop',
+        'amount_of_price_drop': 'target_amount_of_drop',
+        'time_to_price_drop': 'target_time_to_drop'
+    })
+    
+    # 计算每个 gid 分组在 df_targets 中的 adult_total_price 最小值
+    # print(">>> 计算每个 gid 分组的 adult_total_price 最小值...")
+    # df_min_price_by_gid = df_targets.groupby('gid')['adult_total_price'].min().reset_index()
+    # df_min_price_by_gid = df_min_price_by_gid.rename(columns={'adult_total_price': 'min_price'})
+    # gid_count = df_min_price_by_gid.shape[0]
+    # print(f">>> 计算完成,共 {gid_count} 个 gid 分组")
+
+    # # 将最小价格 merge 到 df_inputs 中
+    # print(">>> 将最小价格 merge 到输入数据中...")
+    # df_input = df_input.merge(df_min_price_by_gid, on='gid', how='left')
+
+    print(">>> 合并后 df_input 样例:")
+    print(df_input[['gid', 'hours_until_departure', 'adult_total_price', 'target_will_price_drop', 'target_amount_of_drop', 'target_time_to_drop']].head(5))
+
+    
+    return df_input

+ 27 - 14
main_tr.py

@@ -28,11 +28,17 @@ else:
 
 
 
 
 # 定义特征和参数
 # 定义特征和参数
-categorical_features = ['city_pair', 'flight_number_1', 'flight_number_2']
-other_features = []
-features = []
+categorical_features = ['city_pair', 'flight_day', 'flight_number_1', 'flight_number_2']   # 这个与gid的分组条件一致
+common_features = ['hours_until_departure', 'days_to_departure', 'seats_remaining', 'is_cross_country', 'is_transfer', 
+                   'fly_duration', 'stop_duration', 
+                   'flight_by_hour', 'flight_by_day', 'flight_day_of_month', 'flight_day_of_week', 'flight_day_of_quarter', 'flight_day_is_weekend',
+                   'dep_country_is_holiday', 'arr_country_is_holiday', 'flight_day_is_holiday', 'days_to_holiday',
+                  ]
+price_features = ['adult_total_price', 'price_change_times_total', 'price_last_change_hours']
+encoded_columns = ['from_city_num', 'to_city_num', 'flight_1_num', 'flight_2_num', 'cabin_level', 'baggage_level']
+features = encoded_columns + price_features + common_features
+target_vars = ['target_will_price_drop']   # 是否降价
 
 
-target_vars = ['target_min_to_price']   # 最低会降到的价格
 
 
 # 分布式环境初始化
 # 分布式环境初始化
 def init_distributed_backend():
 def init_distributed_backend():
@@ -88,7 +94,7 @@ def start_train():
     photo_dir = "./photo"
     photo_dir = "./photo"
 
 
     date_end = datetime.today().strftime("%Y-%m-%d")
     date_end = datetime.today().strftime("%Y-%m-%d")
-    date_begin = (datetime.today() - timedelta(days=10)).strftime("%Y-%m-%d")
+    date_begin = (datetime.today() - timedelta(days=15)).strftime("%Y-%m-%d")
 
 
     # 仅在 rank == 0 时要做的
     # 仅在 rank == 0 时要做的
     if rank == 0:
     if rank == 0:
@@ -129,17 +135,17 @@ def start_train():
     batch_idx = -1
     batch_idx = -1
 
 
     # 主干代码
     # 主干代码
-    flight_route_list = vj_flight_route_list_hot + vj_flight_route_list_nothot
-    flight_route_list_len = len(flight_route_list)
-    route_len_hot = len(vj_flight_route_list_hot)
-    route_len_nothot = len(vj_flight_route_list_nothot)
+    # flight_route_list = vj_flight_route_list_hot + vj_flight_route_list_nothot
+    # flight_route_list_len = len(flight_route_list)
+    # route_len_hot = len(vj_flight_route_list_hot)
+    # route_len_nothot = len(vj_flight_route_list_nothot)
 
 
     # 调试代码
     # 调试代码
-    # s = 38   # 菲律宾2025-12-08是节假日 s=38 选到马尼拉 
-    # flight_route_list = vj_flight_route_list_hot[:0] + vj_flight_route_list_nothot[s:]
-    # flight_route_list_len = len(flight_route_list)
-    # route_len_hot = len(vj_flight_route_list_hot[:0])
-    # route_len_nothot = len(vj_flight_route_list_nothot[s:])
+    s = 38   # 菲律宾2025-12-08是节假日 s=38 选到马尼拉 
+    flight_route_list = vj_flight_route_list_hot[0:] + vj_flight_route_list_nothot[s:]
+    flight_route_list_len = len(flight_route_list)
+    route_len_hot = len(vj_flight_route_list_hot[0:])
+    route_len_nothot = len(vj_flight_route_list_nothot[s:])
     
     
     if local_rank == 0:
     if local_rank == 0:
         print(f"flight_route_list_len:{flight_route_list_len}")
         print(f"flight_route_list_len:{flight_route_list_len}")
@@ -193,6 +199,13 @@ def start_train():
             
             
             # 数据预处理
             # 数据预处理
             df_train_inputs = preprocess_data(df_train, features, categorical_features, is_training=True)
             df_train_inputs = preprocess_data(df_train, features, categorical_features, is_training=True)
+            print("预处理后数据样本:\n", df_train_inputs.head())
+
+            total_rows = df_train_inputs.shape[0]
+
+
+
+
             pass
             pass
 
 
         else:
         else: