"""
Task without price inquiry or price verification: match ``result`` directly by
flight number / cabin / baggage and take its ``data``.
"""
- import os
- import json
- import time
- import requests
- import threading
- import traceback
- import redis
- from datetime import datetime, timedelta
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from xmi_logger import XmiLogger
- import csv
# Endpoint that sync_policy() POSTs the batched policy payload to.
POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
def time_format_conversion(match_time,
                           in_strftime_str="%Y-%m-%dT%H:%M:%S",
                           out_strftime_str="%Y%m%d%H%M%S"):
    """Re-render a timestamp string from one strftime layout into another.

    Args:
        match_time: Timestamp text to convert, e.g. ``"2023-07-29T04:15:00"``.
        in_strftime_str: Format that ``match_time`` is written in.
        out_strftime_str: Format of the returned string.

    Returns:
        ``match_time`` formatted with ``out_strftime_str``.

    Raises:
        ValueError: If ``match_time`` does not match ``in_strftime_str``.
    """
    parsed = time.strptime(match_time, in_strftime_str)
    return time.strftime(out_strftime_str, parsed)
# Departure dates whose rows are always discarded.
_EXCLUDED_FROM_DATES = frozenset({
    "2026-04-28",
    "2026-04-29",
    "2026-04-30",
    "2026-05-01",
    "2026-05-05",
})


def _carrier_code(flight_number):
    """Return the airline designator that prefixes ``flight_number``."""
    prefix = flight_number[:2]
    if any(ch.isdigit() for ch in prefix):
        # Designators such as "9C" or "3U" contain a digit; keeping only the
        # letters would truncate them to "C" / "U".
        return prefix
    return "".join(ch for ch in flight_number if ch.isalpha())


def _process_one_task(row):
    """Turn one task row into a policy item, or filter it out.

    Args:
        row: Mapping of column name to string value. Keys read here:
            ``price_base``, ``price_tax``, ``citypair`` ("FROM-TO"),
            ``flight_numbers`` and ``cabins`` (comma separated),
            ``baggage_weight``, ``from_time``, ``from_date``,
            ``drop_price_change_upper`` and ``drop_price_sample_size``.

    Returns:
        ``None`` when the row is filtered out: no "-" in ``citypair``, no
        flight numbers, ``round(drop_price_change_upper) + 20`` greater than
        -10, sample size below 1, or ``from_date`` in the excluded dates.
        Otherwise a dict with ``trip_type``, ``cover_price``, ``cover_tax``,
        ``bag_amount``, ``bag_weight``, ``max_threshold``, ``segments``,
        ``ret_segments`` and the original row under ``task``.

    Raises:
        ValueError, TypeError: If a price, the drop threshold or the sample
            size holds a value that cannot be parsed as a number.
    """
    task = row

    # An unparsable price is deliberately not defaulted: the error propagates
    # so the caller can report the row instead of uploading a zero price.
    price_base = round(float(task.get('price_base', '0.0')))
    price_tax = round(float(task.get('price_tax', '0.0')))

    city_pair = (task.get("citypair") or "").strip()
    if "-" not in city_pair:
        return None
    from_city_code, to_city_code = city_pair.split("-", 1)

    flight_numbers = [
        part.strip()
        for part in (task.get("flight_numbers") or "").split(",")
        if part.strip()
    ]
    if not flight_numbers:
        return None

    cabin_list = [
        part.strip()
        for part in (task.get("cabins") or "").split(",")
        if part.strip()
    ]
    if not cabin_list:
        cabin_list = ["Y"] * len(flight_numbers)
    if len(cabin_list) < len(flight_numbers):
        # Pad with the last known cabin so every segment has one.
        cabin_list.extend([cabin_list[-1]] * (len(flight_numbers) - len(cabin_list)))

    baggage_val = str(task.get("baggage_weight") or "0").strip()
    try:
        kg = int(float(baggage_val))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers "inf", which float() accepts but int() rejects.
        kg = 0
    pc = 0 if kg <= 0 else 1

    dep_time = (
        task.get("from_time")
        or f"{task.get('from_date') or ''} 00:00:00"
    )

    # Smallest price-drop magnitude; the row is kept only when the shifted
    # threshold is -10 or lower.
    drop_price_change_upper = float(task.get("drop_price_change_upper") or 0)
    max_threshold = round(drop_price_change_upper) + 20
    if max_threshold > 0 or abs(max_threshold) < 10:
        return None

    # An empty cell counts as 0 samples and "3.0" as 3, matching the
    # `or 0` handling of the threshold column above.
    drop_price_sample_size = int(float(task.get("drop_price_sample_size") or 0))
    if drop_price_sample_size < 1:
        return None

    if task.get("from_date") in _EXCLUDED_FROM_DATES:
        return None

    end_segments = [
        {
            "carrier": _carrier_code(flight_number),
            "flight_number": flight_number,
            "dep_city_code": from_city_code,
            "arr_city_code": to_city_code,
            "cabin": cabin_list[idx],
            "dep_time": dep_time,
        }
        for idx, flight_number in enumerate(flight_numbers)
    ]
    return {
        "trip_type": 1,
        "cover_price": price_base,
        "cover_tax": price_tax,
        "bag_amount": pc,
        "bag_weight": kg,
        "max_threshold": max_threshold,
        "segments": end_segments,
        "ret_segments": [],
        "task": task,
    }
