node04 3 дня назад
Родитель
Commit
31035106f3
2 измененных файла с 105 добавлено и 19 удалено
  1. 53 1
      .gitignore
  2. 52 18
      uo_atlas_import.py

+ 53 - 1
.gitignore

@@ -1 +1,53 @@
-photo/
+# ===== Python =====
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+
+# Distribution / packaging
+dist/
+build/
+*.egg-info/
+*.egg
+
+# Virtual environments
+.venv/
+venv/
+env/
+
+# ===== IDE / Editor =====
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+.project
+.settings/
+
+# ===== OS =====
+.DS_Store
+Thumbs.db
+desktop.ini
+
+# ===== Project-specific =====
+# 生成的图表图片
+photo/
+
+# 字体文件(体积大,不适合版本控制)
+*.ttf
+*.otf
+
+# 日志文件
+*.log
+logs/
+
+# 临时文件
+*.tmp
+*.bak
+
+# 数据文件(按需取消注释)
+# *.csv
+# *.xlsx
+# *.json
+# *.parquet
+# data/

+ 52 - 18
uo_atlas_import.py

@@ -3,13 +3,13 @@ import random
 import time
 from datetime import datetime
 import pymongo
-from pymongo.errors import PyMongoError, ServerSelectionTimeoutError
+from pymongo.errors import PyMongoError, ServerSelectionTimeoutError, BulkWriteError
 # import pandas as pd
 from config import atlas_config, mongo_config, atlas_table, mongo_table_uo, uo_city_pairs
 
 
 def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_stamp, create_at_end_stamp, 
-                              limit=0, max_retries=3, base_sleep=1.0, out_table=None):
+                               limit=0, max_retries=3, base_sleep=1.0, out_table=None):
     """
     从atlas查询指定城市对、时间范围的航班状态数据,写入mongo集合。
     :param atlas_db: atlas数据库连接
@@ -48,6 +48,10 @@ def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_st
                     "$sort": {"from_date": 1, "create_at": 1}
                 }
             ]
+            
+            if limit > 0:
+                pipeline.append({"$limit": limit})
+
             # 执行查询
             t1 = time.time()
             results = list(src_collection.aggregate(pipeline))
@@ -59,6 +63,28 @@ def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_st
             upserted = 0
             matched = 0
             write_failed = 0
+            batch_size = 500
+            ops = []
+
+            def flush_ops(ops_to_flush):
+                nonlocal upserted, matched, write_failed
+                if not ops_to_flush:
+                    return
+                try:
+                    bulk_res = out_collection.bulk_write(ops_to_flush, ordered=False)
+                    upserted += bulk_res.upserted_count
+                    matched += bulk_res.matched_count
+                except BulkWriteError as e:
+                    details = e.details or {}
+                    write_failed += len(details.get("writeErrors", []))
+                    upserted += details.get("nUpserted", 0)
+                    matched += details.get("nMatched", 0)
+                except (ServerSelectionTimeoutError, PyMongoError) as e:
+                    write_failed += len(ops_to_flush)
+                    print(f"⚠️ Mongo 批量写入失败 {city_pair}: {e}")
+                    if isinstance(e, ServerSelectionTimeoutError):
+                        raise
+
             for i, doc in enumerate(results, start=1):
                 citypair = doc.get('citypair')
                 citypair_new = citypair[:3] + '-' + citypair[3:]
@@ -113,23 +139,31 @@ def import_flight_range_status(atlas_db, mongo_db, city_pair, create_at_begin_st
                     "baggage_weight": baggage_weight,
                 }
 
-                try:
-                    res = out_collection.update_one(dedup_filter, {"$set": new_doc}, upsert=True)
-                except (ServerSelectionTimeoutError, PyMongoError) as e:
-                    write_failed += 1
-                    print(f"⚠️ Mongo 写入失败 {city_pair} [{i}/{len(results)}]: {e}")
-                    if isinstance(e, ServerSelectionTimeoutError):
-                        raise
-                    continue
+                # try:
+                #     res = out_collection.update_one(dedup_filter, {"$set": new_doc}, upsert=True)
+                # except (ServerSelectionTimeoutError, PyMongoError) as e:
+                #     write_failed += 1
+                #     print(f"⚠️ Mongo 写入失败 {city_pair} [{i}/{len(results)}]: {e}")
+                #     if isinstance(e, ServerSelectionTimeoutError):
+                #         raise
+                #     continue
                 
-                if res.upserted_id is not None:
-                    upserted += 1
-                else:
-                    matched += res.matched_count
+                # if res.upserted_id is not None:
+                #     upserted += 1
+                # else:
+                #     matched += res.matched_count
 
-                if i % 100 == 0:
+                ops.append(pymongo.UpdateOne(dedup_filter, {"$set": new_doc}, upsert=True))
+
+                if len(ops) >= batch_size:
+                    flush_ops(ops)
+                    ops = []
+        
+                if i % 500 == 0:
                     print(f"写入进度 {city_pair} [{i}/{len(results)}]: upserted={upserted}, matched={matched}, failed={write_failed}")
-                
+            
+            flush_ops(ops)
+
             print(f"写入集合: {mongo_table_uo}, upserted={upserted}, matched={matched}, failed={write_failed}, total={len(results)}")
             return
 
@@ -194,8 +228,8 @@ def main_import_process(create_at_begin, create_at_end):
     print()
 
 if __name__ == "__main__":
-    create_at_begin = "2026-03-11 00:00:00"
-    create_at_end = "2026-03-20 23:59:59"
+    create_at_begin = "2026-03-21 00:00:00"
+    create_at_end = "2026-03-24 23:59:59"
     main_import_process(create_at_begin, create_at_end)
     
     # try: