| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
"""
Task without a price inquiry or price verification step: match the result
directly by flight number / cabin / baggage and take its data.
"""
- import os
- import json
- import time
- import requests
- import threading
- import traceback
- from datetime import datetime
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from xmi_logger import XmiLogger
- import csv
# Endpoint that receives the batch of policy items posted by sync_policy().
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
def time_format_conversion(match_time,
                           in_strftime_str="%Y-%m-%dT%H:%M:%S",
                           out_strftime_str="%Y%m%d%H%M%S"):
    """Re-format a timestamp string from one strftime layout to another.

    Args:
        match_time: The timestamp text to convert,
            e.g. ``"2023-07-29T04:15:00"``.
        in_strftime_str: The strptime format that ``match_time`` follows.
        out_strftime_str: The strftime format of the returned string.

    Returns:
        ``match_time`` rendered with ``out_strftime_str``.

    Raises:
        ValueError: If ``match_time`` does not match ``in_strftime_str``.
    """
    parsed = time.strptime(match_time, in_strftime_str)
    converted = time.strftime(out_strftime_str, parsed)
    return converted
- def _process_one_task(row):
- task = row
-
- city_pair = (task.get("citypair") or "").strip()
- if "-" not in city_pair:
- return None
- from_city_code, to_city_code = city_pair.split("-", 1)
-
- flight_numbers_raw = (
- task.get("flight_numbers")
- or ""
- ).strip()
-
- flight_numbers = [i.strip() for i in flight_numbers_raw.split(",") if i.strip()]
- if not flight_numbers:
- return None
- cabins_raw = (task.get("cabins") or "").strip()
- cabin_list = [i.strip() for i in cabins_raw.split(",") if i.strip()] if cabins_raw else []
- if not cabin_list:
- cabin_list = ["Y"] * len(flight_numbers)
- if len(cabin_list) < len(flight_numbers):
- cabin_list.extend([cabin_list[-1]] * (len(flight_numbers) - len(cabin_list)))
-
- baggage_val = str(task.get("baggage_weight") or "0").strip()
- try:
- kg = int(float(baggage_val))
- except (TypeError, ValueError):
- kg = 0
- pc = 0 if kg <= 0 else 1
- dep_time = (
- task.get("from_time")
- or f"{task.get('from_date') or ''} 00:00:00"
- )
- drop_price_change_upper = float(task.get("drop_price_change_upper") or 0) # 最小的降价幅度
- max_threshold = round(drop_price_change_upper * 0.8)
- end_segments = []
- for idx, flight_number in enumerate(flight_numbers):
- carrier = "".join([c for c in flight_number if c.isalpha()])
- end_segments.append({
- "carrier": carrier,
- "flight_number": flight_number,
- "dep_city_code": from_city_code,
- "arr_city_code": to_city_code,
- "cabin": cabin_list[idx],
- "dep_time": dep_time,
- })
- return {
- "trip_type": 1,
- "bag_amount": pc,
- "bag_weight": kg,
- "max_threshold": max_threshold,
- "segments": end_segments,
- "ret_segments": [],
- "task": task,
- }
def sync_policy(payload):
    """POST ``payload`` as JSON to ``POLICY_URL`` and return the decoded reply.

    A sample response body recorded alongside this code::

        {
            "code": 0,
            "msg": "ok",
            "data": {"deleted": 1, "created": 7}
        }

    Args:
        payload: JSON-serialisable request body.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or a timeout
            (30 seconds).
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        "Content-Type": "application/json",
    }
    response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
    # Fail loudly on HTTP errors instead of returning an error page as a result.
    response.raise_for_status()
    return response.json()
def main():
    """Build policy items from ``./keep/keep_info.csv`` and upload them.

    Steps:
        1. Read every task row from the input CSV.
        2. Convert each row with ``_process_one_task`` on a thread pool,
           collecting the resulting policy items and the rows they came from.
        3. Upload all policy items in a single ``sync_policy`` call.
        4. Write the kept rows to a new timestamped CSV in the download
           directory, so files from earlier runs are preserved.
    """
    logger = XmiLogger("task")

    # 1. Read the task list. "utf-8-sig" drops a leading UTF-8 BOM (\ufeff);
    #    newline="" lets the csv module handle line endings itself.
    keep_info_path = os.path.join("./keep", "keep_info.csv")
    with open(keep_info_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        task_list = list(reader)
        # Header of the input file, de-duplicated while keeping column order.
        input_fieldnames = list(dict.fromkeys(reader.fieldnames or []))

    # 2. Process the task list on a thread pool.
    policy_list = []
    keep_info_end = []
    max_workers = 1  # number of worker threads; adjust as needed
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_process_one_task, task): task for task in task_list}
        total = len(futures)
        done = 0
        failed = 0
        for future in as_completed(futures):
            try:
                flight_data = future.result()
                if flight_data is not None:
                    task = flight_data.pop("task")
                    keep_info_end.append(task)
                    policy_list.append(flight_data)
            except Exception as e:
                failed += 1
                task = futures[future]
                logger.error(f"任务异常 {task}: {e}")
            finally:
                done += 1
                logger.info(
                    f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
                )

    # 3. Upload all policies in one batch.
    logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
    logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
    if policy_list:
        payload = {"items": policy_list}
        try:
            resp_json = sync_policy(payload)
        except Exception as e:
            logger.error(f"上传政策失败: {e}")
            logger.error(f"{traceback.format_exc()}")
        else:
            # The sample response kept with sync_policy shows "code": 0 for
            # success; anything else is reported as a failure.
            if isinstance(resp_json, dict) and resp_json.get("code") == 0:
                logger.info("上传政策成功")
            else:
                logger.error(f"上传政策失败: {resp_json}")

    # 4. Write the kept rows to a timestamped CSV for download.
    logger.info(f"keep_info_end: {len(keep_info_end)}")
    output_dir = "/home/node04/descending_cabin_files_uo"
    os.makedirs(output_dir, exist_ok=True)
    if keep_info_end:
        out_path = os.path.join(
            output_dir,
            f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
        )
        with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
            # Columns follow the input header; overflow values that DictReader
            # stores under the None key are skipped instead of raising.
            writer = csv.DictWriter(
                f,
                fieldnames=input_fieldnames,
                extrasaction="ignore",
            )
            writer.writeheader()
            writer.writerows(keep_info_end)
        logger.info("keep_info_end 写入完成")
    else:
        logger.warning("keep_info_end 为空,跳过写入CSV")
# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|