import os
import random
import time
from datetime import datetime

import pymongo
from pymongo.errors import PyMongoError, ServerSelectionTimeoutError, BulkWriteError

# import pandas as pd
from config import (
    atlas_config,
    mongo_config,
    atlas_table,
    mongo_table_uo,
    uo_city_pairs_old,
    uo_city_pairs_new,
)

# Number of upsert operations sent per bulk_write call; progress is also
# reported every time this many source documents have been processed.
_BATCH_SIZE = 500


def _build_target_doc(doc):
    """Convert one source (atlas) document into the target document and its dedup filter.

    :param doc: a document returned by the aggregation pipeline in
        ``import_flight_range_status`` (must carry ``create_at_readable``).
    :return: ``(new_doc, dedup_filter)`` where ``dedup_filter`` is the set of
        business fields used as the upsert key.
    :raises TypeError, ValueError, AttributeError: if a field is missing or
        not in the expected format (same as the original inline code).
    """
    citypair = doc.get('citypair')
    citypair_new = citypair[:3] + '-' + citypair[3:]

    from_date = doc.get('from_date')
    from_date_new = datetime.strptime(from_date, '%Y%m%d').strftime('%Y-%m-%d')

    # flight_weight is ';'-separated: the first part holds the comma-separated
    # flight numbers, the last part is parsed as the baggage weight.
    flight_weight = doc.get('flight_weight')
    flight_weight_split = flight_weight.split(';')
    flight_numbers_raw = flight_weight_split[0]
    flight_numbers = ','.join(
        part.strip() for part in flight_numbers_raw.split(',') if part.strip()
    )
    baggage_weight = int(flight_weight_split[-1])

    deptime = doc.get('depTime', '')
    if deptime:
        from_time = datetime.strptime(deptime, '%Y%m%d%H%M').strftime('%Y-%m-%d %H:%M:%S')
    else:
        # Data captured before March 16 has no departure time.
        from_time = ''

    cabin_raw = doc.get('cabin', '')
    cabins = ','.join(part.strip() for part in cabin_raw.split(',') if part.strip())

    create_at = doc.get('create_at')
    create_time = doc.get('create_at_readable').strftime('%Y-%m-%d %H:%M:%S')

    new_doc = {
        "citypair": citypair_new,
        "from_date": from_date_new,
        "flight_numbers": flight_numbers,
        "from_time": from_time,
        "trip_type": doc.get('trip_type'),
        "cabins": cabins,
        "baggage_weight": baggage_weight,
        "ticket_amount": doc.get('ticket_amount'),
        "currency": doc.get('vendorCurrency', ''),
        "price_base": doc.get('price'),
        "price_tax": doc.get('tax'),
        "price_total": doc.get('total'),
        "create_at": create_at,
        "create_time": create_time,
    }
    dedup_filter = {
        "citypair": citypair_new,
        "from_date": from_date_new,
        "create_at": create_at,
        "flight_numbers": flight_numbers,
        "baggage_weight": baggage_weight,
    }
    return new_doc, dedup_filter


def _flush_ops(out_collection, ops, city_pair):
    """Send one batch of upsert operations with an unordered ``bulk_write``.

    :param out_collection: target collection.
    :param ops: list of ``pymongo.UpdateOne`` operations (may be empty).
    :param city_pair: city pair being processed, used only in the log message.
    :return: ``(upserted, matched, failed)`` counts for this batch.
    :raises ServerSelectionTimeoutError: re-raised so the caller can retry the
        whole query/write cycle; other PyMongo errors are counted as failures.
    """
    if not ops:
        return 0, 0, 0
    try:
        bulk_res = out_collection.bulk_write(ops, ordered=False)
    except BulkWriteError as e:
        # Partial failure: the error details still report what succeeded.
        details = e.details or {}
        return (
            details.get("nUpserted", 0),
            details.get("nMatched", 0),
            len(details.get("writeErrors", [])),
        )
    except PyMongoError as e:
        print(f"⚠️ Mongo 批量写入失败 {city_pair}: {e}")
        if isinstance(e, ServerSelectionTimeoutError):
            raise
        return 0, 0, len(ops)
    return bulk_res.upserted_count, bulk_res.matched_count, 0


