Kaynağa Gözat

修改阈值与保存时间到redis

node04 4 hafta önce
ebeveyn
işleme
c3d38ccf9e
1 değiştirilmiş dosya ile 24 ekleme ve 2 silme
  1. 24 2
      descending_cabin_task.py

+ 24 - 2
descending_cabin_task.py

@@ -7,7 +7,8 @@ import time
 import requests
 import threading
 import traceback
-from datetime import datetime
+import redis
+from datetime import datetime, timedelta
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from xmi_logger import XmiLogger
 import csv
@@ -67,7 +68,7 @@ def _process_one_task(row):
     )
 
     drop_price_change_upper = float(task.get("drop_price_change_upper") or 0)   # 最小的降价幅度
-    max_threshold = round(drop_price_change_upper * 0.8)
+    max_threshold = round(drop_price_change_upper * 1.0)
 
     end_segments = []
     for idx, flight_number in enumerate(flight_numbers):
@@ -114,6 +115,20 @@ def sync_policy(payload):
     return resp_json
 
 
+def time_handle():
+    now_time = datetime.now()
+    next_time = now_time + timedelta(hours=1)
+    next_ts = int(next_time.timestamp())
+
+    expire_at = next_time + timedelta(minutes=1)
+    ttl_seconds = int((expire_at - now_time).total_seconds())
+    if ttl_seconds <= 0:
+        ttl_seconds = 1
+
+    redis_client = redis.Redis(host='192.168.20.98', port=6379, db=0)
+    lock_key = "uo_next_pred_time"
+    redis_client.set(lock_key, next_ts, ex=ttl_seconds)
+
 
 def main():
 
@@ -175,6 +190,13 @@ def main():
             logger.error(f"{traceback.format_exc()}")
 
     logger.info(f"keep_info_end: {len(keep_info_end)}")
+    
+    try:
+        time_handle()
+        logger.info(f"存redis时间成功")
+    except Exception as e:
+        logger.error(f"存redis时间失败: {e}")
+
     # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理  提供下载页面 (历史数据需要保留)
     output_dir = "/home/node04/descending_cabin_files_uo"
     os.makedirs(output_dir, exist_ok=True)