data_loader.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import time
  2. import random
  3. from datetime import datetime, timedelta
  4. import pymongo
  5. from pymongo.errors import PyMongoError, ServerSelectionTimeoutError
  6. import pandas as pd
  7. from uo_atlas_import import mongo_con_parse
  8. from config import mongo_config, mongo_table_uo, uo_city_pairs
  9. def query_groups_of_city_pair(db, city_pair, table_name, min_days=10, max_retries=3, base_sleep=1.0):
  10. """根据city_pair查询航线, 筛选1个月内至少有10天起飞的航线"""
  11. print(f"{city_pair} 查找所有分组")
  12. date_begin = (datetime.today() - timedelta(days=30)).strftime("%Y-%m-%d")
  13. date_end = datetime.today().strftime("%Y-%m-%d")
  14. # 聚合查询管道
  15. pipeline = [
  16. {
  17. "$match": {
  18. "citypair": city_pair,
  19. "from_date": {
  20. "$gte": date_begin,
  21. "$lte": date_end
  22. }
  23. }
  24. },
  25. {
  26. "$group": {
  27. "_id": {
  28. "flight_numbers": "$flight_numbers",
  29. "from_date": "$from_date"
  30. }
  31. }
  32. },
  33. {
  34. "$group": {
  35. "_id": "$_id.flight_numbers",
  36. "days": {"$sum": 1},
  37. "details": {"$push": "$_id.from_date"}
  38. }
  39. },
  40. {
  41. "$match": {
  42. "days": {"$gte": min_days}
  43. }
  44. },
  45. {
  46. "$addFields": {
  47. "details": {"$sortArray": {"input": "$details", "sortBy": 1}}
  48. }
  49. },
  50. {
  51. "$sort": {"_id": 1}
  52. }
  53. ]
  54. for attempt in range(1, max_retries + 1):
  55. try:
  56. print(f" 第 {attempt}/{max_retries} 次尝试查询")
  57. # 执行聚合查询
  58. collection = db[table_name]
  59. results = list(collection.aggregate(pipeline))
  60. # 格式化结果,使字段名更清晰
  61. formatted_results = [
  62. {
  63. "flight_number": r["_id"],
  64. "days": r["days"],
  65. "flight_dates": r["details"]
  66. }
  67. for r in results
  68. ]
  69. return formatted_results
  70. except (ServerSelectionTimeoutError, PyMongoError) as e:
  71. print(f"⚠️ Mongo 查询失败: {e}")
  72. if attempt == max_retries:
  73. print("❌ 达到最大重试次数,放弃")
  74. return []
  75. # 指数退避 + 随机抖动
  76. sleep_time = base_sleep * (2 ** (attempt - 1)) + random.random()
  77. print(f"⏳ {sleep_time:.2f}s 后重试...")
  78. time.sleep(sleep_time)
  79. def load_data(db_config, city_pair, from_date_begin, from_date_end):
  80. print(f"开始处理航线: {city_pair}")
  81. main_client, main_db = mongo_con_parse(db_config)
  82. all_groups = query_groups_of_city_pair(main_db, city_pair, mongo_table_uo)
  83. main_client.close()
  84. pass
  85. if __name__ == "__main__":
  86. from_date_begin = "2026-03-17"
  87. from_date_end = "2026-04-01"
  88. uo_city_pair_list = [f"{pair[:3]}-{pair[3:]}" for pair in uo_city_pairs]
  89. for idx, uo_city_pair in enumerate(uo_city_pair_list, start=1):
  90. # 使用默认配置
  91. # client, db = mongo_con_parse()
  92. print(f"第 {idx} 组 :", uo_city_pair)
  93. start_time = time.time()
  94. load_data(mongo_config, uo_city_pair, from_date_begin, from_date_end)
  95. end_time = time.time()
  96. run_time = round(end_time - start_time, 3)
  97. print(f"用时: {run_time} 秒")