def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp,
                               create_at_end_stamp, limit=0, max_retries=3,
                               base_sleep=1.0, out_table=None):
    """
    Query flight status data for one city pair and time range from atlas and
    upsert it into a mongo collection.

    :param atlas_db: atlas database handle (source)
    :param mongo_db: mongo database handle (target)
    :param city_pair: city pair, e.g. "BJSBKK"
    :param create_at_begin_stamp: query start timestamp (seconds, inclusive)
    :param create_at_end_stamp: query end timestamp (seconds, inclusive)
    :param limit: maximum number of source documents (0 means no limit)
    :param max_retries: maximum number of query/write attempts
    :param base_sleep: base retry interval in seconds (doubled on each attempt,
        plus up to one second of random jitter)
    :param out_table: target mongo collection name (default: mongo_table_uo)
    :return: None. PyMongo errors are retried and, after the last attempt,
        logged and swallowed; errors raised while transforming a document
        are not caught here.
    """
    # Previously this argument was accepted but ignored.
    target_table = out_table or mongo_table_uo

    for attempt in range(1, max_retries + 1):
        try:
            print(f"🔁 第 {attempt}/{max_retries} 次尝试查询")
            src_collection = atlas_db[atlas_table]   # source collection (atlas)
            out_collection = mongo_db[target_table]  # target collection (mongo)

            # Aggregation pipeline
            pipeline = [
                {
                    "$match": {
                        "citypair": city_pair,
                        "from_date": {"$ne": None},
                        # regular-expression match via $regex
                        "flight_weight": {"$regex": r"^UO.*(0;0|1;20)$"},
                        "create_at": {"$gte": create_at_begin_stamp, "$lte": create_at_end_stamp}
                    }
                },
                {
                    "$addFields": {
                        # create_at is in seconds; $toDate expects milliseconds
                        "create_at_readable": {
                            "$toDate": {"$multiply": ["$create_at", 1000]}
                        }
                    }
                },
                {
                    "$sort": {"from_date": 1, "create_at": 1}
                }
            ]
            if limit > 0:
                pipeline.append({"$limit": limit})

            # Run the query
            t1 = time.time()
            results = list(src_collection.aggregate(pipeline))
            rt = round(time.time() - t1, 3)
            print(f"查询用时: {rt} 秒")

            # Write results: the source _id is not carried over; business
            # fields are used as the upsert key for deduplication.
            upserted = 0
            matched = 0
            write_failed = 0
            ops = []
            total = len(results)

            for i, doc in enumerate(results, start=1):
                new_doc, dedup_filter = _build_target_doc(doc)
                ops.append(pymongo.UpdateOne(dedup_filter, {"$set": new_doc}, upsert=True))

                if len(ops) >= _BATCH_SIZE:
                    n_up, n_match, n_fail = _flush_ops(out_collection, ops, city_pair)
                    upserted += n_up
                    matched += n_match
                    write_failed += n_fail
                    ops = []

                if i % _BATCH_SIZE == 0:
                    print(f"写入进度 {city_pair} [{i}/{total}]: upserted={upserted}, matched={matched}, failed={write_failed}")

            # Flush the final partial batch.
            n_up, n_match, n_fail = _flush_ops(out_collection, ops, city_pair)
            upserted += n_up
            matched += n_match
            write_failed += n_fail

            print(f"写入集合: {target_table}, upserted={upserted}, matched={matched}, failed={write_failed}, total={total}")
            return

        except PyMongoError as e:
            # ServerSelectionTimeoutError is a PyMongoError subclass, so this
            # covers both query failures and re-raised write timeouts.
            print(f"⚠️ Mongo 处理失败(查询/写入): {e}")
            if attempt == max_retries:
                print("❌ 达到最大重试次数,放弃")
                return

            # Exponential backoff plus random jitter
            sleep_time = base_sleep * (2 ** (attempt - 1)) + random.random()
            print(f"⏳ {sleep_time:.2f}s 后重试...")
            time.sleep(sleep_time)


