|
|
@@ -0,0 +1,116 @@
|
|
|
+import time
|
|
|
+import random
|
|
|
+from datetime import datetime, timedelta
|
|
|
+import pymongo
|
|
|
+from pymongo.errors import PyMongoError, ServerSelectionTimeoutError
|
|
|
+import pandas as pd
|
|
|
+from uo_atlas_import import mongo_con_parse
|
|
|
+from config import mongo_config, mongo_table_uo, uo_city_pairs
|
|
|
+
|
|
|
+
|
|
|
+def query_groups_of_city_pair(db, city_pair, table_name, min_days=10, max_retries=3, base_sleep=1.0):
|
|
|
+ """根据city_pair查询航线, 筛选1个月内至少有10天起飞的航线"""
|
|
|
+ print(f"{city_pair} 查找所有分组")
|
|
|
+ date_begin = (datetime.today() - timedelta(days=30)).strftime("%Y-%m-%d")
|
|
|
+ date_end = datetime.today().strftime("%Y-%m-%d")
|
|
|
+
|
|
|
+ # 聚合查询管道
|
|
|
+ pipeline = [
|
|
|
+ {
|
|
|
+ "$match": {
|
|
|
+ "citypair": city_pair,
|
|
|
+ "from_date": {
|
|
|
+ "$gte": date_begin,
|
|
|
+ "$lte": date_end
|
|
|
+ }
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "$group": {
|
|
|
+ "_id": {
|
|
|
+ "flight_numbers": "$flight_numbers",
|
|
|
+ "from_date": "$from_date"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "$group": {
|
|
|
+ "_id": "$_id.flight_numbers",
|
|
|
+ "days": {"$sum": 1},
|
|
|
+ "details": {"$push": "$_id.from_date"}
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "$match": {
|
|
|
+ "days": {"$gte": min_days}
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "$addFields": {
|
|
|
+ "details": {"$sortArray": {"input": "$details", "sortBy": 1}}
|
|
|
+ }
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "$sort": {"_id": 1}
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ for attempt in range(1, max_retries + 1):
|
|
|
+ try:
|
|
|
+ print(f" 第 {attempt}/{max_retries} 次尝试查询")
|
|
|
+
|
|
|
+ # 执行聚合查询
|
|
|
+ collection = db[table_name]
|
|
|
+ results = list(collection.aggregate(pipeline))
|
|
|
+
|
|
|
+ # 格式化结果,使字段名更清晰
|
|
|
+ formatted_results = [
|
|
|
+ {
|
|
|
+ "flight_number": r["_id"],
|
|
|
+ "days": r["days"],
|
|
|
+ "flight_dates": r["details"]
|
|
|
+ }
|
|
|
+ for r in results
|
|
|
+ ]
|
|
|
+
|
|
|
+ return formatted_results
|
|
|
+
|
|
|
+ except (ServerSelectionTimeoutError, PyMongoError) as e:
|
|
|
+ print(f"⚠️ Mongo 查询失败: {e}")
|
|
|
+ if attempt == max_retries:
|
|
|
+ print("❌ 达到最大重试次数,放弃")
|
|
|
+ return []
|
|
|
+
|
|
|
+ # 指数退避 + 随机抖动
|
|
|
+ sleep_time = base_sleep * (2 ** (attempt - 1)) + random.random()
|
|
|
+ print(f"⏳ {sleep_time:.2f}s 后重试...")
|
|
|
+ time.sleep(sleep_time)
|
|
|
+
|
|
|
+
|
|
|
+def load_data(db_config, city_pair, from_date_begin, from_date_end):
|
|
|
+
|
|
|
+ print(f"开始处理航线: {city_pair}")
|
|
|
+ main_client, main_db = mongo_con_parse(db_config)
|
|
|
+ all_groups = query_groups_of_city_pair(main_db, city_pair, mongo_table_uo)
|
|
|
+ main_client.close()
|
|
|
+
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+
|
|
|
+ from_date_begin = "2026-03-17"
|
|
|
+ from_date_end = "2026-04-01"
|
|
|
+
|
|
|
+ uo_city_pair_list = [f"{pair[:3]}-{pair[3:]}" for pair in uo_city_pairs]
|
|
|
+
|
|
|
+ for idx, uo_city_pair in enumerate(uo_city_pair_list, start=1):
|
|
|
+ # 使用默认配置
|
|
|
+ # client, db = mongo_con_parse()
|
|
|
+ print(f"第 {idx} 组 :", uo_city_pair)
|
|
|
+
|
|
|
+ start_time = time.time()
|
|
|
+ load_data(mongo_config, uo_city_pair, from_date_begin, from_date_end)
|
|
|
+ end_time = time.time()
|
|
|
+ run_time = round(end_time - start_time, 3)
|
|
|
+ print(f"用时: {run_time} 秒")
|
|
|
+
|