Browse Source

提交近期修改

node04 3 tuần trước
mục cha
commit
d41f3a197c
5 tập tin đã thay đổi với 35 bổ sung và 13 xóa
  1. 1 1
      data_process.py
  2. 28 6
      descending_cabin_task.py
  3. 2 2
      main_tr.py
  4. 3 3
      result_keep_verify.py
  5. 1 1
      uo_atlas_import.py

+ 1 - 1
data_process.py

@@ -467,7 +467,7 @@ def predict_data_simple(df_input, city_pair, object_dir, predict_dir=".", pred_t
         "cabins", "ticket_amount", "currency", 
         "price_total", 'relative_position', 'days_to_departure', 'hours_until_departure', 
         'price_change_amount', 'price_change_percent', 'price_duration_hours', 
-        "update_hour", "update_week", 
+        "update_hour", "create_time",
         'valid_begin_hour', 'valid_end_hour',
         'simple_will_price_drop', 'simple_drop_in_hours', 'simple_drop_in_hours_prob', 'simple_drop_in_hours_dist',
         'flag_dist',

+ 28 - 6
descending_cabin_task.py

@@ -70,6 +70,9 @@ def _process_one_task(row):
     drop_price_change_upper = float(task.get("drop_price_change_upper") or 0)   # 最小的降价幅度
     max_threshold = round(drop_price_change_upper * 1.0)
 
+    if abs(max_threshold) < 10:
+        return None
+
     end_segments = []
     for idx, flight_number in enumerate(flight_numbers):
         carrier = "".join([c for c in flight_number if c.isalpha()])
@@ -182,12 +185,31 @@ def main():
     if len(policy_list) > 0:
         # 这里批量一次性上传政策 
         payload = {"items": policy_list}
-        try:
-            sync_policy(payload)
-            logger.info(f"上传政策成功")
-        except Exception as e:
-            logger.error(f"上传政策失败: {e}")
-            logger.error(f"{traceback.format_exc()}")
+        max_attempts = 3
+        retryable_exceptions = (
+            requests.exceptions.ConnectionError,
+            requests.exceptions.ConnectTimeout,
+        )
+
+        for attempt in range(1, max_attempts + 1):
+            try:
+                sync_policy(payload)
+                logger.info(f"上传政策成功")
+                break
+            except retryable_exceptions as e:
+                if attempt == max_attempts:
+                    logger.error(f"上传政策失败(已重试{max_attempts}次): {e}")
+                    # logger.error(f"{traceback.format_exc()}")
+                else:
+                    wait_seconds = attempt
+                    logger.warning(
+                        f"上传政策连接异常,第{attempt}/{max_attempts}次失败: {e},{wait_seconds}s后重试"
+                    )
+                    time.sleep(wait_seconds)
+            except Exception as e:
+                logger.error(f"上传政策失败(非连接异常,不重试): {e}")
+                logger.error(f"{traceback.format_exc()}")
+                break
 
     logger.info(f"keep_info_end: {len(keep_info_end)}")
     

+ 2 - 2
main_tr.py

@@ -21,8 +21,8 @@ def start_train():
     max_workers = min(8, cpu_cores)  # 最大不超过8个进程
 
     from_date_end = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")  # 截止日改为昨天
-    # from_date_begin = "2026-03-17"  # 2026-03-17 2026-04-07 2026-04-09 2026-04-15
-    from_date_begin = "2026-04-09"
+    # from_date_begin = "2026-03-17"  # 2026-03-17 2026-04-15
+    from_date_begin = "2026-04-15"
 
     print(f"训练时间范围: {from_date_begin} 到 {from_date_end}")
 

+ 3 - 3
result_keep_verify.py

@@ -265,7 +265,7 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
             continue
 
         create_time_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")
-        create_time_end = (batch_dt + pd.Timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
+        create_time_end = (batch_dt + pd.Timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
 
         df_query = validate_keep_one_line(db, mongo_table_uo, city_pair, flight_numbers, baggage_weight, from_date, entry_price, into_update_hour, create_time_end)
         
@@ -324,6 +324,6 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
     return
 
 if __name__ == "__main__":
-    # verify_process("202604091700", "202604150800")
-    verify_process_2("202604091700", "202604150800")
+    verify_process("202604151100", "202604161400")
+    # verify_process_2("202604151100", "202604161400")
     pass

+ 1 - 1
uo_atlas_import.py

@@ -218,7 +218,7 @@ def main_import_process(create_at_begin, create_at_end):
     uo_city_pairs = uo_city_pairs_new.copy()
 
     # 调试分支
-    # uo_city_pairs = uo_city_pairs[47:48]
+    # uo_city_pairs = uo_city_pairs[5:6]
 
     for idx, city_pair in enumerate(uo_city_pairs):
         atlas_client, atlas_db = mongo_con_parse(atlas_config)