def mongo_con_parse(config=None):
    """Create a MongoDB client and database handle from a config mapping.

    If ``config["URI"]`` is non-empty the client is built from that URI;
    otherwise it is built from ``host``/``port`` with 30-second timeouts and,
    when ``user`` is set, credentials authenticated against ``config["db"]``.

    :param config: mapping with ``db`` and either ``URI`` or ``host``/``port``
        (optionally ``user``/``pwd``).
    :return: ``(client, db)``
    :raises ValueError: if ``config`` is None.
    :raises Exception: any error raised while building the client is logged
        and re-raised.
    """
    try:
        if config is None:
            raise ValueError("config is required")

        if config.get("URI", ""):
            motor_uri = config["URI"]
            client = pymongo.MongoClient(motor_uri, maxPoolSize=100)
            db = client[config['db']]
            # NOTE(review): this prints the full URI, which may embed credentials.
            print("motor_uri: ", motor_uri)
        else:
            client_kwargs = {
                "serverSelectionTimeoutMS": 30000,  # 30 seconds
                "connectTimeoutMS": 30000,          # 30 seconds
                "socketTimeoutMS": 30000,           # 30 seconds
                "retryReads": True,                 # enable read retries
                "maxPoolSize": 50,
            }
            if config.get('user'):
                # Database.authenticate() was removed in PyMongo 4; passing the
                # credentials to MongoClient works on 3.5+ and 4.x. authSource
                # keeps authentication against the same database as before.
                # Unlike authenticate(), bad credentials now surface on the
                # first operation rather than here.
                client_kwargs.update(
                    username=config['user'],
                    password=config['pwd'],
                    authSource=config['db'],
                )
            client = pymongo.MongoClient(config['host'], config['port'], **client_kwargs)
            db = client[config['db']]
        print("✅ MongoDB 连接对象创建成功")
    except Exception as e:
        print(f"❌ 创建 MongoDB 连接对象时发生错误: {e}")
        raise
    return client, db


def main_import_process(create_at_begin, create_at_end):
    """Import flight data for every city pair in ``uo_city_pairs_new``.

    :param create_at_begin: range start, formatted "%Y-%m-%d %H:%M:%S"
    :param create_at_end: range end, formatted "%Y-%m-%d %H:%M:%S"

    The strings are parsed as naive datetimes, so the resulting timestamps
    depend on the local timezone of the machine running the script.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    create_at_begin_stamp = int(datetime.strptime(create_at_begin, time_format).timestamp())
    create_at_end_stamp = int(datetime.strptime(create_at_end, time_format).timestamp())
    print(f"create_at_begin: {create_at_begin}, timestamp: {create_at_begin_stamp}")
    print(f"create_at_end: {create_at_end}, timestamp: {create_at_end_stamp}")

    uo_city_pairs = uo_city_pairs_new.copy()
    for idx, city_pair in enumerate(uo_city_pairs):
        # Fresh connections per city pair, always closed even if the import
        # (or the second connection) raises.
        atlas_client, atlas_db = mongo_con_parse(atlas_config)
        try:
            mongo_client, mongo_db = mongo_con_parse(mongo_config)
            try:
                print(f"开始处理航线 {idx+1}/{len(uo_city_pairs)}: {city_pair}")
                import_flight_range_status(atlas_db, mongo_db, city_pair,
                                           create_at_begin_stamp, create_at_end_stamp)
                print(f"结束处理航线 {idx+1}/{len(uo_city_pairs)}: {city_pair}")
            finally:
                mongo_client.close()
        finally:
            atlas_client.close()

    print("整体结束")
    print()


if __name__ == "__main__":
    create_at_begin = "2026-03-27 10:00:00"
    create_at_end = "2026-03-27 15:59:59"
    main_import_process(create_at_begin, create_at_end)