def sync_policy(payload):
    """POST ``payload`` as JSON to ``POLICY_URL`` and return the decoded reply.

    Example reply body::

        {
            "code": 0,
            "msg": "ok",
            "data": {
                "deleted": 1,
                "created": 7
            }
        }

    Args:
        payload: JSON-serialisable request body.

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.exceptions.HTTPError: If the server answers with a 4xx/5xx
            status.
        requests.exceptions.RequestException: On connection or timeout errors.
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        "Content-Type": "application/json",
    }
    response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
    # Without this, an error status carrying a JSON body would be returned to
    # the caller exactly like a successful sync.
    response.raise_for_status()
    return response.json()
def time_handle():
    """Store a Unix timestamp two hours from now in Redis.

    The value is written under the key ``uo_next_pred_time`` and expires one
    minute after the stored timestamp.
    NOTE(review): presumably other jobs read this key to learn when the next
    run is due -- confirm against the consumers.

    Raises:
        redis.exceptions.RedisError: If the server cannot be reached or the
            write fails.
    """
    delay = timedelta(hours=2)
    grace = timedelta(minutes=1)

    next_ts = int((datetime.now() + delay).timestamp())
    # The lifetime is always delay + grace (7260 s), so the former
    # "ttl_seconds <= 0" fallback could never trigger and has been removed.
    ttl_seconds = int((delay + grace).total_seconds())

    redis_client = redis.Redis(host='192.168.20.98', port=6379, db=0)
    redis_client.set("uo_next_pred_time", next_ts, ex=ttl_seconds)
def _load_tasks(csv_path):
    """Read every row of the CSV at ``csv_path`` into a list of dicts."""
    # utf-8-sig strips the UTF-8 BOM (\ufeff) from the first header name;
    # newline="" lets the csv module handle line endings itself.
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        return list(csv.DictReader(f))


def _build_policies(task_list, logger, max_workers=1):
    """Run ``_process_one_task`` over the rows; return (policies, kept rows)."""
    policy_list = []
    keep_info_end = []
    failed = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_process_one_task, task): task for task in task_list}
        total = len(futures)
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                flight_data = future.result()
                if flight_data is not None:
                    # The source row travels with the policy only so it can be
                    # exported later; it is not part of the upload.
                    keep_info_end.append(flight_data.pop("task"))
                    policy_list.append(flight_data)
            except Exception as e:
                failed += 1
                logger.error(f"任务异常 {futures[future]}: {e}")
            logger.info(
                f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
            )
    return policy_list, keep_info_end


def _upload_policies(policy_list, logger, max_attempts=3):
    """Upload all policies in one request, retrying only connection failures."""
    payload = {"items": policy_list}
    retryable_exceptions = (
        requests.exceptions.ConnectionError,
        requests.exceptions.ConnectTimeout,
    )
    for attempt in range(1, max_attempts + 1):
        try:
            resp_json = sync_policy(payload)
        except retryable_exceptions as e:
            if attempt == max_attempts:
                logger.error(f"上传政策失败(已重试{max_attempts}次): {e}")
            else:
                # Back off a little longer after each failed attempt.
                wait_seconds = attempt
                logger.warning(
                    f"上传政策连接异常,第{attempt}/{max_attempts}次失败: {e},{wait_seconds}s后重试"
                )
                time.sleep(wait_seconds)
        except Exception as e:
            logger.error(f"上传政策失败(非连接异常,不重试): {e}")
            logger.error(f"{traceback.format_exc()}")
            return
        else:
            # The request went through; only an explicit non-zero "code" in
            # the reply is reported as a failure.
            if isinstance(resp_json, dict) and resp_json.get("code", 0) != 0:
                logger.error(f"上传政策失败(接口返回异常): {resp_json}")
            else:
                logger.info("上传政策成功")
            return


def _write_keep_csv(keep_info_end, output_dir):
    """Write the kept rows to a new timestamped CSV inside ``output_dir``."""
    out_path = os.path.join(
        output_dir,
        f"keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
    )
    with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(keep_info_end[0].keys()))
        writer.writeheader()
        writer.writerows(keep_info_end)
    return out_path


def main():
    """Run the whole job: load rows, build policies, upload, export.

    Steps:
        1. Read ``./keep/keep_info.csv``.
        2. Convert each row with ``_process_one_task``, dropping filtered or
           failing rows.
        3. Upload the surviving policies in a single request.
        4. Call ``time_handle`` (failures are logged, not raised).
        5. Write the surviving rows to a timestamped CSV under
           ``/home/node04/descending_cabin_files_uo``.
    """
    logger = XmiLogger("task")

    # 1. Load the task list.
    task_list = _load_tasks(os.path.join("./keep", "keep_info.csv"))

    # 2. Process the rows (thread pool; raise max_workers if needed).
    policy_list, keep_info_end = _build_policies(task_list, logger, max_workers=1)

    # 3. Upload every policy in one batch.
    logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
    logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
    if policy_list:
        _upload_policies(policy_list, logger, max_attempts=3)
    logger.info(f"keep_info_end: {len(keep_info_end)}")

    # 4. Record the timestamp in Redis; a failure here must not stop the export.
    try:
        time_handle()
        logger.info("存redis时间成功")
    except Exception as e:
        logger.error(f"存redis时间失败: {e}")

    # 5. Export the kept rows. Each run writes a new timestamped file, so
    #    earlier exports stay in place.
    output_dir = "/home/node04/descending_cabin_files_uo"
    os.makedirs(output_dir, exist_ok=True)
    if keep_info_end:
        _write_keep_csv(keep_info_end, output_dir)
        logger.info("keep_info_end 写入完成")
    else:
        logger.warning("keep_info_end 为空,跳过写入CSV")
# Run the job only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|