| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- import os
- import random
- import time
- from datetime import datetime
- import pymongo
- from pymongo.errors import PyMongoError, ServerSelectionTimeoutError, BulkWriteError
- # import pandas as pd
- from config import atlas_config, mongo_config, atlas_table, mongo_table_uo, uo_city_pairs_old, uo_city_pairs_new
def _build_uo_pipeline(city_pair, create_at_begin_stamp, create_at_end_stamp, limit=0):
    """Build the atlas aggregation pipeline for one city pair and create_at window.

    The pipeline keeps documents of ``city_pair`` with a non-null ``from_date``,
    a ``flight_weight`` starting with "UO" and ending with "0;0" or "1;20", and a
    ``create_at`` inside the inclusive window. It adds ``create_at_readable``
    (``create_at`` seconds converted to a BSON date) and sorts by ``from_date``,
    then ``create_at``. A positive ``limit`` appends a ``$limit`` stage.
    """
    pipeline = [
        {
            "$match": {
                "citypair": city_pair,
                "from_date": {"$ne": None},
                "flight_weight": {"$regex": r"^UO.*(0;0|1;20)$"},  # regex match via $regex
                "create_at": {"$gte": create_at_begin_stamp, "$lte": create_at_end_stamp},
            }
        },
        {
            # create_at is stored in seconds; $toDate expects milliseconds.
            "$addFields": {
                "create_at_readable": {
                    "$toDate": {"$multiply": ["$create_at", 1000]}
                }
            }
        },
        {
            "$sort": {"from_date": 1, "create_at": 1}
        },
    ]
    if limit > 0:
        pipeline.append({"$limit": limit})
    return pipeline


def _transform_uo_doc(doc):
    """Map one aggregation result to ``(dedup_filter, new_doc)`` for the mongo upsert.

    Raises TypeError / ValueError / AttributeError when a required field
    (citypair, from_date, flight_weight, create_at_readable) is missing or malformed.
    """
    citypair = doc.get('citypair')
    # e.g. "BJSBKK" -> "BJS-BKK" (assumes a 3-letter origin code).
    citypair_new = citypair[:3] + '-' + citypair[3:]
    from_date_new = datetime.strptime(doc.get('from_date'), '%Y%m%d').strftime('%Y-%m-%d')

    # flight_weight is ';'-separated: the first segment holds comma-separated
    # flight numbers, the last segment is the baggage weight.
    flight_weight_split = doc.get('flight_weight').split(';')
    flight_numbers_parts = [x.strip() for x in flight_weight_split[0].split(',') if x.strip()]
    flight_numbers = ','.join(flight_numbers_parts)
    baggage_weight = int(flight_weight_split[-1])

    deptime = doc.get('depTime') or ''
    if deptime:
        from_time = datetime.strptime(deptime, '%Y%m%d%H%M').strftime('%Y-%m-%d %H:%M:%S')
    else:
        from_time = ''  # data crawled before March 16 has no departure time

    # `or ''` also covers an explicit cabin: None, which previously crashed on .split.
    cabin_raw = doc.get('cabin') or ''
    cabins = ','.join(x.strip() for x in cabin_raw.split(',') if x.strip())

    create_at = doc.get('create_at')
    # NOTE(review): create_at_readable comes from $toDate; with the default client
    # settings pymongo decodes it as a naive UTC datetime, so create_time is
    # presumably UTC - confirm.
    create_time = doc.get('create_at_readable').strftime('%Y-%m-%d %H:%M:%S')

    new_doc = {
        "citypair": citypair_new,
        "from_date": from_date_new,
        "flight_numbers": flight_numbers,
        "from_time": from_time,
        "trip_type": doc.get('trip_type'),
        "cabins": cabins,
        "baggage_weight": baggage_weight,
        "ticket_amount": doc.get('ticket_amount'),
        "currency": doc.get('vendorCurrency', ''),
        "price_base": doc.get('price'),
        "price_tax": doc.get('tax'),
        "price_total": doc.get('total'),
        "create_at": create_at,
        "create_time": create_time,
    }
    # Business-key dedup (the source _id is deliberately not copied).
    dedup_filter = {
        "citypair": citypair_new,
        "from_date": from_date_new,
        "create_at": create_at,
        "flight_numbers": flight_numbers,
        "baggage_weight": baggage_weight,
    }
    return dedup_filter, new_doc


