Răsfoiți Sursa

提交近期修改

node04 1 lună în urmă
părinte
comite
82a45c792b
4 a modificat fișierele cu 137 adăugiri și 5 ștergeri
  1. 8 2
      descending_cabin_task.py
  2. 2 1
      main_tr.py
  3. 125 0
      result_keep_verify.py
  4. 2 2
      uo_atlas_import.py

+ 8 - 2
descending_cabin_task.py

@@ -6,6 +6,7 @@ import json
 import time
 import requests
 import threading
+import traceback
 from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from xmi_logger import XmiLogger
@@ -66,7 +67,7 @@ def _process_one_task(row):
     )
 
     drop_price_change_upper = float(task.get("drop_price_change_upper") or 0)   # 最小的降价幅度
-    max_threshold = round(drop_price_change_upper * 0.5)
+    max_threshold = round(drop_price_change_upper * 0.8)
 
     end_segments = []
     for idx, flight_number in enumerate(flight_numbers):
@@ -166,7 +167,12 @@ def main():
     if len(policy_list) > 0:
         # 这里批量一次性上传政策 
         payload = {"items": policy_list}
-        sync_policy(payload)
+        try:
+            sync_policy(payload)
+            logger.info(f"上传政策成功")
+        except Exception as e:
+            logger.error(f"上传政策失败: {e}")
+            logger.error(f"{traceback.format_exc()}")
 
     logger.info(f"keep_info_end: {len(keep_info_end)}")
     # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理  提供下载页面 (历史数据需要保留)

+ 2 - 1
main_tr.py

@@ -21,7 +21,8 @@ def start_train():
     max_workers = min(8, cpu_cores)  # 最大不超过8个进程
 
     from_date_end = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")  # 截止日改为昨天
-    from_date_begin = "2026-03-17"  # 2026-03-17 2026-03-30
+    # from_date_begin = "2026-03-17"  # 2026-03-17 2026-04-07 2026-04-09
+    from_date_begin = "2026-04-07"
 
     print(f"训练时间范围: {from_date_begin} 到 {from_date_end}")
 

+ 125 - 0
result_keep_verify.py

@@ -152,6 +152,131 @@ def verify_process(min_batch_time_str, max_batch_time_str):
     print()
 
 
+def verify_process_2(min_batch_time_str, max_batch_time_str):
+    
+    object_dir = "/home/node04/descending_cabin_files_uo"
+
+    output_dir = f"./validate/keep"
+    os.makedirs(output_dir, exist_ok=True)
+
+    timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+    save_scv = f"result_keep_verify_{timestamp_str}.csv"
+    output_path = os.path.join(output_dir, save_scv)
+
+    # 检查目录是否存在
+    if not os.path.exists(object_dir):
+        print(f"目录不存在: {object_dir}")
+        return
+    
+    # 获取所有以 keep_info_end_ 开头的 CSV 文件
+    csv_files = []
+    for file in os.listdir(object_dir):
+        if file.startswith("keep_info_end_") and file.endswith(".csv"):
+            csv_files.append(file)
+    
+    if not csv_files:
+        print(f"在 {object_dir} 中没有找到 keep_info_end_ 开头的 CSV 文件")
+        return
+
+    csv_files.sort()
+
+    min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
+    min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
+    max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
+    max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
+
+    if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
+        print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
+        return
+    
+    list_df = []
+
+    # 从所有的 keep_info_end_ 文件中
+    for csv_file in csv_files:
+        batch_time_str = csv_file.replace("keep_info_end_", "").replace(".csv", "")
+        batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M%S")
+        batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
+
+        if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
+            continue
+        if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
+            continue
+        
+        # 读取 CSV 文件
+        csv_path = os.path.join(object_dir, csv_file)
+        try:
+            df_keep_info = pd.read_csv(csv_path)
+        except Exception as e:
+            print(f"read {csv_path} error: {str(e)}")
+            continue
+
+        if df_keep_info.empty:
+            print(f"keep_info数据为空: {csv_file}")
+            continue
+        
+        df_keep_info["batch_time_str"] = batch_hour_dt.strftime("%Y%m%d%H%M")
+        # df_keep_info["src_file"] = csv_file
+        list_df.append(df_keep_info)
+        del df_keep_info
+
+    if not list_df:
+        print("时间范围内没有可用 keep_info_end_ 数据")
+        return
+
+    df_keep_all = pd.concat(list_df, ignore_index=True)
+    del list_df
+
+    sort_cols = ["citypair", "flight_numbers", "baggage_weight", "from_date", "into_update_hour"]
+    df_keep_all = df_keep_all.sort_values(sort_cols, kind="mergesort").reset_index(drop=True)
+    df_keep_all["gid"] = df_keep_all.groupby(sort_cols, sort=False).ngroup().astype("int64") + 1
+
+    client, db = mongo_con_parse(mongo_config)
+    list_base_row = []
+
+    for gid, df_gid in df_keep_all.groupby("gid", sort=False):
+        city_pair = df_gid["citypair"].iloc[0]
+        flight_numbers = df_gid["flight_numbers"].iloc[0]
+        baggage_weight = df_gid["baggage_weight"].iloc[0]
+        from_date = df_gid["from_date"].iloc[0]
+        into_update_hour = df_gid["into_update_hour"].iloc[0]
+        valid_end_hour = df_gid["valid_end_hour"].iloc[0]
+
+        into_update_dt = pd.to_datetime(
+            df_gid.get("into_update_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
+        ).min()
+        batch_dt_series = pd.to_datetime(
+            df_gid.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
+        )
+        batch_dt = batch_dt_series.max()
+
+        entry_price = float("nan")
+        if batch_dt_series.notna().any():
+            idx_latest = batch_dt_series.idxmax()
+            entry_price = pd.to_numeric(df_gid.loc[idx_latest].get("price_total"), errors="coerce")
+
+        valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")
+
+        flag = 0   # 等待(弹出)标记
+        if batch_dt >= valid_end_dt:
+            flag = 2     # 超时标记      
+
+        if pd.isna(into_update_dt) or pd.isna(batch_dt):
+            print(f"gid={gid} 时间字段解析失败,跳过")
+            continue
+
+        create_time_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")
+        create_time_end = (batch_dt + pd.Timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
+
+        df_query = validate_keep_one_line(db, mongo_table_uo, city_pair, flight_numbers, baggage_weight, from_date, entry_price, into_update_hour, create_time_end)
+        
+        df_g1 = df_gid.copy()
+        df_g2 = df_query.copy()
+        
+        
+        pass
+
+    pass
+
 if __name__ == "__main__":
     verify_process("202604071700", "202604081400")
     pass

+ 2 - 2
uo_atlas_import.py

@@ -241,8 +241,8 @@ if __name__ == "__main__":
     create_at_end = current_time.strftime("%Y-%m-%d %H:%M:%S")
     create_at_begin = (current_time - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
 
-    # create_at_begin = "2026-04-07 00:00:00"
-    # create_at_end = "2026-04-07 10:59:59"
+    # create_at_begin = "2026-04-08 00:00:00"
+    # create_at_end = "2026-04-09 15:59:59"
 
     main_import_process(create_at_begin, create_at_end)