uo_atlas_import.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. import os
  2. import random
  3. import time
  4. from datetime import datetime, timedelta
  5. import pymongo
  6. from pymongo.errors import PyMongoError, ServerSelectionTimeoutError, BulkWriteError
  7. # import pandas as pd
  8. from config import atlas_config, mongo_config, atlas_table, mongo_table_uo, uo_city_pairs_old, uo_city_pairs_new
  def _build_uo_pipeline(city_pair, create_at_begin_stamp, create_at_end_stamp, limit=0):
      """Build the aggregation pipeline that selects UO records for one city pair and time window."""
      pipeline = [
          {
              "$match": {
                  "citypair": city_pair,
                  "from_date": {"$ne": None},
                  # Only UO records whose flight_weight ends in "0;0" or "1;20".
                  "flight_weight": {"$regex": r"^UO.*(0;0|1;20)$"},
                  "create_at": {"$gte": create_at_begin_stamp, "$lte": create_at_end_stamp},
              }
          },
          {
              "$addFields": {
                  "create_at_beijing": {
                      "$add": [
                          # create_at is in seconds; $toDate expects milliseconds.
                          {"$toDate": {"$multiply": ["$create_at", 1000]}},
                          8 * 60 * 60 * 1000,  # shift by +8 hours; the result stays a Date
                      ]
                  }
              }
          },
          {"$sort": {"from_date": 1, "create_at": 1}},
      ]
      if limit > 0:
          pipeline.append({"$limit": limit})
      return pipeline


  def _convert_flight_doc(doc):
      """Map one source document to ``(dedup_filter, new_doc)`` for the target collection."""
      citypair = doc.get('citypair')
      citypair_new = citypair[:3] + '-' + citypair[3:]

      from_date_new = datetime.strptime(doc.get('from_date'), '%Y%m%d').strftime('%Y-%m-%d')

      # flight_weight is ';'-separated: the first field holds the comma-separated
      # flight numbers, the last field is stored as the integer baggage weight.
      flight_weight_split = doc.get('flight_weight').split(';')
      flight_numbers = ','.join(
          part.strip() for part in flight_weight_split[0].split(',') if part.strip()
      )
      baggage_weight = int(flight_weight_split[-1])

      deptime = doc.get('depTime', '')
      if deptime:
          from_time = datetime.strptime(deptime, '%Y%m%d%H%M').strftime('%Y-%m-%d %H:%M:%S')
      else:
          from_time = ''  # records captured before March 16 carry no departure time

      cabins = ','.join(part.strip() for part in doc.get('cabin', '').split(',') if part.strip())

      create_at = doc.get('create_at')
      create_time = doc.get('create_at_beijing').strftime('%Y-%m-%d %H:%M:%S')

      new_doc = {
          "citypair": citypair_new,
          "from_date": from_date_new,
          "flight_numbers": flight_numbers,
          "from_time": from_time,
          "trip_type": doc.get('trip_type'),
          "cabins": cabins,
          "baggage_weight": baggage_weight,
          "ticket_amount": doc.get('ticket_amount'),
          "currency": doc.get('vendorCurrency', ''),
          "price_base": doc.get('price'),
          "price_tax": doc.get('tax'),
          "price_total": doc.get('total'),
          "create_at": create_at,
          "create_time": create_time,
      }
      # The source _id is not copied; these business fields identify a record instead.
      dedup_filter = {
          "citypair": citypair_new,
          "from_date": from_date_new,
          "create_at": create_at,
          "flight_numbers": flight_numbers,
          "baggage_weight": baggage_weight,
      }
      return dedup_filter, new_doc


  def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp,
                                 limit=0, max_retries=3, base_sleep=1.0, out_table=None):
      """
      Query flight status records for one city pair and time window from atlas
      and upsert them into a mongo collection.

      The whole query-and-write cycle is retried on PyMongo errors that escape it,
      with exponential backoff plus random jitter. Writes are upserts keyed on
      business fields, so a retried attempt does not create duplicates.

      :param atlas_db: atlas database handle (source)
      :param mongo_db: mongo database handle (target)
      :param city_pair: city pair without separator (e.g. "BJSBKK")
      :param create_at_begin_stamp: inclusive window start, unix timestamp in seconds
      :param create_at_end_stamp: inclusive window end, unix timestamp in seconds
      :param limit: maximum number of source records (0 means no limit)
      :param max_retries: maximum number of attempts
      :param base_sleep: base retry delay in seconds
      :param out_table: target mongo collection name (default: mongo_table_uo)
      :return: None, both on success and after giving up
      :raises ValueError: if from_date or depTime of a record does not match its expected format
      """
      # Honour the documented out_table parameter; fall back to the configured default.
      target_table = out_table or mongo_table_uo

      for attempt in range(1, max_retries + 1):
          try:
              print(f"🔁 第 {attempt}/{max_retries} 次尝试查询")
              src_collection = atlas_db[atlas_table]  # source collection (atlas)
              out_collection = mongo_db[target_table]  # target collection (mongo)

              pipeline = _build_uo_pipeline(city_pair, create_at_begin_stamp, create_at_end_stamp, limit)

              # Run the query; results are materialised because the progress log needs the total.
              t1 = time.time()
              results = list(src_collection.aggregate(pipeline))
              rt = round(time.time() - t1, 3)
              print(f"查询用时: {rt} 秒")

              upserted = 0
              matched = 0
              write_failed = 0
              batch_size = 500
              ops = []

              def flush_ops(ops_to_flush):
                  """Send one unordered bulk write and fold its outcome into the counters."""
                  nonlocal upserted, matched, write_failed
                  if not ops_to_flush:
                      return
                  try:
                      bulk_res = out_collection.bulk_write(ops_to_flush, ordered=False)
                      upserted += bulk_res.upserted_count
                      matched += bulk_res.matched_count
                  except BulkWriteError as e:
                      # Partial failure: count what succeeded and what did not.
                      details = e.details or {}
                      write_failed += len(details.get("writeErrors", []))
                      upserted += details.get("nUpserted", 0)
                      matched += details.get("nMatched", 0)
                  except (ServerSelectionTimeoutError, PyMongoError) as e:
                      write_failed += len(ops_to_flush)
                      print(f"⚠️ Mongo 批量写入失败 {city_pair}: {e}")
                      # Server selection timeouts go to the outer retry loop;
                      # other errors only cost this batch.
                      if isinstance(e, ServerSelectionTimeoutError):
                          raise

              for i, doc in enumerate(results, start=1):
                  dedup_filter, new_doc = _convert_flight_doc(doc)
                  ops.append(pymongo.UpdateOne(dedup_filter, {"$set": new_doc}, upsert=True))
                  if len(ops) >= batch_size:
                      flush_ops(ops)
                      ops = []
                  if i % 500 == 0:
                      print(f"写入进度 {city_pair} [{i}/{len(results)}]: upserted={upserted}, matched={matched}, failed={write_failed}")

              flush_ops(ops)  # remaining partial batch
              print(f"写入集合: {target_table}, upserted={upserted}, matched={matched}, failed={write_failed}, total={len(results)}")
              return
          except (ServerSelectionTimeoutError, PyMongoError) as e:
              print(f"⚠️ Mongo 处理失败(查询/写入): {e}")
              if attempt == max_retries:
                  print("❌ 达到最大重试次数,放弃")
                  return
              # Exponential backoff plus random jitter.
              sleep_time = base_sleep * (2 ** (attempt - 1)) + random.random()
              print(f"⏳ {sleep_time:.2f}s 后重试...")
              time.sleep(sleep_time)
  def _redact_mongo_uri(uri):
      """Return *uri* with any ``user:password@`` part replaced by ``***@`` for safe logging."""
      scheme, sep, rest = uri.partition("://")
      if not sep:
          return uri
      authority, slash, tail = rest.partition("/")
      # Userinfo ends at the last '@' of the authority component.
      _userinfo, at, hosts = authority.rpartition("@")
      if not at:
          return uri
      return f"{scheme}://***@{hosts}{slash}{tail}"


  def mongo_con_parse(config=None):
      """
      Create a MongoDB client and database handle from a config mapping.

      If ``config["URI"]`` is non-empty the client is built from that URI;
      otherwise it is built from ``host``/``port`` with 30-second timeouts.
      When ``user`` is set, ``user``/``pwd`` are passed to the client and
      authenticated against ``config["db"]``.

      NOTE: credentials are handed to the MongoClient constructor because
      ``Database.authenticate`` no longer exists in PyMongo 4. Authentication
      therefore happens on the first operation, not inside this function.

      :param config: mapping with ``db`` and either ``URI`` or ``host``/``port``
                     (optionally ``user``/``pwd``); required despite the None default
      :return: tuple ``(client, db)``
      :raises ValueError: if config is None
      :raises Exception: any error raised while building the client is logged and re-raised
      """
      try:
          if config is None:
              raise ValueError("config is required")
          if config.get("URI", ""):
              motor_uri = config["URI"]
              client = pymongo.MongoClient(motor_uri, maxPoolSize=100)
              db = client[config['db']]
              # Never log the raw URI: it may embed the password.
              print("motor_uri: ", _redact_mongo_uri(motor_uri))
          else:
              client_options = {
                  "serverSelectionTimeoutMS": 30000,  # 30 seconds
                  "connectTimeoutMS": 30000,  # 30 seconds
                  "socketTimeoutMS": 30000,  # 30 seconds
                  "retryReads": True,  # enable read retries
                  "maxPoolSize": 50,
              }
              if config.get('user'):
                  # Same effect as db.authenticate(user, pwd) on config['db'].
                  client_options.update(
                      username=config['user'],
                      password=config['pwd'],
                      authSource=config['db'],
                  )
              client = pymongo.MongoClient(config['host'], config['port'], **client_options)
              db = client[config['db']]
          print("✅ MongoDB 连接对象创建成功")
      except Exception as e:
          print(f"❌ 创建 MongoDB 连接对象时发生错误: {e}")
          raise
      return client, db
  def main_import_process(create_at_begin, create_at_end):
      """
      Import every configured UO city pair for the given creation-time window.

      For each city pair in ``uo_city_pairs_new`` a fresh atlas and mongo
      connection is opened, ``import_flight_range_status`` is run, and both
      clients are closed again, including when the import raises.

      :param create_at_begin: window start, formatted "%Y-%m-%d %H:%M:%S"
      :param create_at_end: window end, formatted "%Y-%m-%d %H:%M:%S"
      :raises ValueError: if either argument does not match the expected format
      """
      time_format = "%Y-%m-%d %H:%M:%S"
      # The parsed datetimes are naive, so timestamp() interprets them in the
      # machine's local timezone; no explicit offset is applied here.
      begin_dt = datetime.strptime(create_at_begin, time_format)
      end_dt = datetime.strptime(create_at_end, time_format)
      create_at_begin_stamp = int(begin_dt.timestamp())
      create_at_end_stamp = int(end_dt.timestamp())
      print(f"create_at_begin: {create_at_begin}, timestamp: {create_at_begin_stamp}")
      print(f"create_at_end: {create_at_end}, timestamp: {create_at_end_stamp}")

      uo_city_pairs = uo_city_pairs_new.copy()
      total = len(uo_city_pairs)

      for idx, city_pair in enumerate(uo_city_pairs, start=1):
          atlas_client, atlas_db = mongo_con_parse(atlas_config)
          try:
              mongo_client, mongo_db = mongo_con_parse(mongo_config)
              try:
                  print(f"开始处理航线 {idx}/{total}: {city_pair}")
                  import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp)
                  print(f"结束处理航线 {idx}/{total}: {city_pair}")
              finally:
                  # Release the target connection even if the import failed.
                  mongo_client.close()
          finally:
              # Release the source connection even if connecting to mongo failed.
              atlas_client.close()
      print("整体结束")
  if __name__ == "__main__":
      print(f"本次导入开始时间: {datetime.now()}")

      # Import window: the 24 hours ending now. For a manual backfill, pass
      # fixed "%Y-%m-%d %H:%M:%S" strings to main_import_process instead.
      stamp_format = "%Y-%m-%d %H:%M:%S"
      window_end = datetime.now()
      window_start = window_end - timedelta(days=1)

      main_import_process(
          window_start.strftime(stamp_format),
          window_end.strftime(stamp_format),
      )

      print(f"本次导入结束时间: {datetime.now()}")
      print()