# result_keep_verify.py
import datetime
import os

import pandas as pd

from config import (
    CLEAN_VJ_HOT_FAR_INFO_TAB,
    CLEAN_VJ_HOT_NEAR_INFO_TAB,
    CLEAN_VJ_NOTHOT_FAR_INFO_TAB,
    CLEAN_VJ_NOTHOT_NEAR_INFO_TAB,
    vj_flight_route_list_hot,
    vj_flight_route_list_nothot,
)
from data_loader import fill_hourly_crawl_date, mongo_con_parse, validate_keep_one_line
  7. def _validate_keep_info_df(df_keep_info_part):
  8. client, db = mongo_con_parse()
  9. count = 0
  10. if "price_diff" not in df_keep_info_part.columns:
  11. df_keep_info_part["price_diff"] = 0
  12. if "time_diff_hours" not in df_keep_info_part.columns:
  13. df_keep_info_part["time_diff_hours"] = 0
  14. for idx, row in df_keep_info_part.iterrows():
  15. df_keep_info_part.at[idx, "price_diff"] = 0
  16. df_keep_info_part.at[idx, "time_diff_hours"] = 0
  17. city_pair = row['city_pair']
  18. flight_day = row['flight_day']
  19. flight_number_1 = row['flight_number_1']
  20. flight_number_2 = row['flight_number_2']
  21. baggage = row['baggage']
  22. # update_hour = row['update_hour']
  23. # update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S')
  24. into_update_hour = row['into_update_hour']
  25. into_update_dt = pd.to_datetime(into_update_hour, format='%Y-%m-%d %H:%M:%S')
  26. del_batch_time_str = row['del_batch_time_str']
  27. del_batch_dt = pd.to_datetime(del_batch_time_str, format='%Y%m%d%H%M')
  28. del_batch_std_str = del_batch_dt.strftime('%Y-%m-%d %H:%M:%S')
  29. entry_price = pd.to_numeric(row.get('adult_total_price'), errors='coerce')
  30. if city_pair in vj_flight_route_list_hot:
  31. table_name_far = CLEAN_VJ_HOT_FAR_INFO_TAB
  32. table_name_near = CLEAN_VJ_HOT_NEAR_INFO_TAB
  33. elif city_pair in vj_flight_route_list_nothot:
  34. table_name_far = CLEAN_VJ_NOTHOT_FAR_INFO_TAB
  35. table_name_near = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
  36. # 分别从远期表和近期表里查询
  37. df_query_far = validate_keep_one_line(db, table_name_far, city_pair, flight_day, flight_number_1, flight_number_2,
  38. baggage, into_update_hour, del_batch_std_str)
  39. df_query_near = validate_keep_one_line(db, table_name_near, city_pair, flight_day, flight_number_1, flight_number_2,
  40. baggage, into_update_hour, del_batch_std_str)
  41. # 合并
  42. df_query = pd.concat([df_query_far, df_query_near]).reset_index(drop=True)
  43. if (not df_query.empty) and pd.notna(entry_price):
  44. if ("adult_total_price" in df_query.columns) and ("crawl_date" in df_query.columns):
  45. df_query["adult_total_price"] = pd.to_numeric(df_query["adult_total_price"], errors="coerce")
  46. df_query["crawl_dt"] = pd.to_datetime(df_query["crawl_date"], errors="coerce")
  47. df_query = (
  48. df_query.dropna(subset=["adult_total_price", "crawl_dt"])
  49. .sort_values("crawl_dt")
  50. .reset_index(drop=True)
  51. )
  52. mask_drop = df_query["adult_total_price"] < entry_price
  53. # mask_drop = (df_query["adult_total_price"] < entry_price) & (df_query["crawl_dt"] > update_dt)
  54. if mask_drop.any():
  55. first_row = df_query.loc[mask_drop].iloc[0]
  56. price_diff = entry_price - first_row["adult_total_price"]
  57. time_diff_hours = (first_row["crawl_dt"] - into_update_dt) / pd.Timedelta(hours=1)
  58. df_keep_info_part.at[idx, "price_diff"] = round(float(price_diff), 2)
  59. df_keep_info_part.at[idx, "time_diff_hours"] = round(float(time_diff_hours), 2)
  60. pass
  61. del df_query
  62. del df_query_far
  63. del df_query_near
  64. count += 1
  65. if count % 5 == 0:
  66. print(f"cal count: {count}")
  67. print(f"计算结束")
  68. client.close()
  69. return df_keep_info_part
  70. def verify_process(min_batch_time_str, max_batch_time_str):
  71. object_dir = "./keep_0"
  72. output_dir = f"./validate/keep"
  73. os.makedirs(output_dir, exist_ok=True)
  74. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  75. save_scv = f"result_keep_verify_{timestamp_str}.csv"
  76. output_path = os.path.join(output_dir, save_scv)
  77. # 获取今天的日期
  78. # today_str = pd.Timestamp.now().strftime('%Y-%m-%d')
  79. # 检查目录是否存在
  80. if not os.path.exists(object_dir):
  81. print(f"目录不存在: {object_dir}")
  82. return
  83. # 获取所有以 keep_info_ 开头的 CSV 文件
  84. csv_files = []
  85. for file in os.listdir(object_dir):
  86. if file.startswith("keep_info_") and file.endswith(".csv"):
  87. csv_files.append(file)
  88. if not csv_files:
  89. print(f"在 {object_dir} 中没有找到 keep_info_ 开头的 CSV 文件")
  90. return
  91. csv_files.sort()
  92. # print(csv_files)
  93. min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
  94. min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
  95. max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
  96. max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
  97. if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
  98. print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
  99. return
  100. # 从所有的 keep_info 文件中
  101. for csv_file in csv_files:
  102. batch_time_str = (
  103. csv_file.replace("keep_info_", "").replace(".csv", "")
  104. )
  105. batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M")
  106. batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
  107. if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
  108. continue
  109. if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
  110. continue
  111. # 读取 CSV 文件
  112. csv_path = os.path.join(object_dir, csv_file)
  113. try:
  114. df_keep_info = pd.read_csv(csv_path)
  115. except Exception as e:
  116. print(f"read {csv_path} error: {str(e)}")
  117. df_keep_info = pd.DataFrame()
  118. if df_keep_info.empty:
  119. print(f"keep_info数据为空: {csv_file}")
  120. continue
  121. df_keep_info_del = df_keep_info[df_keep_info['keep_flag'] == -1].reset_index(drop=True)
  122. df_keep_info_del['del_batch_time_str'] = batch_time_str
  123. df_keep_info_del = _validate_keep_info_df(df_keep_info_del)
  124. # 根据价格变化情况, 移出时间与验证终点时间的对比, 计算 status_flag 状态
  125. price_diff_num = pd.to_numeric(df_keep_info_del.get("price_diff"), errors="coerce").fillna(0)
  126. del_batch_dt = pd.to_datetime(
  127. df_keep_info_del.get("del_batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
  128. )
  129. valid_end_dt = pd.to_datetime(
  130. df_keep_info_del.get("valid_end_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
  131. )
  132. status_flag = pd.Series(0, index=df_keep_info_del.index, dtype="int64") # 默认状态 0
  133. status_flag.loc[price_diff_num > 0] = 1 # 降价状态 1
  134. mask_zero = price_diff_num == 0
  135. mask_time_ok = mask_zero & del_batch_dt.notna() & valid_end_dt.notna() & (del_batch_dt >= valid_end_dt)
  136. status_flag.loc[mask_time_ok] = 2 # 超时状态 2
  137. df_keep_info_del["status_flag"] = status_flag
  138. write_header = not os.path.exists(output_path)
  139. df_keep_info_del.to_csv(output_path, mode="a", header=write_header, index=False, encoding="utf-8-sig")
  140. del df_keep_info_del
  141. print(f"批次:{batch_time_str} 检验结束")
  142. print("检验结束")
  143. print()
  144. def verify_process_2(min_batch_time_str, max_batch_time_str):
  145. object_dir = "/home/node04/descending_cabin_files_vj"
  146. output_dir = f"./validate/keep"
  147. os.makedirs(output_dir, exist_ok=True)
  148. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  149. save_scv = f"result_keep_verify_{timestamp_str}.csv"
  150. output_path = os.path.join(output_dir, save_scv)
  151. # 检查目录是否存在
  152. if not os.path.exists(object_dir):
  153. print(f"目录不存在: {object_dir}")
  154. return
  155. # 获取所有以 keep_info_end_ 开头的 CSV 文件
  156. csv_files = []
  157. for file in os.listdir(object_dir):
  158. if file.startswith("keep_info_end_") and file.endswith(".csv"):
  159. csv_files.append(file)
  160. if not csv_files:
  161. print(f"在 {object_dir} 中没有找到 keep_info_end_ 开头的 CSV 文件")
  162. return
  163. csv_files.sort()
  164. min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
  165. min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
  166. max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
  167. max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
  168. if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
  169. print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
  170. return
  171. list_df = []
  172. # 从所有的 keep_info_end_ 文件中
  173. for csv_file in csv_files:
  174. batch_time_str = csv_file.replace("keep_info_end_", "").replace(".csv", "")
  175. batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M%S")
  176. batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
  177. if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
  178. continue
  179. if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
  180. continue
  181. # 读取 CSV 文件
  182. csv_path = os.path.join(object_dir, csv_file)
  183. try:
  184. df_keep_info = pd.read_csv(csv_path)
  185. except Exception as e:
  186. print(f"read {csv_path} error: {str(e)}")
  187. continue
  188. if df_keep_info.empty:
  189. print(f"keep_info数据为空: {csv_file}")
  190. continue
  191. df_keep_info["batch_time_str"] = batch_hour_dt.strftime("%Y%m%d%H%M")
  192. # df_keep_info["src_file"] = csv_file
  193. list_df.append(df_keep_info)
  194. del df_keep_info
  195. if not list_df:
  196. print("时间范围内没有可用 keep_info_end_ 数据")
  197. return
  198. df_keep_all = pd.concat(list_df, ignore_index=True)
  199. del list_df
  200. sort_cols = ["city_pair", "flight_day", "flight_number_1", "flight_number_2", "into_update_hour"]
  201. df_keep_all = df_keep_all.sort_values(sort_cols, kind="mergesort").reset_index(drop=True)
  202. df_keep_all["gid"] = df_keep_all.groupby(sort_cols, sort=False).ngroup().astype("int64") + 1
  203. client, db = mongo_con_parse()
  204. list_base_row = []
  205. for gid, df_gid in df_keep_all.groupby("gid", sort=False):
  206. city_pair = df_gid["city_pair"].iloc[0]
  207. flight_day = df_gid["flight_day"].iloc[0]
  208. flight_number_1 = df_gid["flight_number_1"].iloc[0]
  209. flight_number_2 = df_gid["flight_number_2"].iloc[0]
  210. into_update_hour = df_gid["into_update_hour"].iloc[0]
  211. valid_end_hour = df_gid["valid_end_hour"].iloc[0]
  212. into_update_dt = pd.to_datetime(
  213. df_gid.get("into_update_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
  214. ).min()
  215. batch_dt = pd.to_datetime(
  216. df_gid.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
  217. ).max()
  218. valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")
  219. flag = 0 # 等待(弹出)标记
  220. if batch_dt >= valid_end_dt:
  221. flag = 2 # 超时标记
  222. if pd.isna(into_update_dt) or pd.isna(batch_dt):
  223. print(f"gid={gid} 时间字段解析失败,跳过")
  224. continue
  225. crawl_date_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")
  226. crawl_date_end = (batch_dt + pd.Timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
  227. if city_pair in vj_flight_route_list_hot:
  228. table_name_far = CLEAN_VJ_HOT_FAR_INFO_TAB
  229. table_name_near = CLEAN_VJ_HOT_NEAR_INFO_TAB
  230. elif city_pair in vj_flight_route_list_nothot:
  231. table_name_far = CLEAN_VJ_NOTHOT_FAR_INFO_TAB
  232. table_name_near = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
  233. else:
  234. print(f"gid={gid} 城市对{city_pair}不在热门/冷门列表,跳过")
  235. continue
  236. baggage = 0
  237. df_query_far = validate_keep_one_line(
  238. db,
  239. table_name_far,
  240. city_pair,
  241. flight_day,
  242. flight_number_1,
  243. flight_number_2,
  244. baggage,
  245. crawl_date_begin,
  246. crawl_date_end,
  247. )
  248. df_query_near = validate_keep_one_line(
  249. db,
  250. table_name_near,
  251. city_pair,
  252. flight_day,
  253. flight_number_1,
  254. flight_number_2,
  255. baggage,
  256. crawl_date_begin,
  257. crawl_date_end,
  258. )
  259. df_query = pd.concat([df_query_far, df_query_near], ignore_index=True)
  260. df_g1 = df_gid.copy()
  261. df_g2 = df_query.copy()
  262. df_g1["_batch_dt"] = pd.to_datetime(
  263. df_g1.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
  264. )
  265. last_price = float(df_g1.iloc[-1]["adult_total_price"])
  266. df_last_price = df_g1[df_g1["adult_total_price"] == last_price]
  267. base_row = df_last_price.iloc[0]
  268. # base_pos = int(df_last_price.index[0])
  269. base_dt = base_row["_batch_dt"]
  270. base_price = float(base_row["adult_total_price"])
  271. # drop_pos = pd.NA
  272. drop_crawl_date = pd.NA
  273. drop_price = pd.NA
  274. price_diff = 0.0
  275. time_diff_hours = 0.0
  276. if not df_g2.empty:
  277. df_g2["crawl_dt"] = pd.to_datetime(df_g2.get("crawl_date"), errors="coerce")
  278. mask_drop = df_g2["adult_total_price"] < base_price
  279. if mask_drop.any():
  280. drop_row = df_g2.loc[mask_drop].iloc[0]
  281. # drop_pos = int(drop_row.name)
  282. drop_crawl_date = drop_row.get("crawl_date")
  283. drop_price = float(drop_row["adult_total_price"])
  284. price_diff = round(base_price - drop_price, 2)
  285. time_diff_hours = round(
  286. float((drop_row["crawl_dt"] - base_dt) / pd.Timedelta(hours=1)),
  287. 2,
  288. )
  289. flag = 1 # 发生降价标记
  290. base_row_cp = base_row.copy()
  291. base_row_cp["end_batch_dt"] = batch_dt
  292. base_row_cp["drop_crawl_date"] = drop_crawl_date
  293. base_row_cp["drop_price"] = drop_price
  294. base_row_cp["price_diff"] = price_diff
  295. base_row_cp["time_diff_hours"] = time_diff_hours
  296. base_row_cp["flag"] = flag
  297. list_base_row.append(base_row_cp)
  298. del df_g1
  299. del df_g2
  300. del df_last_price
  301. del df_query_far
  302. del df_query_near
  303. del df_query
  304. client.close()
  305. df_base = pd.DataFrame(list_base_row)
  306. df_base.to_csv(output_path, header=True, index=False, encoding="utf-8-sig")
  307. print(f"输出: {output_path}")
  308. return
if __name__ == "__main__":
    # Hard-coded batch-hour range 'YYYYMMDDHHMM'; adjust per run.
    # verify_process("202604071700", "202604090900")
    verify_process_2("202604071700", "202604090900")