فهرست منبع

提交往规则投放部分

node04 1 ماه پیش
والد
کامیت
17060d3da1
3فایلهای تغییر یافته به همراه195 افزوده شده و 3 حذف شده
  1. 191 0
      descending_cabin_task.py
  2. 2 1
      follow_up.py
  3. 2 2
      uo_atlas_import.py

+ 191 - 0
descending_cabin_task.py

@@ -0,0 +1,191 @@
+"""
+询价 + 验价任务:先询价,按航班号/舱位/行李匹配 result 取 data,再验价并按要求处理时效与重试。
+"""
+import os
+import json
+import time
+import requests
+import threading
+from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from xmi_logger import XmiLogger
+import csv
+
+
+POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
+
+
+def time_format_conversion(match_time,
+                           in_strftime_str="%Y-%m-%dT%H:%M:%S",
+                           out_strftime_str="%Y%m%d%H%M%S"):
+    """
+    时间格式转换
+    match_time 输入的时间 023-07-29T04:15:00
+    in_strftime_str 输入的时间格式
+    out_strftime_str 输出的时间格式
+    """
+    time_array = time.strptime(match_time, in_strftime_str)
+    return time.strftime(out_strftime_str, time_array)
+
+
+
+def _process_one_task(row):
+    task = row
+    
+    city_pair = (task.get("citypair") or "").strip()
+    if "-" not in city_pair:
+        return None
+    from_city_code, to_city_code = city_pair.split("-", 1)
+    
+    flight_numbers_raw = (
+        task.get("flight_numbers")
+        or ""
+    ).strip()
+        
+    flight_numbers = [i.strip() for i in flight_numbers_raw.split(",") if i.strip()]
+    if not flight_numbers:    
+        return None
+
+    cabins_raw = (task.get("cabins") or "").strip()
+    cabin_list = [i.strip() for i in cabins_raw.split(",") if i.strip()] if cabins_raw else []
+    if not cabin_list:
+        cabin_list = ["Y"] * len(flight_numbers)
+    if len(cabin_list) < len(flight_numbers):
+        cabin_list.extend([cabin_list[-1]] * (len(flight_numbers) - len(cabin_list)))
+    
+    baggage_val = str(task.get("baggage_weight") or "0").strip()
+    try:
+        kg = int(float(baggage_val))
+    except (TypeError, ValueError):
+        kg = 0
+    pc = 0 if kg <= 0 else 1
+
+    dep_time = (
+        task.get("from_time")
+        or f"{task.get('from_date') or ''} 00:00:00"
+    )
+
+    drop_price_change_upper = float(task.get("drop_price_change_upper") or 0)   # 最小的降价幅度
+    max_threshold = round(drop_price_change_upper * 0.5)
+
+    end_segments = []
+    for idx, flight_number in enumerate(flight_numbers):
+        carrier = "".join([c for c in flight_number if c.isalpha()])
+        end_segments.append({
+            "carrier": carrier,
+            "flight_number": flight_number,
+            "dep_city_code": from_city_code,
+            "arr_city_code": to_city_code,
+            "cabin": cabin_list[idx],
+            "dep_time": dep_time,
+        })
+
+    return {
+        "trip_type": 1,
+        "bag_amount": pc,
+        "bag_weight": kg,
+        "max_threshold": max_threshold,
+        "segments": end_segments,
+        "ret_segments": [],
+        "task": task,
+    }
+
+
+
+def sync_policy(payload):
+    headers = {
+        "Content-Type": "application/json",
+    }
+    # print(json.dumps(payload, ensure_ascii=False, indent=2))
+    response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
+    resp_json = response.json()
+    """
+    {
+        "code": 0,
+        "msg": "ok",
+        "data": {
+            "deleted": 1,
+            "created": 7
+        }
+    }
+    """
+    # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
+    return resp_json
+
+
+
+def main():
+
+    logger = XmiLogger("task")
+
+    # 注意  \ufeff 是 UTF-8 的 BOM
+    # 所以需要使用 utf-8-sig 编码
+    task_list = []
+
+    # 1 读取任务列表
+    output_dir = "./keep"
+    keep_info_path = os.path.join(output_dir, "keep_info.csv")
+
+    with open(keep_info_path, "r", encoding="utf-8-sig") as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            task_list.append(row)
+
+    # 2 任务列表逻辑处理(多线程)
+
+    policy_list = []
+    keep_info_end = []
+    max_workers = 1  # 并发线程数,可按需要调整
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = {executor.submit(_process_one_task, task): task for task in task_list}
+        total = len(futures)
+        done = 0
+        failed = 0
+        for future in as_completed(futures):
+            try:
+                flight_data = future.result()
+                if flight_data is not None:
+                    task = flight_data.pop("task")
+                    keep_info_end.append(task)
+                    policy_list.append(flight_data)
+            except Exception as e:
+                failed += 1
+                task = futures[future]
+                # print(f"任务异常 {task}: {e}")
+                logger.error(f"任务异常 {task}: {e}")
+            finally:
+                done += 1
+                logger.info(
+                    f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
+                )
+
+    # 3 批量一次性上传政策
+    logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
+    # logger.info(f"policy_list: {policy_list}")
+    logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
+    if len(policy_list) > 0:
+        # 这里批量一次性上传政策 
+        payload = {"items": policy_list}
+        sync_policy(payload)
+
+    logger.info(f"keep_info_end: {len(keep_info_end)}")
+    # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理  提供下载页面 (历史数据需要保留)
+    output_dir = "/home/node04/uo_descending_cabin_files"
+    os.makedirs(output_dir, exist_ok=True)
+    if keep_info_end:
+        out_path = os.path.join(
+            output_dir,
+            f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
+        )
+        with open(out_path, "w", encoding="utf-8-sig") as f:
+            writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
+            writer.writeheader()
+            for task in keep_info_end:
+                writer.writerow(task)
+        logger.info("keep_info_end 写入完成")
+    else:
+        logger.warning("keep_info_end 为空,跳过写入CSV")
+
+
+if __name__ == "__main__":
+    main()

+ 2 - 1
follow_up.py

@@ -253,4 +253,5 @@ if __name__ == "__main__":
     time.sleep(5)
     follow_up_handle()
     time.sleep(5)
-    
+    from descending_cabin_task import main as descending_cabin_task_main
+    descending_cabin_task_main()

+ 2 - 2
uo_atlas_import.py

@@ -241,8 +241,8 @@ if __name__ == "__main__":
     create_at_end = current_time.strftime("%Y-%m-%d %H:%M:%S")
     create_at_begin = (current_time - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
 
-    # create_at_begin = "2026-03-21 00:00:00"
-    # create_at_end = "2026-03-29 23:59:59"
+    # create_at_begin = "2026-03-30 00:00:00"
+    # create_at_end = "2026-03-31 08:59:59"
 
     main_import_process(create_at_begin, create_at_end)