|
@@ -6,6 +6,8 @@ from pymongo.errors import PyMongoError, ServerSelectionTimeoutError
|
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
import os
|
|
import os
|
|
|
import random
|
|
import random
|
|
|
|
|
+import threading
|
|
|
|
|
+from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.pyplot as plt
|
|
|
from matplotlib import font_manager
|
|
from matplotlib import font_manager
|
|
@@ -67,7 +69,7 @@ def test_mongo_connection(db):
|
|
|
|
|
|
|
|
|
|
|
|
|
def query_flight_range_status(db, table_name, from_city, to_city, dep_date_begin, dep_date_end, flight_nums,
|
|
def query_flight_range_status(db, table_name, from_city, to_city, dep_date_begin, dep_date_end, flight_nums,
|
|
|
- limit=0, max_retries=3, base_sleep=1.0):
|
|
|
|
|
|
|
+ limit=0, max_retries=3, base_sleep=1.0, thread_id=0):
|
|
|
"""
|
|
"""
|
|
|
从指定表(4类)查询数据(指定起飞天的范围) (失败自动重试)
|
|
从指定表(4类)查询数据(指定起飞天的范围) (失败自动重试)
|
|
|
"""
|
|
"""
|
|
@@ -132,7 +134,7 @@ def query_flight_range_status(db, table_name, from_city, to_city, dep_date_begin
|
|
|
# 1️⃣ 展开 segments
|
|
# 1️⃣ 展开 segments
|
|
|
print(f"📊 开始扩展segments 稍等...")
|
|
print(f"📊 开始扩展segments 稍等...")
|
|
|
t1 = time.time()
|
|
t1 = time.time()
|
|
|
- df = expand_segments_columns(df)
|
|
|
|
|
|
|
+ df = expand_segments_columns_optimized(df) # 改为调用优化版
|
|
|
t2 = time.time()
|
|
t2 = time.time()
|
|
|
rt = round(t2 - t1, 3)
|
|
rt = round(t2 - t1, 3)
|
|
|
print(f"用时: {rt} 秒")
|
|
print(f"用时: {rt} 秒")
|
|
@@ -157,73 +159,148 @@ def query_flight_range_status(db, table_name, from_city, to_city, dep_date_begin
|
|
|
time.sleep(sleep_time)
|
|
time.sleep(sleep_time)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def expand_segments_columns(df):
|
|
|
|
|
- """展开 segments"""
|
|
|
|
|
|
|
+# def expand_segments_columns(df):
|
|
|
|
|
+# """展开 segments"""
|
|
|
|
|
+# df = df.copy()
|
|
|
|
|
+
|
|
|
|
|
+# # 定义要展开的列
|
|
|
|
|
+# seg1_cols = ['flight_number', 'dep_air_port', 'dep_time', 'arr_air_port', 'arr_time', 'cabin', 'baggage']
|
|
|
|
|
+# seg2_cols = ['flight_number', 'dep_air_port', 'dep_time', 'arr_air_port', 'arr_time']
|
|
|
|
|
+
|
|
|
|
|
+# # 定义 apply 函数一次返回字典
|
|
|
|
|
+# def extract_segments(row):
|
|
|
|
|
+# segments = row.get('segments')
|
|
|
|
|
+# result = {}
|
|
|
|
|
+# # 默认缺失使用 pd.NA(对字符串友好)
|
|
|
|
|
+# missing = pd.NA
|
|
|
|
|
+# if isinstance(segments, list):
|
|
|
|
|
+# # 第一段
|
|
|
|
|
+# if len(segments) >= 1 and isinstance(segments[0], dict):
|
|
|
|
|
+# for col in seg1_cols:
|
|
|
|
|
+# result[f'seg1_{col}'] = segments[0].get(col)
|
|
|
|
|
+# else:
|
|
|
|
|
+# for col in seg1_cols:
|
|
|
|
|
+# result[f'seg1_{col}'] = missing
|
|
|
|
|
+# # 第二段
|
|
|
|
|
+# if len(segments) >= 2 and isinstance(segments[1], dict):
|
|
|
|
|
+# for col in seg2_cols:
|
|
|
|
|
+# result[f'seg2_{col}'] = segments[1].get(col)
|
|
|
|
|
+# else:
|
|
|
|
|
+# for col in seg2_cols:
|
|
|
|
|
+# result[f'seg2_{col}'] = missing
|
|
|
|
|
+# else:
|
|
|
|
|
+# # segments 不是 list,全都置空
|
|
|
|
|
+# for col in seg1_cols:
|
|
|
|
|
+# result[f'seg1_{col}'] = missing
|
|
|
|
|
+# for col in seg2_cols:
|
|
|
|
|
+# result[f'seg2_{col}'] = missing
|
|
|
|
|
+
|
|
|
|
|
+# return pd.Series(result)
|
|
|
|
|
+
|
|
|
|
|
+# # 一次 apply
|
|
|
|
|
+# df_segments = df.apply(extract_segments, axis=1)
|
|
|
|
|
+
|
|
|
|
|
+# # 拼回原 df
|
|
|
|
|
+# df = pd.concat([df.drop(columns=['segments'], errors='ignore'), df_segments], axis=1)
|
|
|
|
|
+
|
|
|
|
|
+# # 统一转换时间字段为 datetime
|
|
|
|
|
+# time_cols = [
|
|
|
|
|
+# 'seg1_dep_time', 'seg1_arr_time',
|
|
|
|
|
+# 'seg2_dep_time', 'seg2_arr_time'
|
|
|
|
|
+# ]
|
|
|
|
|
+# for col in time_cols:
|
|
|
|
|
+# if col in df.columns:
|
|
|
|
|
+# df[col] = pd.to_datetime(
|
|
|
|
|
+# df[col],
|
|
|
|
|
+# format='%Y%m%d%H%M%S',
|
|
|
|
|
+# errors='coerce'
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# # 站点来源 -> 是否近期
|
|
|
|
|
+# df['source_website'] = np.where(
|
|
|
|
|
+# df['source_website'].str.contains('7_30'),
|
|
|
|
|
+# 0, # 远期 -> 0
|
|
|
|
|
+# np.where(df['source_website'].str.contains('0_7'),
|
|
|
|
|
+# 1, # 近期 -> 1
|
|
|
|
|
+# df['source_website']) # 其他情况保持原值
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# # 行李配额字符 -> 数字
|
|
|
|
|
+# conditions = [
|
|
|
|
|
+# df['seg1_baggage'] == '-;-;-;-',
|
|
|
|
|
+# df['seg1_baggage'] == '1-20',
|
|
|
|
|
+# df['seg1_baggage'] == '1-30',
|
|
|
|
|
+# df['seg1_baggage'] == '1-40',
|
|
|
|
|
+# ]
|
|
|
|
|
+# choices = [0, 20, 30, 40]
|
|
|
|
|
+# df['seg1_baggage'] = np.select(conditions, choices, default=df['seg1_baggage'])
|
|
|
|
|
+
|
|
|
|
|
+# # 重命名字段
|
|
|
|
|
+# df = df.rename(columns={
|
|
|
|
|
+# 'seg1_cabin': 'cabin',
|
|
|
|
|
+# 'seg1_baggage': 'baggage',
|
|
|
|
|
+# 'source_website': 'is_near',
|
|
|
|
|
+# })
|
|
|
|
|
+
|
|
|
|
|
+# return df
|
|
|
|
|
+
|
|
|
|
|
+def expand_segments_columns_optimized(df):
|
|
|
|
|
+ """优化版的展开segments函数(避免逐行apply)"""
|
|
|
|
|
+ if df.empty:
|
|
|
|
|
+ return df
|
|
|
|
|
+
|
|
|
df = df.copy()
|
|
df = df.copy()
|
|
|
|
|
|
|
|
- # 定义要展开的列
|
|
|
|
|
- seg1_cols = ['flight_number', 'dep_air_port', 'dep_time', 'arr_air_port', 'arr_time', 'cabin', 'baggage']
|
|
|
|
|
- seg2_cols = ['flight_number', 'dep_air_port', 'dep_time', 'arr_air_port', 'arr_time']
|
|
|
|
|
-
|
|
|
|
|
- # 定义 apply 函数一次返回字典
|
|
|
|
|
- def extract_segments(row):
|
|
|
|
|
- segments = row.get('segments')
|
|
|
|
|
- result = {}
|
|
|
|
|
- # 默认缺失使用 pd.NA(对字符串友好)
|
|
|
|
|
- missing = pd.NA
|
|
|
|
|
- if isinstance(segments, list):
|
|
|
|
|
- # 第一段
|
|
|
|
|
- if len(segments) >= 1 and isinstance(segments[0], dict):
|
|
|
|
|
|
|
+ # 直接操作segments列表,避免逐行apply
|
|
|
|
|
+ if 'segments' in df.columns:
|
|
|
|
|
+ # 提取第一段信息
|
|
|
|
|
+ seg1_cols = ['flight_number', 'dep_air_port', 'dep_time', 'arr_air_port', 'arr_time', 'cabin', 'baggage']
|
|
|
|
|
+ # 提取第二段信息
|
|
|
|
|
+ seg2_cols = ['flight_number', 'dep_air_port', 'dep_time', 'arr_air_port', 'arr_time']
|
|
|
|
|
+
|
|
|
|
|
+ # 使用列表推导式替代apply,大幅提升性能
|
|
|
|
|
+ seg1_data = []
|
|
|
|
|
+ seg2_data = []
|
|
|
|
|
+
|
|
|
|
|
+ for segments in df['segments']:
|
|
|
|
|
+ seg1_dict = {}
|
|
|
|
|
+ seg2_dict = {}
|
|
|
|
|
+
|
|
|
|
|
+ if isinstance(segments, list) and len(segments) >= 1 and isinstance(segments[0], dict):
|
|
|
for col in seg1_cols:
|
|
for col in seg1_cols:
|
|
|
- result[f'seg1_{col}'] = segments[0].get(col)
|
|
|
|
|
|
|
+ seg1_dict[f'seg1_{col}'] = segments[0].get(col)
|
|
|
else:
|
|
else:
|
|
|
for col in seg1_cols:
|
|
for col in seg1_cols:
|
|
|
- result[f'seg1_{col}'] = missing
|
|
|
|
|
- # 第二段
|
|
|
|
|
- if len(segments) >= 2 and isinstance(segments[1], dict):
|
|
|
|
|
|
|
+ seg1_dict[f'seg1_{col}'] = pd.NA
|
|
|
|
|
+
|
|
|
|
|
+ if isinstance(segments, list) and len(segments) >= 2 and isinstance(segments[1], dict):
|
|
|
for col in seg2_cols:
|
|
for col in seg2_cols:
|
|
|
- result[f'seg2_{col}'] = segments[1].get(col)
|
|
|
|
|
|
|
+ seg2_dict[f'seg2_{col}'] = segments[1].get(col)
|
|
|
else:
|
|
else:
|
|
|
for col in seg2_cols:
|
|
for col in seg2_cols:
|
|
|
- result[f'seg2_{col}'] = missing
|
|
|
|
|
- else:
|
|
|
|
|
- # segments 不是 list,全都置空
|
|
|
|
|
- for col in seg1_cols:
|
|
|
|
|
- result[f'seg1_{col}'] = missing
|
|
|
|
|
- for col in seg2_cols:
|
|
|
|
|
- result[f'seg2_{col}'] = missing
|
|
|
|
|
-
|
|
|
|
|
- return pd.Series(result)
|
|
|
|
|
|
|
+ seg2_dict[f'seg2_{col}'] = pd.NA
|
|
|
|
|
+
|
|
|
|
|
+ seg1_data.append(seg1_dict)
|
|
|
|
|
+ seg2_data.append(seg2_dict)
|
|
|
|
|
|
|
|
- # 一次 apply
|
|
|
|
|
- df_segments = df.apply(extract_segments, axis=1)
|
|
|
|
|
|
|
+ # 创建DataFrame
|
|
|
|
|
+ df_seg1 = pd.DataFrame(seg1_data, index=df.index)
|
|
|
|
|
+ df_seg2 = pd.DataFrame(seg2_data, index=df.index)
|
|
|
|
|
|
|
|
- # 拼回原 df
|
|
|
|
|
- df = pd.concat([df.drop(columns=['segments'], errors='ignore'), df_segments], axis=1)
|
|
|
|
|
|
|
+ # 合并到原DataFrame
|
|
|
|
|
+ df = pd.concat([df.drop(columns=['segments'], errors='ignore'), df_seg1, df_seg2], axis=1)
|
|
|
|
|
|
|
|
- # 统一转换时间字段为 datetime
|
|
|
|
|
- time_cols = [
|
|
|
|
|
- 'seg1_dep_time', 'seg1_arr_time',
|
|
|
|
|
- 'seg2_dep_time', 'seg2_arr_time'
|
|
|
|
|
- ]
|
|
|
|
|
|
|
+ # 后续处理保持不变
|
|
|
|
|
+ time_cols = ['seg1_dep_time', 'seg1_arr_time', 'seg2_dep_time', 'seg2_arr_time']
|
|
|
for col in time_cols:
|
|
for col in time_cols:
|
|
|
if col in df.columns:
|
|
if col in df.columns:
|
|
|
- df[col] = pd.to_datetime(
|
|
|
|
|
- df[col],
|
|
|
|
|
- format='%Y%m%d%H%M%S',
|
|
|
|
|
- errors='coerce'
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- # 站点来源 -> 是否近期
|
|
|
|
|
|
|
+ df[col] = pd.to_datetime(df[col], format='%Y%m%d%H%M%S', errors='coerce')
|
|
|
|
|
+
|
|
|
df['source_website'] = np.where(
|
|
df['source_website'] = np.where(
|
|
|
- df['source_website'].str.contains('7_30'),
|
|
|
|
|
- 0, # 远期 -> 0
|
|
|
|
|
- np.where(df['source_website'].str.contains('0_7'),
|
|
|
|
|
- 1, # 近期 -> 1
|
|
|
|
|
- df['source_website']) # 其他情况保持原值
|
|
|
|
|
|
|
+ df['source_website'].str.contains('7_30'), 0,
|
|
|
|
|
+ np.where(df['source_website'].str.contains('0_7'), 1, df['source_website'])
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 行李配额字符 -> 数字
|
|
|
|
|
conditions = [
|
|
conditions = [
|
|
|
df['seg1_baggage'] == '-;-;-;-',
|
|
df['seg1_baggage'] == '-;-;-;-',
|
|
|
df['seg1_baggage'] == '1-20',
|
|
df['seg1_baggage'] == '1-20',
|
|
@@ -233,13 +310,12 @@ def expand_segments_columns(df):
|
|
|
choices = [0, 20, 30, 40]
|
|
choices = [0, 20, 30, 40]
|
|
|
df['seg1_baggage'] = np.select(conditions, choices, default=df['seg1_baggage'])
|
|
df['seg1_baggage'] = np.select(conditions, choices, default=df['seg1_baggage'])
|
|
|
|
|
|
|
|
- # 重命名字段
|
|
|
|
|
df = df.rename(columns={
|
|
df = df.rename(columns={
|
|
|
'seg1_cabin': 'cabin',
|
|
'seg1_cabin': 'cabin',
|
|
|
'seg1_baggage': 'baggage',
|
|
'seg1_baggage': 'baggage',
|
|
|
'source_website': 'is_near',
|
|
'source_website': 'is_near',
|
|
|
})
|
|
})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return df
|
|
return df
|
|
|
|
|
|
|
|
|
|
|
|
@@ -564,152 +640,217 @@ def plot_c12_trend(df, output_dir="."):
|
|
|
plt.close(fig)
|
|
plt.close(fig)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def load_train_data(db, flight_route_list, table_name, date_begin, date_end, output_dir='.', is_hot=1):
|
|
|
|
|
- """加载训练数据"""
|
|
|
|
|
|
|
+def process_flight_group(args):
|
|
|
|
|
+ """处理单个航班号的线程函数(独立数据库连接)"""
|
|
|
|
|
+ thread_id, db_config, each_group, from_city, to_city, date_begin_s, date_end_s, is_hot, plot_flag, output_dir = args
|
|
|
|
|
+ flight_nums = each_group.get("flight_numbers")
|
|
|
|
|
+ details = each_group.get("details")
|
|
|
|
|
+
|
|
|
|
|
+ print(f"[线程{thread_id}] 开始处理航班号: {flight_nums}")
|
|
|
|
|
+
|
|
|
|
|
+ # 为每个线程创建独立的数据库连接
|
|
|
|
|
+ try:
|
|
|
|
|
+ client, db = mongo_con_parse(db_config)
|
|
|
|
|
+ print(f"[线程{thread_id}] ✅ 数据库连接创建成功")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"[线程{thread_id}] ❌ 数据库连接创建失败: {e}")
|
|
|
|
|
+ return pd.DataFrame()
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 查询远期表
|
|
|
|
|
+ if is_hot == 1:
|
|
|
|
|
+ df1 = query_flight_range_status(db, CLEAN_VJ_HOT_FAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
+ date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
+ else:
|
|
|
|
|
+ df1 = query_flight_range_status(db, CLEAN_VJ_NOTHOT_FAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
+ date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
+
|
|
|
|
|
+ # 保证远期表里有数据
|
|
|
|
|
+ if df1.empty:
|
|
|
|
|
+ print(f"[线程{thread_id}] 航班号:{flight_nums} 远期表无数据, 跳过")
|
|
|
|
|
+ return pd.DataFrame()
|
|
|
|
|
+
|
|
|
|
|
+ # 查询近期表
|
|
|
|
|
+ if is_hot == 1:
|
|
|
|
|
+ df2 = query_flight_range_status(db, CLEAN_VJ_HOT_NEAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
+ date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
+ else:
|
|
|
|
|
+ df2 = query_flight_range_status(db, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
+ date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
+
|
|
|
|
|
+ # 保证近期表里有数据
|
|
|
|
|
+ if df2.empty:
|
|
|
|
|
+ print(f"[线程{thread_id}] 航班号:{flight_nums} 近期表无数据, 跳过")
|
|
|
|
|
+ return pd.DataFrame()
|
|
|
|
|
+
|
|
|
|
|
+ # 起飞天数、行李配额以近期表的为主
|
|
|
|
|
+ if df2.empty:
|
|
|
|
|
+ common_dep_dates = []
|
|
|
|
|
+ common_baggages = []
|
|
|
|
|
+ else:
|
|
|
|
|
+ common_dep_dates = df2['search_dep_time'].unique()
|
|
|
|
|
+ common_baggages = df2['baggage'].unique()
|
|
|
|
|
+
|
|
|
|
|
+ list_mid = []
|
|
|
|
|
+ for dep_date in common_dep_dates:
|
|
|
|
|
+ # 起飞日期筛选
|
|
|
|
|
+ df_d1 = df1[df1["search_dep_time"] == dep_date].copy()
|
|
|
|
|
+ if not df_d1.empty:
|
|
|
|
|
+ for col in ["seg1_dep_time", "seg1_arr_time", "seg2_dep_time", "seg2_arr_time"]:
|
|
|
|
|
+ mode_series_1 = df_d1[col].mode()
|
|
|
|
|
+ if mode_series_1.empty:
|
|
|
|
|
+ zong_1 = pd.NaT
|
|
|
|
|
+ else:
|
|
|
|
|
+ zong_1 = mode_series_1.iloc[0]
|
|
|
|
|
+ df_d1[col] = zong_1
|
|
|
|
|
+
|
|
|
|
|
+ df_d2 = df2[df2["search_dep_time"] == dep_date].copy()
|
|
|
|
|
+ if not df_d2.empty:
|
|
|
|
|
+ for col in ["seg1_dep_time", "seg1_arr_time", "seg2_dep_time", "seg2_arr_time"]:
|
|
|
|
|
+ mode_series_2 = df_d2[col].mode()
|
|
|
|
|
+ if mode_series_2.empty:
|
|
|
|
|
+ zong_2 = pd.NaT
|
|
|
|
|
+ else:
|
|
|
|
|
+ zong_2 = mode_series_2.iloc[0]
|
|
|
|
|
+ df_d2[col] = zong_2
|
|
|
|
|
+
|
|
|
|
|
+ list_12 = []
|
|
|
|
|
+ for baggage in common_baggages:
|
|
|
|
|
+ # 行李配额筛选
|
|
|
|
|
+ df_b1 = df_d1[df_d1["baggage"] == baggage].copy()
|
|
|
|
|
+ df_b2 = df_d2[df_d2["baggage"] == baggage].copy()
|
|
|
|
|
+
|
|
|
|
|
+ # 合并前检查是否都有数据
|
|
|
|
|
+ if df_b1.empty and df_b2.empty:
|
|
|
|
|
+ print(f"[线程{thread_id}] ⚠️ dep_date:{dep_date}, baggage:{baggage} 远期表和近期表都为空,跳过")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ cols = ["seg1_flight_number", "seg1_dep_air_port", "seg1_arr_air_port",
|
|
|
|
|
+ "seg2_flight_number", "seg2_dep_air_port", "seg2_arr_air_port"]
|
|
|
|
|
+ df_b1[cols] = df_b1[cols].astype("string")
|
|
|
|
|
+ df_b2[cols] = df_b2[cols].astype("string")
|
|
|
|
|
+
|
|
|
|
|
+ df_b12 = pd.concat([df_b1, df_b2]).reset_index(drop=True)
|
|
|
|
|
+ # print(f"📊 dep_date:{dep_date}, baggage:{baggage} 已将远期表和近期表合并,形状: {df_b12.shape}")
|
|
|
|
|
+ df_b12 = fill_hourly_crawl_date(df_b12, rear_fill=2)
|
|
|
|
|
+ # print(f"📊 dep_date:{dep_date}, baggage:{baggage} 已合并且补齐为完整小时序列,形状: {df_b12.shape}")
|
|
|
|
|
+ list_12.append(df_b12)
|
|
|
|
|
+
|
|
|
|
|
+ del df_b12
|
|
|
|
|
+ del df_b2
|
|
|
|
|
+ del df_b1
|
|
|
|
|
+
|
|
|
|
|
+ if list_12:
|
|
|
|
|
+ df_c12 = pd.concat(list_12, ignore_index=True)
|
|
|
|
|
+ if plot_flag:
|
|
|
|
|
+ print(f"[线程{thread_id}] ✅ dep_date:{dep_date}, 所有 baggage 数据合并完成,总形状: {df_c12.shape}")
|
|
|
|
|
+ plot_c12_trend(df_c12, output_dir)
|
|
|
|
|
+ print(f"[线程{thread_id}] ✅ dep_date:{dep_date}, 所有 baggage 数据绘图完成")
|
|
|
|
|
+ else:
|
|
|
|
|
+ df_c12 = pd.DataFrame()
|
|
|
|
|
+ if plot_flag:
|
|
|
|
|
+ print(f"[线程{thread_id}] ⚠️ dep_date:{dep_date}, 所有 baggage 数据合并为空")
|
|
|
|
|
+
|
|
|
|
|
+ del list_12
|
|
|
|
|
+ list_mid.append(df_c12)
|
|
|
|
|
+
|
|
|
|
|
+ del df_c12
|
|
|
|
|
+ del df_d1
|
|
|
|
|
+ del df_d2
|
|
|
|
|
+ # print(f"结束处理起飞日期: {dep_date}")
|
|
|
|
|
+
|
|
|
|
|
+ if list_mid:
|
|
|
|
|
+ df_mid = pd.concat(list_mid, ignore_index=True)
|
|
|
|
|
+ print(f"[线程{thread_id}] ✅ 航班号:{flight_nums} 所有 起飞日期 数据合并完成,总形状: {df_mid.shape}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ df_mid = pd.DataFrame()
|
|
|
|
|
+ print(f"[线程{thread_id}] ⚠️ 航班号:{flight_nums} 所有 起飞日期 数据合并为空")
|
|
|
|
|
+
|
|
|
|
|
+ del list_mid
|
|
|
|
|
+ del df1
|
|
|
|
|
+ del df2
|
|
|
|
|
+ gc.collect()
|
|
|
|
|
+ print(f"[线程{thread_id}] 结束处理航班号: {flight_nums}")
|
|
|
|
|
+ return df_mid
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"[线程{thread_id}] ❌ 处理航班号:{flight_nums} 时发生异常: {e}")
|
|
|
|
|
+ return pd.DataFrame()
|
|
|
|
|
+ finally:
|
|
|
|
|
+ # 确保关闭数据库连接
|
|
|
|
|
+ try:
|
|
|
|
|
+ client.close()
|
|
|
|
|
+ print(f"[线程{thread_id}] ✅ 数据库连接已关闭")
|
|
|
|
|
+ except:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def load_train_data(db_config, flight_route_list, table_name, date_begin, date_end, output_dir='.', is_hot=1, plot_flag=False,
|
|
|
|
|
+ use_multithread=False, max_workers=None):
|
|
|
|
|
+ """加载训练数据(支持多线程)"""
|
|
|
timestamp_str = datetime.now().strftime("%Y%m%d%H%M%S")
|
|
timestamp_str = datetime.now().strftime("%Y%m%d%H%M%S")
|
|
|
date_begin_s = datetime.strptime(date_begin, "%Y-%m-%d").strftime("%Y%m%d") # 查询时的格式
|
|
date_begin_s = datetime.strptime(date_begin, "%Y-%m-%d").strftime("%Y%m%d") # 查询时的格式
|
|
|
date_end_s = datetime.strptime(date_end, "%Y-%m-%d").strftime("%Y%m%d")
|
|
date_end_s = datetime.strptime(date_end, "%Y-%m-%d").strftime("%Y%m%d")
|
|
|
list_all = []
|
|
list_all = []
|
|
|
|
|
+
|
|
|
# 每一航线对
|
|
# 每一航线对
|
|
|
for flight_route in flight_route_list:
|
|
for flight_route in flight_route_list:
|
|
|
from_city = flight_route.split('-')[0]
|
|
from_city = flight_route.split('-')[0]
|
|
|
to_city = flight_route.split('-')[1]
|
|
to_city = flight_route.split('-')[1]
|
|
|
route = f"{from_city}-{to_city}"
|
|
route = f"{from_city}-{to_city}"
|
|
|
print(f"开始处理航线: {route}")
|
|
print(f"开始处理航线: {route}")
|
|
|
- all_groups = query_groups_of_city_code(db, from_city, to_city, table_name)
|
|
|
|
|
- all_groups_len = len(all_groups)
|
|
|
|
|
- print(f"该航线共有{all_groups_len}个航班号")
|
|
|
|
|
- # 每一组航班号
|
|
|
|
|
- for each_group in all_groups:
|
|
|
|
|
- flight_nums = each_group.get("flight_numbers")
|
|
|
|
|
- print(f"开始处理航班号: {flight_nums}")
|
|
|
|
|
- details = each_group.get("details")
|
|
|
|
|
-
|
|
|
|
|
- print(f"查远期表")
|
|
|
|
|
- if is_hot == 1:
|
|
|
|
|
- df1 = query_flight_range_status(db, CLEAN_VJ_HOT_FAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
- date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
- else:
|
|
|
|
|
- df1 = query_flight_range_status(db, CLEAN_VJ_NOTHOT_FAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
- date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
-
|
|
|
|
|
- # 保证远期表里有数据
|
|
|
|
|
- if df1.empty:
|
|
|
|
|
- print(f"航班号:{flight_nums} 远期表无数据, 跳过")
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- print(f"查近期表")
|
|
|
|
|
- if is_hot == 1:
|
|
|
|
|
- df2 = query_flight_range_status(db, CLEAN_VJ_HOT_NEAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
- date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
- else:
|
|
|
|
|
- df2 = query_flight_range_status(db, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB, from_city, to_city,
|
|
|
|
|
- date_begin_s, date_end_s, flight_nums)
|
|
|
|
|
-
|
|
|
|
|
- # 保证近期表里有数据
|
|
|
|
|
- if df2.empty:
|
|
|
|
|
- print(f"航班号:{flight_nums} 近期表无数据, 跳过")
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # 起飞天数、行李配额以近期表的为主
|
|
|
|
|
- if df2.empty:
|
|
|
|
|
- common_dep_dates = []
|
|
|
|
|
- common_baggages = []
|
|
|
|
|
- else:
|
|
|
|
|
- common_dep_dates = df2['search_dep_time'].unique()
|
|
|
|
|
- common_baggages = df2['baggage'].unique()
|
|
|
|
|
-
|
|
|
|
|
- list_mid = []
|
|
|
|
|
- for dep_date in common_dep_dates:
|
|
|
|
|
- # 起飞日期筛选
|
|
|
|
|
- df_d1 = df1[df1["search_dep_time"] == dep_date].copy()
|
|
|
|
|
- if not df_d1.empty:
|
|
|
|
|
- for col in ["seg1_dep_time", "seg1_arr_time", "seg2_dep_time", "seg2_arr_time"]:
|
|
|
|
|
- mode_series_1 = df_d1[col].mode()
|
|
|
|
|
- if mode_series_1.empty:
|
|
|
|
|
- # 如果整个列都是 NaT,则众数为空,直接赋 NaT
|
|
|
|
|
- zong_1 = pd.NaT
|
|
|
|
|
- else:
|
|
|
|
|
- zong_1 = mode_series_1.iloc[0]
|
|
|
|
|
- df_d1[col] = zong_1
|
|
|
|
|
-
|
|
|
|
|
- df_d2 = df2[df2["search_dep_time"] == dep_date].copy()
|
|
|
|
|
- if not df_d2.empty:
|
|
|
|
|
- for col in ["seg1_dep_time", "seg1_arr_time", "seg2_dep_time", "seg2_arr_time"]:
|
|
|
|
|
- mode_series_2 = df_d2[col].mode()
|
|
|
|
|
- if mode_series_2.empty:
|
|
|
|
|
- # 如果整个列都是 NaT,则众数为空,直接赋 NaT
|
|
|
|
|
- zong_2 = pd.NaT
|
|
|
|
|
- else:
|
|
|
|
|
- zong_2 = mode_series_2.iloc[0]
|
|
|
|
|
- df_d2[col] = zong_2
|
|
|
|
|
-
|
|
|
|
|
- list_12 = []
|
|
|
|
|
- for baggage in common_baggages:
|
|
|
|
|
- # 行李配额筛选
|
|
|
|
|
- df_b1 = df_d1[df_d1["baggage"] == baggage].copy()
|
|
|
|
|
- df_b2 = df_d2[df_d2["baggage"] == baggage].copy()
|
|
|
|
|
-
|
|
|
|
|
- # 合并前检查是否都有数据
|
|
|
|
|
- if df_b1.empty and df_b2.empty:
|
|
|
|
|
- print(f"⚠️ dep_date:{dep_date}, baggage:{baggage} 远期表和近期表都为空,跳过")
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- cols = ["seg1_flight_number", "seg1_dep_air_port", "seg1_arr_air_port",
|
|
|
|
|
- "seg2_flight_number", "seg2_dep_air_port", "seg2_arr_air_port"]
|
|
|
|
|
- # df_b1 = df_b1.copy()
|
|
|
|
|
- # df_b2 = df_b2.copy()
|
|
|
|
|
- df_b1[cols] = df_b1[cols].astype("string")
|
|
|
|
|
- df_b2[cols] = df_b2[cols].astype("string")
|
|
|
|
|
-
|
|
|
|
|
- df_b12 = pd.concat([df_b1, df_b2]).reset_index(drop=True)
|
|
|
|
|
- # print(f"📊 dep_date:{dep_date}, baggage:{baggage} 已将远期表和近期表合并,形状: {df_b12.shape}")
|
|
|
|
|
- df_b12 = fill_hourly_crawl_date(df_b12, rear_fill=2)
|
|
|
|
|
- # print(f"📊 dep_date:{dep_date}, baggage:{baggage} 已合并且补齐为完整小时序列,形状: {df_b12.shape}")
|
|
|
|
|
- # print(df_b12.dtypes)
|
|
|
|
|
- list_12.append(df_b12)
|
|
|
|
|
- del df_b12
|
|
|
|
|
- del df_b2
|
|
|
|
|
- del df_b1
|
|
|
|
|
-
|
|
|
|
|
- if list_12:
|
|
|
|
|
- df_c12 = pd.concat(list_12, ignore_index=True)
|
|
|
|
|
- # print(f"✅ dep_date:{dep_date}, 所有 baggage 数据合并完成,总形状: {df_c12.shape}")
|
|
|
|
|
- # plot_c12_trend(df_c12, output_dir)
|
|
|
|
|
- # print(f"✅ dep_date:{dep_date}, 所有 baggage 数据绘图完成")
|
|
|
|
|
- else:
|
|
|
|
|
- df_c12 = pd.DataFrame()
|
|
|
|
|
- # print(f"⚠️ dep_date:{dep_date}, 所有 baggage 数据合并为空")
|
|
|
|
|
-
|
|
|
|
|
- del list_12
|
|
|
|
|
- list_mid.append(df_c12)
|
|
|
|
|
-
|
|
|
|
|
- del df_c12
|
|
|
|
|
- del df_d1
|
|
|
|
|
- del df_d2
|
|
|
|
|
-
|
|
|
|
|
- # print(f"结束处理起飞日期: {dep_date}")
|
|
|
|
|
-
|
|
|
|
|
- if list_mid:
|
|
|
|
|
- df_mid = pd.concat(list_mid, ignore_index=True)
|
|
|
|
|
- print(f"✅ 航班号:{flight_nums} 所有 起飞日期 数据合并完成,总形状: {df_mid.shape}")
|
|
|
|
|
- else:
|
|
|
|
|
- df_mid = pd.DataFrame()
|
|
|
|
|
- print(f"⚠️ 航班号:{flight_nums} 所有 起飞日期 数据合并为空")
|
|
|
|
|
|
|
|
|
|
- del list_mid
|
|
|
|
|
- list_all.append(df_mid)
|
|
|
|
|
|
|
+ # 在主线程中查询航班号分组(避免多线程重复查询)
|
|
|
|
|
+ main_client, main_db = mongo_con_parse(db_config)
|
|
|
|
|
+ all_groups = query_groups_of_city_code(main_db, from_city, to_city, table_name)
|
|
|
|
|
+ main_client.close()
|
|
|
|
|
|
|
|
- del df1
|
|
|
|
|
- del df2
|
|
|
|
|
-
|
|
|
|
|
- # output_path = os.path.join(output_dir, f"./{route}_{timestamp_str}.csv")
|
|
|
|
|
- # df_mid.to_csv(output_path, index=False, encoding="utf-8-sig", mode="a", header=not os.path.exists(output_path))
|
|
|
|
|
|
|
+ all_groups_len = len(all_groups)
|
|
|
|
|
+ print(f"该航线共有{all_groups_len}个航班号")
|
|
|
|
|
+
|
|
|
|
|
+ if use_multithread and all_groups_len > 1:
|
|
|
|
|
+ print(f"启用多线程处理,最大线程数: {max_workers}")
|
|
|
|
|
+ # 多线程处理
|
|
|
|
|
+ thread_args = []
|
|
|
|
|
+ thread_id = 0
|
|
|
|
|
+ for each_group in all_groups:
|
|
|
|
|
+ thread_id += 1
|
|
|
|
|
+ args = (thread_id, db_config, each_group, from_city, to_city, date_begin_s, date_end_s, is_hot, plot_flag, output_dir)
|
|
|
|
|
+ thread_args.append(args)
|
|
|
|
|
|
|
|
- del df_mid
|
|
|
|
|
- gc.collect()
|
|
|
|
|
- print(f"结束处理航班号: {flight_nums}")
|
|
|
|
|
|
|
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
|
|
|
+ future_to_group = {executor.submit(process_flight_group, args): each_group for args, each_group in zip(thread_args, all_groups)}
|
|
|
|
|
+
|
|
|
|
|
+ for future in as_completed(future_to_group):
|
|
|
|
|
+ each_group = future_to_group[future]
|
|
|
|
|
+ flight_nums = each_group.get("flight_numbers", "未知")
|
|
|
|
|
+ try:
|
|
|
|
|
+ df_mid = future.result()
|
|
|
|
|
+ if not df_mid.empty:
|
|
|
|
|
+ list_all.append(df_mid)
|
|
|
|
|
+ print(f"✅ 航班号:{flight_nums} 处理完成")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"⚠️ 航班号:{flight_nums} 处理结果为空")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"❌ 航班号:{flight_nums} 处理异常: {e}")
|
|
|
|
|
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 单线程处理(线程编号为0)
|
|
|
|
|
+ print("使用单线程处理")
|
|
|
|
|
+ thread_id = 0
|
|
|
|
|
+ for each_group in all_groups:
|
|
|
|
|
+ args = (thread_id, db_config, each_group, from_city, to_city, date_begin_s, date_end_s, is_hot, plot_flag, output_dir)
|
|
|
|
|
+ flight_nums = each_group.get("flight_numbers", "未知")
|
|
|
|
|
+ try:
|
|
|
|
|
+ df_mid = process_flight_group(args)
|
|
|
|
|
+ if not df_mid.empty:
|
|
|
|
|
+ list_all.append(df_mid)
|
|
|
|
|
+ print(f"✅ 航班号:{flight_nums} 处理完成")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"⚠️ 航班号:{flight_nums} 处理结果为空")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"❌ 航班号:{flight_nums} 处理异常: {e}")
|
|
|
|
|
+
|
|
|
print(f"结束处理航线: {from_city}-{to_city}")
|
|
print(f"结束处理航线: {from_city}-{to_city}")
|
|
|
|
|
|
|
|
if list_all:
|
|
if list_all:
|
|
@@ -828,7 +969,7 @@ def validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_
|
|
|
# 1️⃣ 展开 segments
|
|
# 1️⃣ 展开 segments
|
|
|
print(f"📊 开始扩展segments 稍等...")
|
|
print(f"📊 开始扩展segments 稍等...")
|
|
|
t1 = time.time()
|
|
t1 = time.time()
|
|
|
- df = expand_segments_columns(df)
|
|
|
|
|
|
|
+ df = expand_segments_columns_optimized(df)
|
|
|
t2 = time.time()
|
|
t2 = time.time()
|
|
|
rt = round(t2 - t1, 3)
|
|
rt = round(t2 - t1, 3)
|
|
|
print(f"用时: {rt} 秒")
|
|
print(f"用时: {rt} 秒")
|
|
@@ -856,46 +997,51 @@ def validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
|
|
|
|
|
# test_mongo_connection(db)
|
|
# test_mongo_connection(db)
|
|
|
-
|
|
|
|
|
- # output_dir = f"./output"
|
|
|
|
|
- # os.makedirs(output_dir, exist_ok=True)
|
|
|
|
|
-
|
|
|
|
|
- # # 加载热门航线数据
|
|
|
|
|
- # date_begin = "2025-11-20"
|
|
|
|
|
- # date_end = datetime.today().strftime("%Y-%m-%d")
|
|
|
|
|
-
|
|
|
|
|
- # flight_route_list = vj_flight_route_list_hot[0:] # 热门 vj_flight_route_list_hot 冷门 vj_flight_route_list_nothot
|
|
|
|
|
- # table_name = CLEAN_VJ_HOT_NEAR_INFO_TAB # 热门 CLEAN_VJ_HOT_NEAR_INFO_TAB 冷门 CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
|
|
|
|
|
- # is_hot = 1 # 1 热门 0 冷门
|
|
|
|
|
- # group_size = 1
|
|
|
|
|
- # chunks = chunk_list(flight_route_list, group_size)
|
|
|
|
|
-
|
|
|
|
|
- # for idx, group_route_list in enumerate(chunks, 1):
|
|
|
|
|
- # # 使用默认配置
|
|
|
|
|
- # client, db = mongo_con_parse()
|
|
|
|
|
- # print(f"第 {idx} 组 :", group_route_list)
|
|
|
|
|
- # start_time = time.time()
|
|
|
|
|
- # load_train_data(db, group_route_list, table_name, date_begin, date_end, output_dir, is_hot)
|
|
|
|
|
- # end_time = time.time()
|
|
|
|
|
- # run_time = round(end_time - start_time, 3)
|
|
|
|
|
- # print(f"用时: {run_time} 秒")
|
|
|
|
|
-
|
|
|
|
|
- # client.close()
|
|
|
|
|
- # time.sleep(3)
|
|
|
|
|
-
|
|
|
|
|
- # print("整体结束")
|
|
|
|
|
-
|
|
|
|
|
- client, db = mongo_con_parse()
|
|
|
|
|
- list_flight_number_1 = query_all_flight_number(db, CLEAN_VJ_HOT_NEAR_INFO_TAB)
|
|
|
|
|
- list_flight_number_2 = query_all_flight_number(db, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB)
|
|
|
|
|
-
|
|
|
|
|
- list_flight_number_all = list_flight_number_1 + list_flight_number_2
|
|
|
|
|
- list_flight_number_all = list(set(list_flight_number_all))
|
|
|
|
|
- list_flight_number_all.sort()
|
|
|
|
|
|
|
+ from utils import chunk_list_with_index
|
|
|
|
|
+
|
|
|
|
|
+ cpu_cores = os.cpu_count() # 你的系统是72
|
|
|
|
|
+ max_workers = min(16, cpu_cores) # 最大不超过16个线程
|
|
|
|
|
+
|
|
|
|
|
+ output_dir = f"./output"
|
|
|
|
|
+ os.makedirs(output_dir, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ # 加载热门航线数据
|
|
|
|
|
+ date_begin = "2025-12-07"
|
|
|
|
|
+ date_end = datetime.today().strftime("%Y-%m-%d")
|
|
|
|
|
+
|
|
|
|
|
+ flight_route_list = vj_flight_route_list_hot[0:] # 热门 vj_flight_route_list_hot 冷门 vj_flight_route_list_nothot
|
|
|
|
|
+ table_name = CLEAN_VJ_HOT_NEAR_INFO_TAB # 热门 CLEAN_VJ_HOT_NEAR_INFO_TAB 冷门 CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
|
|
|
|
|
+ is_hot = 1 # 1 热门 0 冷门
|
|
|
|
|
+ group_size = 1
|
|
|
|
|
+ chunks = chunk_list_with_index(flight_route_list, group_size)
|
|
|
|
|
+
|
|
|
|
|
+ for idx, (_, group_route_list) in enumerate(chunks, 1):
|
|
|
|
|
+ # 使用默认配置
|
|
|
|
|
+ # client, db = mongo_con_parse()
|
|
|
|
|
+ print(f"第 {idx} 组 :", group_route_list)
|
|
|
|
|
+ start_time = time.time()
|
|
|
|
|
+ load_train_data(mongodb_config, group_route_list, table_name, date_begin, date_end, output_dir, is_hot, plot_flag=False,
|
|
|
|
|
+ use_multithread=False, max_workers=max_workers)
|
|
|
|
|
+ end_time = time.time()
|
|
|
|
|
+ run_time = round(end_time - start_time, 3)
|
|
|
|
|
+ print(f"用时: {run_time} 秒")
|
|
|
|
|
+
|
|
|
|
|
+ # client.close()
|
|
|
|
|
+ time.sleep(3)
|
|
|
|
|
+
|
|
|
|
|
+ print("整体结束")
|
|
|
|
|
+
|
|
|
|
|
+ # client, db = mongo_con_parse()
|
|
|
|
|
+ # list_flight_number_1 = query_all_flight_number(db, CLEAN_VJ_HOT_NEAR_INFO_TAB)
|
|
|
|
|
+ # list_flight_number_2 = query_all_flight_number(db, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB)
|
|
|
|
|
+
|
|
|
|
|
+ # list_flight_number_all = list_flight_number_1 + list_flight_number_2
|
|
|
|
|
+ # list_flight_number_all = list(set(list_flight_number_all))
|
|
|
|
|
+ # list_flight_number_all.sort()
|
|
|
|
|
|
|
|
- print(list_flight_number_all)
|
|
|
|
|
- print(len(list_flight_number_all))
|
|
|
|
|
|
|
+ # print(list_flight_number_all)
|
|
|
|
|
+ # print(len(list_flight_number_all))
|
|
|
|
|
|
|
|
- flight_map = {v: i for i, v in enumerate(list_flight_number_all, start=1)}
|
|
|
|
|
- print(flight_map)
|
|
|
|
|
|
|
+ # flight_map = {v: i for i, v in enumerate(list_flight_number_all, start=1)}
|
|
|
|
|
+ # print(flight_map)
|
|
|
|
|
|