Forráskód Böngészése

提交统一执行的sh脚本, 修改bug

node04 1 hete
szülő
commit
659e21f62d
3 módosított fájl, 67 hozzáadás és 14 törlés
  1. 18 11
      descending_cabin_task.py
  2. 3 3
      follow_up.py
  3. 46 0
      run_vj.sh

+ 18 - 11
descending_cabin_task.py

@@ -256,8 +256,8 @@ class VerifyResultHandler:
 class FlightPriceTaskRunner:
     """单任务流程:询价 -> 匹配 -> 验价 -> 按规则取价或 not_verify 重试."""
 
-    def __init__(self, client: FlightPriceClient | None = None):
-        self.logger = XmiLogger("flight_price_task")
+    def __init__(self, client: FlightPriceClient | None = None, logger: XmiLogger | None = None):
+        self.logger = logger or XmiLogger("flight_price_task")
         self.client = client or FlightPriceClient()
         self.matcher = ResultMatcher()
         self.handler = VerifyResultHandler()
@@ -608,7 +608,7 @@ def main():
     # 注意  \ufeff 是 UTF-8 的 BOM
     # 所以需要使用 utf-8-sig 编码
     task_list = []
-    runner = FlightPriceTaskRunner()
+    runner = FlightPriceTaskRunner(logger=logger)
 
     # 1 读取任务列表
     output_dir = "./keep_0"
@@ -658,14 +658,21 @@ def main():
 
     logger.info(f"keep_info_end: {len(keep_info_end)}")
     # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理  提供下载页面 (历史数据需要保留)
-    # if not os.path.exists("/home/node04/descending_cabin_files"):
-    #     os.makedirs("/home/node04/descending_cabin_files")
-    with open(f"/home/node04/descending_cabin_files/keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv", "w", encoding="utf-8-sig") as f:
-        writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
-        writer.writeheader()
-        for task in keep_info_end:
-            writer.writerow(task)
-    logger.info(f"keep_info_end 写入完成")
+    output_dir = "/home/node04/descending_cabin_files"
+    os.makedirs(output_dir, exist_ok=True)
+    if keep_info_end:
+        out_path = os.path.join(
+            output_dir,
+            f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
+        )
+        with open(out_path, "w", encoding="utf-8-sig") as f:
+            writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
+            writer.writeheader()
+            for task in keep_info_end:
+                writer.writerow(task)
+        logger.info("keep_info_end 写入完成")
+    else:
+        logger.warning("keep_info_end 为空,跳过写入CSV")
 
 
 if __name__ == "__main__":

+ 3 - 3
follow_up.py

@@ -128,7 +128,7 @@ def follow_up_handle():
         # 将长时间没更新的航班标记为-1
         dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce")
         dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce")
-        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=12)
+        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=8)
         if mask_abnormal_time.any():
             df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1
         
@@ -309,7 +309,7 @@ def follow_up_handle():
         # 将长时间没更新的航班标记为-1
         dt_update_hour = pd.to_datetime(df_keep_info["update_hour"], errors="coerce")
         dt_crawl_date = pd.to_datetime(df_keep_info["crawl_date"], errors="coerce")
-        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=12)
+        mask_abnormal_time = (dt_update_hour - dt_crawl_date) > pd.Timedelta(hours=8)
         if mask_abnormal_time.any():
             df_keep_info.loc[mask_abnormal_time.fillna(False), "keep_flag"] = -1
         
@@ -348,7 +348,7 @@ def follow_up_handle():
     pass
 
 if __name__ == "__main__":
-    time.sleep(2)
+    time.sleep(5)
     follow_up_handle()
     time.sleep(5)
     from descending_cabin_task import main as descending_cabin_task_main

+ 46 - 0
run_vj.sh

@@ -0,0 +1,46 @@
+#!/bin/bash
+
+cd /home/node04/yuzhou/jiangcang_vj
+
+LOG_DIR=logs
+mkdir -p $LOG_DIR
+
+log() {
+    echo "$(date '+%Y-%m-%d %H:%M:%S') $1" >> $LOG_DIR/control.log
+}
+
+log "=== 脚本开始执行 ==="
+
+START_TIME=$(date +%s)
+
+# 启动第一个任务(后台执行)
+/home/node04/anaconda3/bin/python main_pe_0.py >> $LOG_DIR/prediction.log 2>&1 &
+PID=$!
+
+log "main_pe_0.py 已启动,PID=$PID"
+
+# 最大等待时间(22分钟 = 1320秒)
+TIMEOUT=1320
+
+while kill -0 $PID 2>/dev/null; do
+    NOW=$(date +%s)
+    ELAPSED=$((NOW - START_TIME))
+
+    if [ $ELAPSED -ge $TIMEOUT ]; then
+        log "main_pe_0.py 超时(已运行 ${ELAPSED} 秒),开始执行 follow_up.py"
+        break
+    fi
+
+    sleep 5
+done
+
+# 如果是正常结束
+if ! kill -0 $PID 2>/dev/null; then
+    log "main_pe_0.py 正常结束(耗时 ${ELAPSED} 秒)"
+fi
+
+log "开始执行 follow_up.py"
+
+/home/node04/anaconda3/bin/python follow_up.py >> $LOG_DIR/keep.log 2>&1
+
+log "=== 脚本执行结束 ==="