| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672 |
- """
- 询价 + 验价任务:先询价,按航班号/舱位/行李匹配 result 取 data,再验价并按要求处理时效与重试。
- """
- import os
- import json
- import time
- import requests
- import threading
- from datetime import datetime
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from xmi_logger import XmiLogger
- import csv
- # 询价\验价
- BASE_URL = "http://8.218.51.130:9000"
- CID = "cba37f642eab11ef8e7200163e01a06e"
- # 汇率
- RATES_URL = "http://8.218.51.130:9003/api/v1/rates"
- RATES_CID = "750B5141EDBF7FA6F73A99C768130099"
- # 政策
- # POLICY_URL = "http://192.168.20.134:8787/prediction/rules/sync"
- POLICY_URL = "http://direct.ysjipiao.com:8787/prediction/rules/sync"
- VALID_DURATION_SECONDS = 3 * 60 # 3 分钟 验价结果有效时间
- RETRY_TIMES = 3 # 请求询价/验价接口重试次数
- RETRY_INTERVAL = 10 # 秒 询价/验价接口重试间隔
- RETRY_DURATION = 3 * 60 # 持续 3 分钟 询价/验价接口重试时间
- PRICE_DIFF_THRESHOLD = 10 # 验价与任务期望价格允许的差值(已统一币种后),在此范围内才视为通过 人民币
- RATES_RETRY_TIMES = 3 # 汇率接口重试次数
- RATES_RETRY_INTERVAL = 2 # 汇率接口重试间隔
- def time_format_conversion(match_time,
- in_strftime_str="%Y-%m-%dT%H:%M:%S",
- out_strftime_str="%Y%m%d%H%M%S"):
- """
- 时间格式转换
- match_time 输入的时间 023-07-29T04:15:00
- in_strftime_str 输入的时间格式
- out_strftime_str 输出的时间格式
- """
- time_array = time.strptime(match_time, in_strftime_str)
- return time.strftime(out_strftime_str, time_array)
- class FlightPriceRequestError(Exception):
- """请求询价/验价接口时的网络或响应异常."""
- def __init__(self, message: str, cause: Exception | None = None):
- self.cause = cause
- super().__init__(message)
- def fetch_rate(base: str, symbols: str, rates_url: str = RATES_URL, cid: str = RATES_CID) -> float | None:
- """
- 请求汇率接口,获取 1 单位 base 兑换为 symbols 的汇率。
- 响应格式: {"code": 0, "msg": "success", "data": {"base": "USD", "symbols": "CNY", "rate": 6.8437}}。
- 带重试,失败返回 None。
- """
- last_err: Exception | None = None
- for attempt in range(RATES_RETRY_TIMES):
- try:
- resp = requests.get(
- rates_url,
- params={"base": base, "symbols": symbols},
- headers={
- "cid": cid,
- "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
- },
- timeout=15
- )
- resp.raise_for_status()
- body = resp.json()
- if body.get("code") != 0:
- last_err = ValueError(body.get("msg", "汇率接口返回非成功"))
- if attempt < RATES_RETRY_TIMES - 1:
- time.sleep(RATES_RETRY_INTERVAL)
- continue
- data = body.get("data")
- if isinstance(data, dict) and "rate" in data:
- return float(data["rate"])
- last_err = ValueError("响应缺少 data.rate")
- except (requests.RequestException, json.JSONDecodeError, TypeError, ValueError, KeyError) as e:
- last_err = e
- if attempt < RATES_RETRY_TIMES - 1:
- time.sleep(RATES_RETRY_INTERVAL)
- return None
- class FlightPriceClient:
- """请求封装:询价、验价接口."""
- def __init__(self, base_url: str = BASE_URL, cid: str = CID):
-
- self.base_url = base_url.rstrip("/")
- self.cid = cid
- self.headers = {
- "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
- "Content-Type": "application/json",
- }
- def _request(self, url: str, payload: dict) -> dict:
- """发起 POST 请求并解析 JSON,统一处理异常。失败时按 RETRY_TIMES 次重试,间隔 RETRY_INTERVAL 秒."""
- last_err = None
- # print(json.dumps(payload, ensure_ascii=False, indent=2))
- for attempt in range(RETRY_TIMES):
- try:
- if attempt > 0:
- time.sleep(RETRY_INTERVAL)
- resp = requests.post(url, headers=self.headers, json=payload, timeout=30)
- resp.raise_for_status()
- body = resp.json()
- print(json.dumps(body, ensure_ascii=False)[:200])
- return body
- # print(resp.json())
- # return resp.json()
- except requests.Timeout as e:
- last_err = FlightPriceRequestError(f"请求超时: {url}", cause=e)
- except requests.ConnectionError as e:
- last_err = FlightPriceRequestError(f"连接失败: {url}", cause=e)
- except requests.HTTPError as e:
- last_err = FlightPriceRequestError(
- f"HTTP 错误 {getattr(e.response, 'status_code', '?')}: {url}", cause=e
- )
- except requests.RequestException as e:
- last_err = FlightPriceRequestError(f"请求异常: {url} - {e}", cause=e)
- except json.JSONDecodeError as e:
- last_err = FlightPriceRequestError(f"响应非合法 JSON: {url}", cause=e)
- raise last_err
- def search_flights(
- self,
- from_city_code: str,
- to_city_code: str,
- from_day: str,
- ) -> dict:
- """询价接口."""
- url = f"{self.base_url}/v1/search_flights"
- payload = {
- "from_city_code": from_city_code,
- "to_city_code": to_city_code,
- "from_day": from_day,
- "cid": self.cid,
- }
- return self._request(url, payload)
- def verify_price(
- self,
- from_city_code: str,
- to_city_code: str,
- from_day: str,
- data: str,
- not_verify: bool = False,
- async_: bool = True,
- ) -> dict:
- """验价接口."""
- url = f"{self.base_url}/v1/verify_price"
- payload = {
- "from_city_code": from_city_code,
- "to_city_code": to_city_code,
- "from_day": from_day,
- "data": data,
- "cid": self.cid,
- "not_verify": not_verify,
- "async": async_,
- }
- return self._request(url, payload)
- class ResultMatcher:
- """从询价 result 中按航班号、舱位、行李匹配一条,并取出 data."""
- @staticmethod
- def match(
- result: list[dict],
- cabins: str,
- baggages: str,
- flight_numbers: str,
- ) -> dict | None:
- """
- 在 result 中匹配:按段顺序一致。cabins / baggages / flight_numbers 均为 ";" 分隔的多段字符串,
- 第 i 段需满足:seg[i].cabin == 第 i 个舱位、seg[i].baggage == 第 i 个行李、seg[i].flight_number == 第 i 个航班号。
- 返回匹配到的那条 result 项(含 data),未匹配到返回 None。
- """
- separator = '|' # 更换分隔符由;为|
- cabin_list = [s.strip() for s in (cabins or "").split(separator)]
- baggage_list = [s.strip() for s in (baggages or "").split(separator)]
- flight_list = [s.strip() for s in (flight_numbers or "").split(separator)] if flight_numbers else []
- n = len(cabin_list)
- if n == 0 or len(baggage_list) != n:
- return None
- if flight_list and len(flight_list) != n:
- return None
- for item in result:
- segments = item.get("segments") or []
- if len(segments) != n:
- continue
- for i in range(n):
- seg = segments[i]
- if seg.get("cabin") != cabin_list[i] or seg.get("baggage") != baggage_list[i]:
- break
- if flight_list and seg.get("flight_number") != flight_list[i]:
- break
- else:
- return item
- return None
- class VerifyResultHandler:
- """验价返回 result 只有一条时:判断 now_time / verify_time 是否在 3 分钟内,并取价格."""
- @staticmethod
- def parse_time(ts: str | int | None) -> datetime | None:
- """解析时间字符串或时间戳."""
- if ts is None:
- return None
- if isinstance(ts, (int, float)):
- return datetime.fromtimestamp(ts)
- for fmt in ("%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"):
- try:
- return datetime.strptime(str(ts).strip(), fmt)
- except (ValueError, TypeError):
- continue
- return None
- @classmethod
- def is_within_3_minutes(cls, ts: str | int | None) -> bool:
- """判断给定时间是否在距今 3 分钟内."""
- dt = cls.parse_time(ts)
- if dt is None:
- return False
- return (datetime.now() - dt).total_seconds() <= VALID_DURATION_SECONDS
- @classmethod
- def get_valid_price(cls, single_result: dict) -> dict | None:
- """
- 单条 result:若存在 now_time 且在 3 分钟内则返回该条(含价格);
- 否则看 verify_time 是否在 3 分钟内。
- 符合条件返回该条 dict,否则返回 None。
- """
- if not single_result:
- return None
- if "now_time" in single_result and cls.is_within_3_minutes(single_result["now_time"]):
- return single_result
- if "verify_time" in single_result and cls.is_within_3_minutes(single_result["verify_time"]):
- return single_result
- return None
- class FlightPriceTaskRunner:
- """单任务流程:询价 -> 匹配 -> 验价 -> 按规则取价或 not_verify 重试."""
- def __init__(self, client: FlightPriceClient | None = None):
- self.logger = XmiLogger("flight_price_task")
- self.client = client or FlightPriceClient()
- self.matcher = ResultMatcher()
- self.handler = VerifyResultHandler()
- self.rate = fetch_rate("USD", "CNY")
-
- def run(
- self,
- task: dict,
- do_verify: bool = True,
- ) -> dict:
- """
- 执行单条任务。task 需含: from_city_code, to_city_code, from_day, cabin, baggage, adult_total_price。
- flight_number 可选,用于匹配。
- do_verify=False 时仅执行询价+匹配并返回 matched(不做验价)
- 返回: {"status": "ok"|"placeholder"|"no_match", "price_info": {...}, "raw_verify": ..., "raw_search": ..., "matched": ...}
- """
- from_city_code = task["from_city_code"]
- to_city_code = task["to_city_code"]
- from_day = task["from_day"]
- flight_numbers = task.get("flight_numbers")
- cabins = task["cabins"]
- baggages = task["baggages"]
- # 1. 询价
- try:
- search_resp = self.client.search_flights(from_city_code, to_city_code, from_day)
- except FlightPriceRequestError as e:
- self.logger.warning(f"询价请求异常: {e}")
- return {"status": "request_error", "msg": str(e), "phase": "search"}
- if search_resp.get("code") != 0:
- return {"status": "search_failed", "msg": search_resp.get("msg"), "raw_search": search_resp}
- result_list = search_resp.get("result") or []
- # print(result_list)
- matched = self.matcher.match(result_list, cabins=cabins, baggages=baggages, flight_numbers=flight_numbers)
- # print(matched)
- if not matched:
- return {"status": "no_match", "msg": "未匹配到航班/舱位/行李", "raw_search": search_resp}
- data = matched.get("data")
- if not data:
- return {"status": "no_data", "msg": "匹配项无 data", "raw_search": search_resp}
- # 只询价不验价走的流程
- if not do_verify:
- # return {"status": "ok", "raw_search": search_resp, "matched": matched, "data": data}
- expected_in_currency, rate_err = self._expected_price_in_verify_currency(task, matched)
- if rate_err:
- return {
- "status": "rate_error",
- "msg": rate_err,
- "raw_search": search_resp,
- "matched": matched,
- "data": data,
- }
- actual = matched.get("adult_total_price") # 询价和验价接口出来的币种已经是人民币, 不用再转换
- if self._price_within_threshold(expected_in_currency, actual): # 对比
- return {
- "status": "ok",
- "price_info": self._extract_price_info(matched),
- "raw_search": search_resp,
- "matched": matched,
- "data": data,
- }
- return {
- "status": "price_not_within_threshold",
- "msg": "询价结果价格不在阈值内",
- "expected": expected_in_currency,
- "actual": actual,
- "raw_search": search_resp,
- "matched": matched,
- "data": data,
- }
-
- # 2. 验价(先 not_verify=False)
- try:
- verify_resp = self.client.verify_price(
- from_city_code=from_city_code,
- to_city_code=to_city_code,
- from_day=from_day,
- data=data,
- not_verify=False,
- async_=True,
- )
- except FlightPriceRequestError as e:
- self.logger.warning(f"验价请求异常: {e}")
- return {"status": "request_error", "msg": str(e), "phase": "verify"}
- if verify_resp.get("code") != 0:
- return {"status": "verify_failed", "msg": verify_resp.get("msg"), "raw_verify": verify_resp}
- # print(verify_resp)
- verify_result = verify_resp.get("result")
- if isinstance(verify_result, list) and len(verify_result) >= 1:
- single = verify_result[0]
- valid = self.handler.get_valid_price(single)
- if valid is not None:
- expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
- if rate_err:
- return {"status": "rate_error", "msg": rate_err}
- if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
- return {
- "status": "ok",
- "price_info": self._extract_price_info(valid),
- "raw_verify": verify_resp,
- }
- # 3. 不符合时效:用 not_verify=True 每 10 秒重试,最多 3 分钟
- deadline = time.monotonic() + RETRY_DURATION
- while time.monotonic() < deadline:
- time.sleep(RETRY_INTERVAL)
- try:
- verify_resp = self.client.verify_price(
- from_city_code=from_city_code,
- to_city_code=to_city_code,
- from_day=from_day,
- data=data,
- not_verify=True,
- async_=True,
- )
- except FlightPriceRequestError as e:
- self.logger.warning(f"验价重试请求异常: {e}")
- continue
- if verify_resp.get("code") != 0:
- continue
- verify_result = verify_resp.get("result")
- if isinstance(verify_result, list) and len(verify_result) >= 1:
- single = verify_result[0]
- valid = self.handler.get_valid_price(single)
- if valid is not None:
- expected_in_verify_currency, rate_err = self._expected_price_in_verify_currency(task, valid)
- if rate_err:
- continue
- if self._price_within_threshold(expected_in_verify_currency, valid.get("adult_total_price")):
- return {
- "status": "ok",
- "price_info": self._extract_price_info(valid),
- "raw_verify": verify_resp,
- }
- else:
- # 价格不符合,直接跳出循环,不用继续校验
- break
- # 3 分钟内都没有符合规则的数据,先占位
- return {
- "status": "placeholder",
- "msg": f"{RETRY_DURATION / 60} 分钟内未得到符合规则的价格,等待后续逻辑处理",
- "task": task,
- "data": data,
- "raw_verify": verify_resp,
- }
- def _expected_price_in_verify_currency(
- self, task: dict, valid: dict
- ) -> tuple[float | None, str | None]:
- """
- 将任务期望价格换算为验价结果币种后的数值。
- 返回 (换算后的价格, None),若币种一致则直接换算为 float;
- 若需汇率且获取失败返回 (None, "汇率获取失败")。
- """
- try:
- expected_val = float(task.get("adult_total_price"))
- except (TypeError, ValueError):
- return None, "任务 adult_total_price 无效"
- if self.rate is None:
- task_currency = (task.get("currency") or "USD").strip().upper()
- verify_currency = (valid.get("currency") or "CNY").strip().upper()
- if task_currency == verify_currency:
- return expected_val, None
- rate = fetch_rate(task_currency, verify_currency)
- if rate is None:
- return None, "汇率获取失败"
- self.rate = rate
- return expected_val * self.rate, None
- @staticmethod
- def _price_within_threshold(
- expected: str | int | float | None,
- actual: str | int | float | None,
- threshold: float | None = None,
- ) -> bool:
- """校验验价结果价格与任务期望价格差是否在阈值内(两者已统一币种)。阈值默认 PRICE_DIFF_THRESHOLD。"""
- if threshold is None:
- threshold = PRICE_DIFF_THRESHOLD
- try:
- e = float(expected) if expected is not None else None
- a = float(actual) if actual is not None else None
- except (TypeError, ValueError):
- return False
- if e is None or a is None:
- return False
- return abs(a - e) <= threshold
- @staticmethod
- def _extract_price_info(item: dict) -> dict:
- """从单条 result 项提取价格相关信息."""
- return {
- "adult_price": item.get("adult_price"),
- "adult_tax": item.get("adult_tax"),
- "adult_total_price": item.get("adult_total_price"),
- "currency": item.get("currency"),
- "now_time": item.get("now_time"),
- "verify_time": item.get("verify_time"),
- }
- def _process_one_task(row, runner):
- """处理单条任务:构建 end_task、执行 run、解析结果。成功返回 flight_data 字典,失败返回 None。"""
- task = row
- separator = '|' # 分隔符由;更换为|
- thread_name = threading.current_thread().name
- # print(f"[thread_name: {thread_name}] 正在处理任务: {task}")
- from_city_code, to_city_code = task["city_pair"].split("-")
- from_day = task["flight_day"].replace("-", "")
- flight_numbers = task["flight_number_1"].strip()
- if task["flight_number_2"].strip() != "VJ":
- flight_numbers += separator + task["flight_number_2"].strip()
- cabins = separator.join(["Y"] * len(flight_numbers.split(separator)))
- if str(task['baggage']) == '0':
- baggage_str = "-;-;-;-"
- else:
- baggage_str = f"1-{task['baggage']}"
- baggages = separator.join([baggage_str] * len(flight_numbers.split(separator)))
- end_task = {
- "from_city_code": from_city_code,
- "to_city_code": to_city_code,
- "from_day": from_day,
- "flight_numbers": flight_numbers,
- "cabins": cabins,
- "baggages": baggages,
- "adult_total_price": task.get("adult_total_price"),
- "currency": task.get("currency", "USD"),
- }
- # print("--------------------------------")
- # print(end_task)
- # print("--------------------------------")
- time.sleep(1)
- out = runner.run(end_task, do_verify=False) # 不验价,仅询价
- # print(json.dumps(out, ensure_ascii=False, indent=2))
- if out.get("status") != "ok":
- # print(f"[thread_name={thread_name}] 错误: {out.get('msg')}")
- return None
- # print(f"价格: {out.get('price_info').get('adult_total_price')}")
- raw_verify = out.get("raw_verify")
- if raw_verify:
- results = raw_verify.get("result") or []
- else:
- matched = out.get("matched") or {}
- results = [matched] if matched else []
- if not results:
- return None
- print("raw_verify pass")
- # task 存放了 keep_info 的全部字段
- drop_price_change_upper = float(task.get("drop_price_change_upper")) # 降价的最小幅度
- drop_price_change_lower = float(task.get("drop_price_change_lower"))
- max_threshold = round(drop_price_change_upper * runner.rate * 0.5) # 降价阈值要按汇率转人民币(四舍五入到整数)
- result = results[0]
- # adult_price = result.get("adult_price")
- # adult_tax = result.get("adult_tax")
- # adult_total_price = result.get("adult_total_price")
- segments = result.get("segments") or []
- if not segments:
- return None
- end_segments = []
- baggage = segments[0].get("baggage")
- if baggage == "-;-;-;-":
- pc, kg = 0, 0 # 无行李的设置?
- else:
- pc, kg = [int(i) for i in baggage.split("-")]
- for seg in segments:
- flight_number = seg.get("flight_number")
- operating_flight_number = seg.get("operating_flight_number")
- if flight_number == operating_flight_number:
- operating_flight_number = ""
- dep_time = seg.get("dep_time")
- arr_time = seg.get("arr_time")
- dep_time = time_format_conversion(dep_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
- arr_time = time_format_conversion(arr_time, in_strftime_str="%Y%m%d%H%M%S", out_strftime_str="%Y-%m-%d %H:%M:%S")
- end_segment = {
- "carrier": seg.get("carrier"),
- "flight_number": flight_number,
- # "dep_air_port": seg.get("dep_air_port"),
- # "arr_air_port": seg.get("arr_air_port"),
- "dep_city_code": seg.get("dep_city_code"),
- "arr_city_code": seg.get("arr_city_code"),
- # "operating_flight_number": operating_flight_number,
- "cabin": seg.get("cabin"),
- "dep_time": dep_time,
- # "arr_time": arr_time,
- }
- end_segments.append(end_segment)
- return {
- "trip_type": 1,
- # "cover_price": adult_price,
- # "cover_tax": adult_tax,
- "bag_amount": pc,
- "bag_weight": kg,
- "max_threshold": max_threshold,
- "segments": end_segments,
- "ret_segments": [],
- "task": task
- }
- def sync_policy(payload):
- headers = {
- "Content-Type": "application/json",
- }
- # print(json.dumps(payload, ensure_ascii=False, indent=2))
- response = requests.post(POLICY_URL, headers=headers, json=payload, timeout=30)
- resp_json = response.json()
- """
- {
- "code": 0,
- "msg": "ok",
- "data": {
- "deleted": 1,
- "created": 7
- }
- }
- """
- # print(json.dumps(resp_json, ensure_ascii=False, indent=2))
- return resp_json
- def main():
- logger = XmiLogger("task")
- # 注意 \ufeff 是 UTF-8 的 BOM
- # 所以需要使用 utf-8-sig 编码
- task_list = []
- runner = FlightPriceTaskRunner()
- # 1 读取任务列表
- output_dir = "./keep_0"
- keep_info_path = os.path.join(output_dir, "keep_info.csv")
- with open(keep_info_path, "r", encoding="utf-8-sig") as f:
- reader = csv.DictReader(f)
- for row in reader:
- task_list.append(row)
- # 2 任务列表逻辑处理(多线程)
- policy_list = []
- keep_info_end = []
- max_workers = 5 # 并发线程数,可按需要调整
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- futures = {executor.submit(_process_one_task, task, runner): task for task in task_list}
- total = len(futures)
- done = 0
- failed = 0
- for future in as_completed(futures):
- try:
- flight_data = future.result()
- if flight_data is not None:
- task = flight_data.pop("task")
- keep_info_end.append(task)
- policy_list.append(flight_data)
- except Exception as e:
- failed += 1
- task = futures[future]
- # print(f"任务异常 {task}: {e}")
- logger.error(f"任务异常 {task}: {e}")
- finally:
- done += 1
- logger.info(
- f"进度: {done}/{total}, policy: {len(policy_list)}, keep: {len(keep_info_end)}, failed: {failed}"
- )
- # 3 批量一次性上传政策
- logger.info(f"数据过滤后, 上传政策: {len(policy_list)}")
- # logger.info(f"policy_list: {policy_list}")
- logger.info(f"policy_list: {json.dumps(policy_list, ensure_ascii=False, default=str)[:1000]}")
- if len(policy_list) > 0:
- # 这里批量一次性上传政策
- payload = {"items": policy_list}
- sync_policy(payload)
- logger.info(f"keep_info_end: {len(keep_info_end)}")
- # 将 keep_info_end 写入到文件csv 文件 嵌套结构要处理 提供下载页面 (历史数据需要保留)
- # if not os.path.exists("/home/node04/descending_cabin_files"):
- # os.makedirs("/home/node04/descending_cabin_files")
- with open(f"/home/node04/descending_cabin_files/keep_info_end_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv", "w", encoding="utf-8-sig") as f:
- writer = csv.DictWriter(f, fieldnames=keep_info_end[0].keys())
- writer.writeheader()
- for task in keep_info_end:
- writer.writerow(task)
- logger.info(f"keep_info_end 写入完成")
- if __name__ == "__main__":
- main()
|