def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp,
                               limit=0, max_retries=3, base_sleep=1.0, out_table=None):
    """Copy UO flight records for one city pair from atlas into a mongo collection.

    Aggregates ``atlas_table`` for ``city_pair`` within the inclusive ``create_at``
    window. Each result is reshaped and upserted into the target collection in
    unordered batches of 500, deduplicated on
    (citypair, from_date, create_at, flight_numbers, baggage_weight).
    Documents that cannot be parsed are skipped, reported and counted.

    On a PyMongoError during the query, or a server-selection timeout during a
    write, the whole query+write is retried with exponential backoff plus jitter.
    Because writes are upserts, a retry re-upserts instead of duplicating. After
    ``max_retries`` failed attempts it gives up and returns without raising.

    :param atlas_db: source (atlas) database handle
    :param mongo_db: destination mongo database handle
    :param city_pair: city pair code, e.g. "BJSBKK"
    :param create_at_begin_stamp: window start, epoch seconds (inclusive)
    :param create_at_end_stamp: window end, epoch seconds (inclusive)
    :param limit: maximum number of source documents (0 = unlimited)
    :param max_retries: maximum number of attempts
    :param base_sleep: base backoff interval in seconds
    :param out_table: destination collection name (default: mongo_table_uo)
    """
    target_table = out_table if out_table else mongo_table_uo
    src_collection = atlas_db[atlas_table]    # source collection (atlas)
    out_collection = mongo_db[target_table]   # destination collection (mongo)
    pipeline = _build_uo_pipeline(city_pair, create_at_begin_stamp, create_at_end_stamp, limit)
    batch_size = 500

    for attempt in range(1, max_retries + 1):
        try:
            print(f"🔁 第 {attempt}/{max_retries} 次尝试查询")
            t1 = time.time()
            results = list(src_collection.aggregate(pipeline))
            rt = round(time.time() - t1, 3)
            print(f"查询用时: {rt} 秒")

            upserted = 0
            matched = 0
            write_failed = 0
            parse_failed = 0
            ops = []

            def flush_ops(ops_to_flush):
                """Bulk-write one batch and update the counters.

                Re-raises server-selection timeouts so the outer loop retries.
                """
                nonlocal upserted, matched, write_failed
                if not ops_to_flush:
                    return
                try:
                    bulk_res = out_collection.bulk_write(ops_to_flush, ordered=False)
                    upserted += bulk_res.upserted_count
                    matched += bulk_res.matched_count
                except BulkWriteError as e:
                    # Unordered bulk: the successful ops are still applied and counted.
                    details = e.details or {}
                    write_errors = details.get("writeErrors", [])
                    write_failed += len(write_errors)
                    upserted += details.get("nUpserted", 0)
                    matched += details.get("nMatched", 0)
                    if write_errors:
                        print(f"⚠️ Mongo 批量写入部分失败 {city_pair}: {len(write_errors)} 条, "
                              f"首个错误: {write_errors[0].get('errmsg')}")
                except PyMongoError as e:  # includes ServerSelectionTimeoutError
                    write_failed += len(ops_to_flush)
                    print(f"⚠️ Mongo 批量写入失败 {city_pair}: {e}")
                    if isinstance(e, ServerSelectionTimeoutError):
                        raise

            for i, doc in enumerate(results, start=1):
                try:
                    dedup_filter, new_doc = _transform_uo_doc(doc)
                except (TypeError, ValueError, AttributeError) as e:
                    # One malformed source document must not abort the whole import.
                    parse_failed += 1
                    print(f"⚠️ 数据解析失败,跳过 {city_pair} [{i}/{len(results)}] _id={doc.get('_id')}: {e}")
                else:
                    ops.append(pymongo.UpdateOne(dedup_filter, {"$set": new_doc}, upsert=True))
                    if len(ops) >= batch_size:
                        flush_ops(ops)
                        ops = []

                if i % 500 == 0:
                    print(f"写入进度 {city_pair} [{i}/{len(results)}]: upserted={upserted}, matched={matched}, failed={write_failed}")

            flush_ops(ops)
            print(f"写入集合: {target_table}, upserted={upserted}, matched={matched}, failed={write_failed}, "
                  f"skipped={parse_failed}, total={len(results)}")
            return
        except PyMongoError as e:  # includes ServerSelectionTimeoutError
            print(f"⚠️ Mongo 处理失败(查询/写入): {e}")
            if attempt == max_retries:
                print("❌ 达到最大重试次数,放弃")
                return

            # Exponential backoff plus random jitter.
            sleep_time = base_sleep * (2 ** (attempt - 1)) + random.random()
            print(f"⏳ {sleep_time:.2f}s 后重试...")
            time.sleep(sleep_time)
