Explorar el Código

提交把uo数据从atlas导入到mongo

node04 hace 1 semana
padre
commit
53fb1165d7
Se han modificado 2 ficheros con 226 adiciones y 0 borrados
  1. 22 0
      config.py
  2. 204 0
      uo_atlas_import.py

+ 22 - 0
config.py

@@ -1,3 +1,25 @@
+# atlas 数据库
+atlas_config = {
+    "host": "192.168.20.218",
+    "port": 27017,
+    "db": "yssaas",
+    "user": "",
+    "pwd": ""
+}
+
+# mongo 数据库
+mongo_config = {
+    "host": "192.168.20.218",
+    "port": 27017,
+    "db": "flights_datas_db",
+    "user": "",
+    "pwd": ""
+}
+
+atlas_table = "atlas_history_data1"
+mongo_table_uo = "import_flights_uo_info_tab"
+
+
 # UO原始城市对
 uo_city_pairs = [
     "BJSBKK", "BJSCNX", "BJSDAD", "BJSHAN", "BJSHKG",

+ 204 - 0
uo_atlas_import.py

@@ -0,0 +1,204 @@
+import os
+import random
+import time
+from datetime import datetime
+import pymongo
+from pymongo.errors import PyMongoError, ServerSelectionTimeoutError
+# import pandas as pd
+from config import atlas_config, mongo_config, atlas_table, mongo_table_uo, uo_city_pairs
+
+
+def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp, 
+                              limit=0, max_retries=3, base_sleep=1.0, out_table=None):
+    """
+    从atlas查询指定城市对、时间范围的航班状态数据,写入mongo集合。
+    :param atlas_db: atlas数据库连接
+    :param mongo_db: mongo数据库连接
+    :param city_pair: 城市对(例如:"BJSBKK")
+    :param create_at_begin_stamp: 查询开始时间戳(秒级)
+    :param create_at_end_stamp: 查询结束时间戳(秒级)
+    :param limit: 限制返回结果数量(0表示不限制)
+    :param max_retries: 最大重试次数
+    :param base_sleep: 基础重试间隔(秒)
+    :param out_table: 目标mongo集合名称(默认:mongo_table_uo)
+    """
+    for attempt in range(1, max_retries + 1):
+        try:
+            print(f"🔁 第 {attempt}/{max_retries} 次尝试查询")
+            src_collection = atlas_db[atlas_table]      # 源集合(atlas)
+            out_collection = mongo_db[mongo_table_uo]   # 目标集合(mongo)
+            # 聚合查询管道
+            pipeline = [
+                {
+                    "$match": {
+                        "citypair": city_pair,
+                        "from_date": {"$ne": None},
+                        "flight_weight": {"$regex": r"^UO.*(0;0|1;20)$"},  # 使用$regex进行正则匹配
+                        "create_at": {"$gte": create_at_begin_stamp, "$lte": create_at_end_stamp}
+                    }
+                },
+                {
+                    "$addFields": {
+                        "create_at_readable": {
+                            "$toDate": {"$multiply": ["$create_at", 1000]}
+                        }
+                    }
+                },
+                {
+                    "$sort": {"from_date": 1, "create_at": 1}
+                }
+            ]
+            # 执行查询
+            t1 = time.time()
+            results = list(src_collection.aggregate(pipeline))
+            t2 = time.time()
+            rt = round(t2 - t1, 3)
+            print(f"查询用时: {rt} 秒")
+
+            # 写入结果(不落原表 _id/source_id,用业务字段做去重)
+            upserted = 0
+            matched = 0
+            write_failed = 0
+            for i, doc in enumerate(results, start=1):
+                citypair = doc.get('citypair')
+                citypair_new = citypair[:3] + '-' + citypair[3:]
+                from_date = doc.get('from_date')
+                from_date_new = datetime.strptime(from_date, '%Y%m%d').strftime('%Y-%m-%d')
+                flight_weight = doc.get('flight_weight')
+                flight_weight_split = flight_weight.split(';')
+                flight_numbers_raw = flight_weight_split[0]
+                flight_numbers_parts = [x.strip() for x in flight_numbers_raw.split(',') if x.strip()]
+                flight_numbers = ','.join(flight_numbers_parts)
+                baggage_weight = int(flight_weight_split[-1])
+                deptime = doc.get('depTime', '')
+                if deptime:
+                    from_time = datetime.strptime(deptime, '%Y%m%d%H%M').strftime('%Y-%m-%d %H:%M:%S')
+                else:
+                    from_time = ''   # 3月16之前抓的数据没有起飞时间
+                trip_type = doc.get('trip_type')
+                cabin_raw = doc.get('cabin', '')
+                cabin_parts = [x.strip() for x in cabin_raw.split(',') if x.strip()]
+                cabins = ','.join(cabin_parts)
+                ticket_amount = doc.get('ticket_amount')
+                currency = doc.get('vendorCurrency', '')
+                price_base = doc.get('price')
+                price_tax = doc.get('tax')
+                price_total = doc.get('total')
+                create_at = doc.get('create_at')
+                create_at_readable = doc.get('create_at_readable')
+                create_time = create_at_readable.strftime('%Y-%m-%d %H:%M:%S')
+
+                new_doc = {
+                    "citypair": citypair_new,
+                    "from_date": from_date_new,
+                    "flight_numbers": flight_numbers,
+                    "from_time": from_time,
+                    "trip_type": trip_type,
+                    "cabins": cabins,
+                    "baggage_weight": baggage_weight,
+                    "ticket_amount": ticket_amount,
+                    "currency": currency,
+                    "price_base": price_base,
+                    "price_tax": price_tax,
+                    "price_total": price_total,
+                    "create_at": create_at,
+                    "create_time": create_time,
+                }
+
+                dedup_filter = {
+                    "citypair": citypair_new,
+                    "from_date": from_date_new,
+                    "create_at": create_at,
+                    "flight_numbers": flight_numbers,
+                    "baggage_weight": baggage_weight,
+                }
+
+                try:
+                    res = out_collection.update_one(dedup_filter, {"$set": new_doc}, upsert=True)
+                except (ServerSelectionTimeoutError, PyMongoError) as e:
+                    write_failed += 1
+                    print(f"⚠️ Mongo 写入失败 {city_pair} [{i}/{len(results)}]: {e}")
+                    if isinstance(e, ServerSelectionTimeoutError):
+                        raise
+                    continue
+                
+                if res.upserted_id is not None:
+                    upserted += 1
+                else:
+                    matched += res.matched_count
+                
+            print(f"写入集合: {mongo_table_uo}, upserted={upserted}, matched={matched}, failed={write_failed}, total={len(results)}")
+            return
+
+        except (ServerSelectionTimeoutError, PyMongoError) as e:
+            print(f"⚠️ Mongo 处理失败(查询/写入): {e}")
+            if attempt == max_retries:
+                print("❌ 达到最大重试次数,放弃")
+                return
+            
+            # 指数退避 + 随机抖动
+            sleep_time = base_sleep * (2 ** (attempt - 1)) + random.random()
+            print(f"⏳ {sleep_time:.2f}s 后重试...")
+            time.sleep(sleep_time)
+
+
+def mongo_con_parse(config=None):
+    # if config is None:
+    #     config = mongo_atlas_config.copy()
+    try:
+        if config.get("URI", ""):
+            motor_uri = config["URI"]
+            client = pymongo.MongoClient(motor_uri, maxPoolSize=100)
+            db = client[config['db']]
+            print("motor_uri: ", motor_uri)
+        else:
+            client = pymongo.MongoClient(
+                config['host'],
+                config['port'],
+                serverSelectionTimeoutMS=30000,  # 30秒
+                connectTimeoutMS=30000,  # 30秒
+                socketTimeoutMS=30000,  # 30秒,
+                retryReads=True,    # 开启重试
+                maxPoolSize=50
+            )
+            db = client[config['db']]
+            if config.get('user'):
+                db.authenticate(config['user'], config['pwd'])
+
+        print(f"✅ MongoDB 连接对象创建成功")
+
+    except Exception as e:
+        print(f"❌ 创建 MongoDB 连接对象时发生错误: {e}")
+        raise
+    return client, db
+
+def main_import_process(create_at_begin, create_at_end):
+    create_at_begin_stamp = int(datetime.strptime(create_at_begin, "%Y-%m-%d %H:%M:%S").timestamp())
+    create_at_end_stamp = int(datetime.strptime(create_at_end, "%Y-%m-%d %H:%M:%S").timestamp())
+    print(f"create_at_begin: {create_at_begin}, timestamp: {create_at_begin_stamp}")
+    print(f"create_at_end: {create_at_end}, timestamp: {create_at_end_stamp}")
+
+    atlas_client, atlas_db = mongo_con_parse(atlas_config)
+    mongo_client, mongo_db = mongo_con_parse(mongo_config)
+    for idx, city_pair in enumerate(uo_city_pairs):
+        print(f"开始处理航线 {idx+1}/{len(uo_city_pairs)}: {city_pair}")    
+        import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp)
+        print(f"结束处理航线 {idx+1}/{len(uo_city_pairs)}: {city_pair}")
+    atlas_client.close()
+    mongo_client.close()
+    pass
+    print("整体结束")
+    print()
+
+if __name__ == "__main__":
+    create_at_begin = "2026-02-18 00:00:00"
+    create_at_end = "2026-02-28 23:59:59"
+    main_import_process(create_at_begin, create_at_end)
+    
+    # try:
+    #     client, db = mongo_con_parse(mongo_atlas_config)
+    #     print(f"✅ 数据库连接创建成功")
+    # except Exception as e:
+    #     print(f"❌ 数据库连接创建失败: {e}")
+    #     db = None
+