"""Task without price inquiry or price verification.

Rows read from a CSV task file are turned directly into policy items
(flight number / cabin / baggage), uploaded in a single batch, and the rows
that produced an item are written back out to a timestamped CSV file.
"""

import csv
import json
import os
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from xmi_logger import XmiLogger

POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"


def time_format_conversion(match_time,
                           in_strftime_str="%Y-%m-%dT%H:%M:%S",
                           out_strftime_str="%Y%m%d%H%M%S"):
    """Convert a time string from one strftime format to another.

    Args:
        match_time: Input time string, e.g. ``2023-07-29T04:15:00``.
        in_strftime_str: Format of ``match_time``.
        out_strftime_str: Format of the returned string.

    Returns:
        ``match_time`` re-rendered using ``out_strftime_str``.

    Raises:
        ValueError: If ``match_time`` does not match ``in_strftime_str``.
    """
    time_array = time.strptime(match_time, in_strftime_str)
    return time.strftime(out_strftime_str, time_array)


def _process_one_task(row):
    """Build one policy item from a task row.

    Args:
        row: Mapping with the keys read below (``citypair``,
            ``flight_numbers``, ``cabins``, ``baggage_weight``, ``from_time``,
            ``from_date``, ``drop_price_change_upper``). Missing or empty
            values are tolerated.

    Returns:
        A dict describing the policy item, with the original row stored under
        ``"task"``; ``None`` when the row has no ``-`` separated city pair or
        no flight number.

    Raises:
        ValueError: If ``drop_price_change_upper`` is not numeric.
    """
    task = row

    city_pair = (task.get("citypair") or "").strip()
    if "-" not in city_pair:
        return None
    from_city_code, to_city_code = city_pair.split("-", 1)

    flight_numbers_raw = (task.get("flight_numbers") or "").strip()
    flight_numbers = [i.strip() for i in flight_numbers_raw.split(",") if i.strip()]
    if not flight_numbers:
        return None

    cabins_raw = (task.get("cabins") or "").strip()
    cabin_list = [i.strip() for i in cabins_raw.split(",") if i.strip()]
    if not cabin_list:
        # No cabin given: default every segment to "Y".
        cabin_list = ["Y"] * len(flight_numbers)
    if len(cabin_list) < len(flight_numbers):
        # Fewer cabins than flights: repeat the last cabin for the rest.
        cabin_list.extend([cabin_list[-1]] * (len(flight_numbers) - len(cabin_list)))

    baggage_val = str(task.get("baggage_weight") or "0").strip()
    try:
        kg = int(float(baggage_val))
    except (TypeError, ValueError, OverflowError):
        # Unparsable or non-finite weight is treated as "no baggage".
        kg = 0
    pc = 0 if kg <= 0 else 1

    dep_time = task.get("from_time") or f"{task.get('from_date') or ''} 00:00:00"

    drop_price_change_upper = float(task.get("drop_price_change_upper") or 0)
    # Minimum price-drop magnitude: 80% of the configured upper bound.
    max_threshold = round(drop_price_change_upper * 0.8)

    end_segments = []
    for idx, flight_number in enumerate(flight_numbers):
        # NOTE(review): only alphabetic characters are kept, so a designator
        # that contains a digit (e.g. "9C") would lose it - confirm the
        # expected flight-number format.
        carrier = "".join(c for c in flight_number if c.isalpha())
        end_segments.append({
            "carrier": carrier,
            "flight_number": flight_number,
            "dep_city_code": from_city_code,
            "arr_city_code": to_city_code,
            "cabin": cabin_list[idx],
            "dep_time": dep_time,
        })

    return {
        "trip_type": 1,
        "bag_amount": pc,
        "bag_weight": kg,
        "max_threshold": max_threshold,
        "segments": end_segments,
        "ret_segments": [],
        "task": task,
    }


def sync_policy(payload):
    """POST ``payload`` as JSON to ``POLICY_URL`` and return the decoded reply.

    Example reply::

        {
            "code": 0,
            "msg": "ok",
            "data": {"deleted": 1, "created": 7}
        }

    Args:
        payload: JSON-serialisable request body.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.RequestException: On connection failure or timeout (30 s).
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        "Content-Type": "application/json",
    }
    response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
    return response.json()


def main():
    """Read the task CSV, build policy items, upload them, and export the kept rows."""
    logger = XmiLogger("task")

    # 1. Read the task list.
    # "\ufeff" is the UTF-8 BOM, hence the utf-8-sig encoding; newline="" is
    # what the csv module requires so quoted embedded newlines parse correctly.
    task_list = []
    output_dir = "./keep"
    keep_info_path = os.path.join(output_dir, "keep_info.csv")
    with open(keep_info_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            task_list.append(row)

    # 2. Process the task list on a thread pool.
    policy_list = []
    keep_info_end = []
    max_workers = 1  # number of worker threads; adjust as needed
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_process_one_task, task): task for task in task_list}
        total = len(futures)
        done = 0
        failed = 0
        for future in as_completed(futures):
            try:
                flight_data = future.result()
                if flight_data is not None:
                    # The source row is exported separately, not uploaded.
                    task = flight_data.pop("task")
                    keep_info_end.append(task)
                    policy_list.append(flight_data)
            except Exception as e:
                failed += 1
                task = futures[future]
                logger.error(f"任务异常 {task}: {e}")
            finally:
                done += 1
                logger.info(
                    f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
                )

    # 3. Upload all policy items in one batch.
    logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
    logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
    if policy_list:
        payload = {"items": policy_list}
        try:
            resp_json = sync_policy(payload)
            # The documented success reply carries code == 0; anything else
            # must not be reported as a successful upload.
            if isinstance(resp_json, dict) and resp_json.get("code") == 0:
                logger.info("上传政策成功")
            else:
                logger.error(f"上传政策失败: {resp_json}")
        except Exception as e:
            logger.error(f"上传政策失败: {e}")
            logger.error(f"{traceback.format_exc()}")

    logger.info(f"keep_info_end: {len(keep_info_end)}")

    # Write keep_info_end to a CSV file for the download page. Each run gets
    # its own timestamped file so earlier exports are preserved.
    output_dir = "/home/node04/descending_cabin_files_uo"
    os.makedirs(output_dir, exist_ok=True)
    if keep_info_end:
        out_path = os.path.join(
            output_dir,
            f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
        )
        # newline="" lets the csv writer control line endings itself.
        with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
            writer.writeheader()
            for task in keep_info_end:
                writer.writerow(task)
        logger.info("keep_info_end 写入完成")
    else:
        logger.warning("keep_info_end 为空,跳过写入CSV")


if __name__ == "__main__":
    main()