def _redact_uri(uri):
    """Return ``uri`` with the password in its userinfo part replaced by '****'."""
    import re
    return re.sub(r"(://[^:/@]+):[^@/]*@", r"\1:****@", uri)


def mongo_con_parse(config=None):
    """Create a pymongo client from ``config`` and return ``(client, db)``.

    ``config`` is a dict. If it has a non-empty "URI", the client is built from
    that URI with maxPoolSize=100. Otherwise it is built from "host"/"port" with
    30 s server-selection/connect/socket timeouts, retryable reads and
    maxPoolSize=50. In that case it authenticates with "user"/"pwd" against the
    "db" database when "user" is set. Authentication happens lazily, on the first
    operation. The returned db is ``client[config["db"]]``.

    Raises ValueError if ``config`` is None. Any error while creating the client
    is printed and re-raised.
    """
    if config is None:
        raise ValueError("mongo_con_parse requires a config dict")
    try:
        if config.get("URI", ""):
            motor_uri = config["URI"]
            client = pymongo.MongoClient(motor_uri, maxPoolSize=100)
            db = client[config['db']]
            # Never echo the password embedded in the URI into logs.
            print("motor_uri: ", _redact_uri(motor_uri))
        else:
            auth_kwargs = {}
            if config.get('user'):
                # Database.authenticate() no longer exists in PyMongo 4; passing the
                # credentials to the client works on PyMongo 3 and 4. authSource
                # keeps the old behaviour of authenticating against the target db.
                auth_kwargs = {
                    "username": config['user'],
                    "password": config['pwd'],
                    "authSource": config['db'],
                }
            client = pymongo.MongoClient(
                config['host'],
                config['port'],
                serverSelectionTimeoutMS=30000,  # 30 s
                connectTimeoutMS=30000,          # 30 s
                socketTimeoutMS=30000,           # 30 s
                retryReads=True,                 # retry reads once on transient errors
                maxPoolSize=50,
                **auth_kwargs,
            )
            db = client[config['db']]
        print(f"✅ MongoDB 连接对象创建成功")
    except Exception as e:
        print(f"❌ 创建 MongoDB 连接对象时发生错误: {e}")
        raise
    return client, db
def main_import_process(create_at_begin, create_at_end, city_pairs=None):
    """Import UO flight records for every city pair within a create_at window.

    :param create_at_begin: window start, "%Y-%m-%d %H:%M:%S". It is parsed as a
        naive datetime, so ``.timestamp()`` interprets it in the machine's local
        timezone.
    :param create_at_end: window end (inclusive), same format and timezone rule
    :param city_pairs: city pairs to process; defaults to ``uo_city_pairs_new``
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    create_at_begin_stamp = int(datetime.strptime(create_at_begin, fmt).timestamp())
    create_at_end_stamp = int(datetime.strptime(create_at_end, fmt).timestamp())
    print(f"create_at_begin: {create_at_begin}, timestamp: {create_at_begin_stamp}")
    print(f"create_at_end: {create_at_end}, timestamp: {create_at_end_stamp}")

    uo_city_pairs = list(uo_city_pairs_new if city_pairs is None else city_pairs)
    total = len(uo_city_pairs)
    for idx, city_pair in enumerate(uo_city_pairs, start=1):
        # Fresh clients per city pair, as before. They are closed in `finally` so
        # an unexpected error in one pair (or a failed second connect) does not
        # leak connections; the error still propagates and stops the run.
        atlas_client, atlas_db = mongo_con_parse(atlas_config)
        try:
            mongo_client, mongo_db = mongo_con_parse(mongo_config)
            try:
                print(f"开始处理航线 {idx}/{total}: {city_pair}")
                import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp)
                print(f"结束处理航线 {idx}/{total}: {city_pair}")
            finally:
                mongo_client.close()
        finally:
            atlas_client.close()
    print("整体结束")
    print()
if __name__ == "__main__":
    # Import window in local time; both ends are inclusive.
    main_import_process(
        create_at_begin="2026-03-26 00:00:00",
        create_at_end="2026-03-26 15:59:59",
    )
-
|