# result_keep_verify.py
  1. import os
  2. import datetime
  3. import pandas as pd
  4. from data_loader import mongo_con_parse, validate_keep_one_line
  5. from config import mongo_config, mongo_table_uo
  6. def _validate_keep_info_df(df_keep_info_part):
  7. client, db = mongo_con_parse(mongo_config)
  8. count = 0
  9. if "price_diff" not in df_keep_info_part.columns:
  10. df_keep_info_part["price_diff"] = 0
  11. if "time_diff_hours" not in df_keep_info_part.columns:
  12. df_keep_info_part["time_diff_hours"] = 0
  13. for idx, row in df_keep_info_part.iterrows():
  14. df_keep_info_part.at[idx, "price_diff"] = 0
  15. df_keep_info_part.at[idx, "time_diff_hours"] = 0
  16. city_pair = row['citypair']
  17. flight_numbers = row['flight_numbers']
  18. baggage_weight = row['baggage_weight']
  19. from_date = row['from_date']
  20. into_update_hour = row['into_update_hour']
  21. into_update_dt = pd.to_datetime(into_update_hour, format='%Y-%m-%d %H:%M:%S')
  22. del_batch_time_str = row['del_batch_time_str']
  23. del_batch_dt = pd.to_datetime(del_batch_time_str, format='%Y%m%d%H%M')
  24. del_batch_std_str = del_batch_dt.strftime('%Y-%m-%d %H:%M:%S')
  25. entry_price = pd.to_numeric(row.get('price_total'), errors='coerce')
  26. df_query = validate_keep_one_line(db, mongo_table_uo, city_pair, flight_numbers, baggage_weight, from_date, entry_price, into_update_hour, del_batch_std_str)
  27. if (not df_query.empty) and pd.notna(entry_price):
  28. if ("price_total" in df_query.columns) and ("create_time" in df_query.columns):
  29. df_query["price_total"] = pd.to_numeric(df_query["price_total"], errors="coerce")
  30. df_query["create_dt"] = pd.to_datetime(df_query["create_time"], errors="coerce")
  31. df_query = (
  32. df_query.dropna(subset=["price_total", "create_dt"])
  33. .sort_values("create_dt")
  34. .reset_index(drop=True)
  35. )
  36. mask_drop = df_query["price_total"] < entry_price
  37. if mask_drop.any():
  38. first_row = df_query.loc[mask_drop].iloc[0]
  39. price_diff = entry_price - first_row["price_total"]
  40. time_diff_hours = (first_row["create_dt"] - into_update_dt) / pd.Timedelta(hours=1)
  41. df_keep_info_part.at[idx, "price_diff"] = round(float(price_diff), 2)
  42. df_keep_info_part.at[idx, "time_diff_hours"] = round(float(time_diff_hours), 2)
  43. del df_query
  44. count += 1
  45. if count % 5 == 0:
  46. print(f"cal count: {count}")
  47. print(f"计算结束")
  48. client.close()
  49. return df_keep_info_part
  50. def verify_process(min_batch_time_str, max_batch_time_str):
  51. object_dir = "./keep"
  52. output_dir = f"./validate/keep"
  53. os.makedirs(output_dir, exist_ok=True)
  54. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  55. save_scv = f"result_keep_verify_{timestamp_str}.csv"
  56. output_path = os.path.join(output_dir, save_scv)
  57. # 检查目录是否存在
  58. if not os.path.exists(object_dir):
  59. print(f"目录不存在: {object_dir}")
  60. return
  61. # 获取所有以 keep_info_ 开头的 CSV 文件
  62. csv_files = []
  63. for file in os.listdir(object_dir):
  64. if file.startswith("keep_info_") and file.endswith(".csv"):
  65. csv_files.append(file)
  66. if not csv_files:
  67. print(f"在 {object_dir} 中没有找到 keep_info_ 开头的 CSV 文件")
  68. return
  69. csv_files.sort()
  70. min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
  71. min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
  72. max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
  73. max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
  74. if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
  75. print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
  76. return
  77. # 从所有的 keep_info 文件中
  78. for csv_file in csv_files:
  79. batch_time_str = (
  80. csv_file.replace("keep_info_", "").replace(".csv", "")
  81. )
  82. batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M")
  83. batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
  84. if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
  85. continue
  86. if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
  87. continue
  88. # 读取 CSV 文件
  89. csv_path = os.path.join(object_dir, csv_file)
  90. try:
  91. df_keep_info = pd.read_csv(csv_path)
  92. except Exception as e:
  93. print(f"read {csv_path} error: {str(e)}")
  94. df_keep_info = pd.DataFrame()
  95. if df_keep_info.empty:
  96. print(f"keep_info数据为空: {csv_file}")
  97. continue
  98. df_keep_info_del = df_keep_info[df_keep_info['keep_flag'] == -1].reset_index(drop=True)
  99. df_keep_info_del['del_batch_time_str'] = batch_time_str
  100. df_keep_info_del = _validate_keep_info_df(df_keep_info_del)
  101. # 根据价格变化情况, 移出时间与验证终点时间的对比, 计算 status_flag 状态
  102. price_diff_num = pd.to_numeric(df_keep_info_del.get("price_diff"), errors="coerce").fillna(0)
  103. del_batch_dt = pd.to_datetime(
  104. df_keep_info_del.get("del_batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
  105. )
  106. valid_end_dt = pd.to_datetime(
  107. df_keep_info_del.get("valid_end_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
  108. )
  109. status_flag = pd.Series(0, index=df_keep_info_del.index, dtype="int64") # 其它场景
  110. status_flag.loc[price_diff_num > 0] = 1 # 降价场景
  111. mask_zero = price_diff_num == 0
  112. mask_time_ok = mask_zero & del_batch_dt.notna() & valid_end_dt.notna() & (del_batch_dt >= valid_end_dt)
  113. status_flag.loc[mask_time_ok] = 2 # 超时场景
  114. df_keep_info_del["status_flag"] = status_flag
  115. write_header = not os.path.exists(output_path)
  116. df_keep_info_del.to_csv(output_path, mode="a", header=write_header, index=False, encoding="utf-8-sig")
  117. del df_keep_info_del
  118. print(f"批次:{batch_time_str} 检验结束")
  119. print("检验结束")
  120. print()
  121. def verify_process_2(min_batch_time_str, max_batch_time_str):
  122. object_dir = "/home/node04/descending_cabin_files_uo"
  123. output_dir = f"./validate/keep"
  124. os.makedirs(output_dir, exist_ok=True)
  125. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  126. save_scv = f"result_keep_verify_{timestamp_str}.csv"
  127. output_path = os.path.join(output_dir, save_scv)
  128. # 检查目录是否存在
  129. if not os.path.exists(object_dir):
  130. print(f"目录不存在: {object_dir}")
  131. return
  132. # 获取所有以 keep_info_end_ 开头的 CSV 文件
  133. csv_files = []
  134. for file in os.listdir(object_dir):
  135. if file.startswith("keep_info_end_") and file.endswith(".csv"):
  136. csv_files.append(file)
  137. if not csv_files:
  138. print(f"在 {object_dir} 中没有找到 keep_info_end_ 开头的 CSV 文件")
  139. return
  140. csv_files.sort()
  141. min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
  142. min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
  143. max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
  144. max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
  145. if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
  146. print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
  147. return
  148. list_df = []
  149. # 从所有的 keep_info_end_ 文件中
  150. for csv_file in csv_files:
  151. batch_time_str = csv_file.replace("keep_info_end_", "").replace(".csv", "")
  152. batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M%S")
  153. batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
  154. if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
  155. continue
  156. if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
  157. continue
  158. # 读取 CSV 文件
  159. csv_path = os.path.join(object_dir, csv_file)
  160. try:
  161. df_keep_info = pd.read_csv(csv_path)
  162. except Exception as e:
  163. print(f"read {csv_path} error: {str(e)}")
  164. continue
  165. if df_keep_info.empty:
  166. print(f"keep_info数据为空: {csv_file}")
  167. continue
  168. df_keep_info["batch_time_str"] = batch_hour_dt.strftime("%Y%m%d%H%M")
  169. # df_keep_info["src_file"] = csv_file
  170. list_df.append(df_keep_info)
  171. del df_keep_info
  172. if not list_df:
  173. print("时间范围内没有可用 keep_info_end_ 数据")
  174. return
  175. df_keep_all = pd.concat(list_df, ignore_index=True)
  176. del list_df
  177. sort_cols = ["citypair", "flight_numbers", "baggage_weight", "from_date", "into_update_hour"]
  178. df_keep_all = df_keep_all.sort_values(sort_cols, kind="mergesort").reset_index(drop=True)
  179. df_keep_all["gid"] = df_keep_all.groupby(sort_cols, sort=False).ngroup().astype("int64") + 1
  180. client, db = mongo_con_parse(mongo_config)
  181. list_base_row = []
  182. for gid, df_gid in df_keep_all.groupby("gid", sort=False):
  183. city_pair = df_gid["citypair"].iloc[0]
  184. flight_numbers = df_gid["flight_numbers"].iloc[0]
  185. baggage_weight = int(df_gid["baggage_weight"].iloc[0])
  186. from_date = df_gid["from_date"].iloc[0]
  187. into_update_hour = df_gid["into_update_hour"].iloc[0]
  188. valid_end_hour = df_gid["valid_end_hour"].iloc[0]
  189. into_update_dt = pd.to_datetime(
  190. df_gid.get("into_update_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
  191. ).min()
  192. batch_dt_series = pd.to_datetime(
  193. df_gid.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
  194. )
  195. batch_dt = batch_dt_series.max()
  196. entry_price = float("nan")
  197. if batch_dt_series.notna().any():
  198. idx_latest = batch_dt_series.idxmax()
  199. entry_price = pd.to_numeric(df_gid.loc[idx_latest].get("price_total"), errors="coerce")
  200. valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")
  201. flag = 0 # 等待(弹出)标记
  202. if batch_dt >= valid_end_dt:
  203. flag = 2 # 超时标记
  204. if pd.isna(into_update_dt) or pd.isna(batch_dt):
  205. print(f"gid={gid} 时间字段解析失败,跳过")
  206. continue
  207. create_time_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")
  208. create_time_end = (batch_dt + pd.Timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
  209. df_query = validate_keep_one_line(db, mongo_table_uo, city_pair, flight_numbers, baggage_weight, from_date, entry_price, into_update_hour, create_time_end)
  210. df_g1 = df_gid.copy()
  211. df_g2 = df_query.copy()
  212. df_g1["_batch_dt"] = pd.to_datetime(
  213. df_g1.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
  214. )
  215. last_price = float(df_g1.iloc[-1]["price_total"])
  216. df_last_price = df_g1[df_g1["price_total"] == last_price]
  217. base_row = df_last_price.iloc[0]
  218. # base_pos = int(df_last_price.index[0])
  219. base_dt = base_row["_batch_dt"]
  220. base_price = float(base_row["price_total"])
  221. drop_create_time = pd.NA
  222. drop_price = pd.NA
  223. price_diff = 0.0
  224. time_diff_hours = 0.0
  225. if not df_g2.empty:
  226. df_g2["create_dt"] = pd.to_datetime(df_g2.get("create_time"), errors="coerce")
  227. mask_drop = df_g2["price_total"] < base_price
  228. if mask_drop.any():
  229. drop_row = df_g2.loc[mask_drop].iloc[0]
  230. drop_create_time = drop_row.get("create_time")
  231. drop_price = float(drop_row["price_total"])
  232. price_diff = round(base_price - drop_price, 2)
  233. time_diff_hours = round(
  234. float((drop_row["create_dt"] - base_dt) / pd.Timedelta(hours=1)),
  235. 2,
  236. )
  237. flag = 1 # 发生降价标记
  238. base_row_cp = base_row.copy()
  239. base_row_cp["end_batch_dt"] = batch_dt
  240. base_row_cp["drop_create_time"] = drop_create_time
  241. base_row_cp["drop_price"] = drop_price
  242. base_row_cp["price_diff"] = price_diff
  243. base_row_cp["time_diff_hours"] = time_diff_hours
  244. base_row_cp["flag"] = flag
  245. list_base_row.append(base_row_cp)
  246. del df_g1
  247. del df_g2
  248. del df_last_price
  249. del df_query
  250. client.close()
  251. df_base = pd.DataFrame(list_base_row)
  252. df_base.to_csv(output_path, header=True, index=False, encoding="utf-8-sig")
  253. print(f"输出: {output_path}")
  254. return
  255. if __name__ == "__main__":
  256. verify_process("202604151100", "202604161400")
  257. # verify_process_2("202604151100", "202604161400")
  258. pass