# result_keep_verify.py (6.4 KB)
  1. import os
  2. import datetime
  3. import pandas as pd
  4. from data_loader import mongo_con_parse, validate_keep_one_line, fill_hourly_crawl_date
  5. from config import vj_flight_route_list_hot, vj_flight_route_list_nothot, \
  6. CLEAN_VJ_HOT_NEAR_INFO_TAB, CLEAN_VJ_HOT_FAR_INFO_TAB, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB, CLEAN_VJ_NOTHOT_FAR_INFO_TAB
  7. def _validate_keep_info_df(df_keep_info_part):
  8. client, db = mongo_con_parse()
  9. count = 0
  10. if "price_diff" not in df_keep_info_part.columns:
  11. df_keep_info_part["price_diff"] = 0
  12. if "time_diff_hours" not in df_keep_info_part.columns:
  13. df_keep_info_part["time_diff_hours"] = 0
  14. for idx, row in df_keep_info_part.iterrows():
  15. df_keep_info_part.at[idx, "price_diff"] = 0
  16. df_keep_info_part.at[idx, "time_diff_hours"] = 0
  17. city_pair = row['city_pair']
  18. flight_day = row['flight_day']
  19. flight_number_1 = row['flight_number_1']
  20. flight_number_2 = row['flight_number_2']
  21. baggage = row['baggage']
  22. update_hour = row['update_hour']
  23. update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S')
  24. into_update_hour = row['into_update_hour']
  25. into_update_dt = pd.to_datetime(into_update_hour, format='%Y-%m-%d %H:%M:%S')
  26. entry_price = pd.to_numeric(row.get('adult_total_price'), errors='coerce')
  27. if city_pair in vj_flight_route_list_hot:
  28. table_name_far = CLEAN_VJ_HOT_FAR_INFO_TAB
  29. table_name_near = CLEAN_VJ_HOT_NEAR_INFO_TAB
  30. elif city_pair in vj_flight_route_list_nothot:
  31. table_name_far = CLEAN_VJ_NOTHOT_FAR_INFO_TAB
  32. table_name_near = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
  33. # 分别从远期表和近期表里查询
  34. df_query_far = validate_keep_one_line(db, table_name_far, city_pair, flight_day, flight_number_1, flight_number_2, baggage, into_update_hour)
  35. df_query_near = validate_keep_one_line(db, table_name_near, city_pair, flight_day, flight_number_1, flight_number_2, baggage, into_update_hour)
  36. # 合并
  37. df_query = pd.concat([df_query_far, df_query_near]).reset_index(drop=True)
  38. if (not df_query.empty) and pd.notna(entry_price):
  39. if ("adult_total_price" in df_query.columns) and ("crawl_date" in df_query.columns):
  40. df_query["adult_total_price"] = pd.to_numeric(df_query["adult_total_price"], errors="coerce")
  41. df_query["crawl_dt"] = pd.to_datetime(df_query["crawl_date"], errors="coerce")
  42. df_query = (
  43. df_query.dropna(subset=["adult_total_price", "crawl_dt"])
  44. .sort_values("crawl_dt")
  45. .reset_index(drop=True)
  46. )
  47. mask_drop = df_query["adult_total_price"] < entry_price
  48. if mask_drop.any():
  49. first_row = df_query.loc[mask_drop].iloc[0]
  50. price_diff = entry_price - first_row["adult_total_price"]
  51. time_diff_hours = (first_row["crawl_dt"] - into_update_dt) / pd.Timedelta(hours=1)
  52. df_keep_info_part.at[idx, "price_diff"] = round(float(price_diff), 2)
  53. df_keep_info_part.at[idx, "time_diff_hours"] = round(float(time_diff_hours), 2)
  54. pass
  55. del df_query
  56. del df_query_far
  57. del df_query_near
  58. count += 1
  59. if count % 5 == 0:
  60. print(f"cal count: {count}")
  61. print(f"计算结束")
  62. client.close()
  63. return df_keep_info_part
  64. def verify_process(min_batch_time_str, max_batch_time_str):
  65. object_dir = "./keep_0"
  66. output_dir = f"./validate/keep"
  67. os.makedirs(output_dir, exist_ok=True)
  68. timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  69. save_scv = f"result_keep_verify_{timestamp_str}.csv"
  70. output_path = os.path.join(output_dir, save_scv)
  71. # 获取今天的日期
  72. # today_str = pd.Timestamp.now().strftime('%Y-%m-%d')
  73. # 检查目录是否存在
  74. if not os.path.exists(object_dir):
  75. print(f"目录不存在: {object_dir}")
  76. return
  77. # 获取所有以 keep_info_ 开头的 CSV 文件
  78. csv_files = []
  79. for file in os.listdir(object_dir):
  80. if file.startswith("keep_info_") and file.endswith(".csv"):
  81. csv_files.append(file)
  82. if not csv_files:
  83. print(f"在 {object_dir} 中没有找到 keep_info_ 开头的 CSV 文件")
  84. return
  85. csv_files.sort()
  86. # print(csv_files)
  87. min_batch_dt = datetime.datetime.strptime(min_batch_time_str, "%Y%m%d%H%M")
  88. min_batch_dt = min_batch_dt.replace(minute=0, second=0, microsecond=0)
  89. max_batch_dt = datetime.datetime.strptime(max_batch_time_str, "%Y%m%d%H%M")
  90. max_batch_dt = max_batch_dt.replace(minute=0, second=0, microsecond=0)
  91. if min_batch_dt is not None and max_batch_dt is not None and min_batch_dt > max_batch_dt:
  92. print(f"时间范围非法: min_batch_time_str({min_batch_time_str}) > max_batch_time_str({max_batch_time_str}),退出")
  93. return
  94. # 从所有的 keep_info 文件中
  95. for csv_file in csv_files:
  96. batch_time_str = (
  97. csv_file.replace("keep_info_", "").replace(".csv", "")
  98. )
  99. batch_dt = datetime.datetime.strptime(batch_time_str, "%Y%m%d%H%M")
  100. batch_hour_dt = batch_dt.replace(minute=0, second=0, microsecond=0)
  101. if min_batch_dt is not None and batch_hour_dt < min_batch_dt:
  102. continue
  103. if max_batch_dt is not None and batch_hour_dt > max_batch_dt:
  104. continue
  105. # 读取 CSV 文件
  106. csv_path = os.path.join(object_dir, csv_file)
  107. try:
  108. df_keep_info = pd.read_csv(csv_path)
  109. except Exception as e:
  110. print(f"read {csv_path} error: {str(e)}")
  111. df_keep_info = pd.DataFrame()
  112. if df_keep_info.empty:
  113. print(f"keep_info数据为空: {csv_file}")
  114. continue
  115. df_keep_info_del = df_keep_info[df_keep_info['keep_flag'] == -1].reset_index(drop=True)
  116. df_keep_info_del = _validate_keep_info_df(df_keep_info_del)
  117. df_keep_info_del['del_batch_time_str'] = batch_time_str
  118. write_header = not os.path.exists(output_path)
  119. df_keep_info_del.to_csv(output_path, mode="a", header=write_header, index=False, encoding="utf-8-sig")
  120. del df_keep_info_del
  121. print(f"批次:{batch_time_str} 检验结束")
  122. print("检验结束")
  123. print()
  124. if __name__ == "__main__":
  125. verify_process("202603121800", "202603131600")
  126. pass