浏览代码

修改验证脚本

node04 2 周之前
父节点
当前提交
ecbd551b91
共有 2 个文件被更改,包括 27 次插入14 次删除
  1. 1 1
      data_loader.py
  2. 26 13
      result_keep_verify.py

+ 1 - 1
data_loader.py

@@ -577,7 +577,7 @@ def load_data(db_config, city_pair, from_date_begin, from_date_end, is_train=Tru
     return df_all
 
 
-def validate_keep_one_line(db, table_name, city_pair, flight_numbers, baggage_weight, from_date, entry_price, update_hour_str, del_batch_std_str,
+def validate_keep_one_line(db, table_name, city_pair, flight_numbers, baggage_weight, from_date, entry_price, del_batch_std_str,
                            limit=0, max_retries=3, base_sleep=1.0):
     """验证keep_info的一行"""
 

+ 26 - 13
result_keep_verify.py

@@ -243,31 +243,33 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
 
         into_update_dt = pd.to_datetime(
             df_gid.get("into_update_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
-        ).min()
+        ).min()  # 进入序列的小时数
         batch_dt_series = pd.to_datetime(
             df_gid.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
         )
-        batch_dt = batch_dt_series.max()
+        batch_dt = batch_dt_series.max()  # 离开序列的小时数
 
         entry_price = float("nan")
         if batch_dt_series.notna().any():
             idx_latest = batch_dt_series.idxmax()
-            entry_price = pd.to_numeric(df_gid.loc[idx_latest].get("price_total"), errors="coerce")
+            entry_price = pd.to_numeric(df_gid.loc[idx_latest].get("price_total"), errors="coerce")  # 离开序列时的价格
 
-        valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")
+        valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")   # 距离起飞72小时的节点
 
-        flag = 0   # 等待(弹出)标记
+        flag = 0   # 等待标记
         if batch_dt >= valid_end_dt:
-            flag = 2     # 超时标记      
+            flag = 2     # (距离起飞前72小时)超时标记
+        elif batch_dt < max_batch_dt:
+            flag = 3     # 弹出标记
 
         if pd.isna(into_update_dt) or pd.isna(batch_dt):
             print(f"gid={gid} 时间字段解析失败,跳过")
             continue
 
-        create_time_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")
-        create_time_end = (batch_dt + pd.Timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
+        create_time_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")   # 出序列的那个时间段
+        create_time_end = (batch_dt + pd.Timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")     # 出序列的那个时间往后延申2小时
 
-        df_query = validate_keep_one_line(db, mongo_table_uo, city_pair, flight_numbers, baggage_weight, from_date, entry_price, into_update_hour, create_time_end)
+        df_query = validate_keep_one_line(db, mongo_table_uo, city_pair, flight_numbers, baggage_weight, from_date, entry_price, create_time_end)
         
         df_g1 = df_gid.copy()
         df_g2 = df_query.copy()
@@ -280,8 +282,8 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
         df_last_price = df_g1[df_g1["price_total"] == last_price]
         base_row = df_last_price.iloc[0]
         # base_pos = int(df_last_price.index[0])
-        base_dt = base_row["_batch_dt"]
-        base_price = float(base_row["price_total"])
+        base_dt = base_row["_batch_dt"]               # 出现最后价格的第一个批次
+        base_price = float(base_row["price_total"])   # 最后价格
 
         drop_create_time = pd.NA
         drop_price = pd.NA
@@ -291,6 +293,7 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
         if not df_g2.empty:
             df_g2["create_dt"] = pd.to_datetime(df_g2.get("create_time"), errors="coerce")
             mask_drop = df_g2["price_total"] < base_price
+            # 发生降价的场景
             if mask_drop.any():
                 drop_row = df_g2.loc[mask_drop].iloc[0]
                 drop_create_time = drop_row.get("create_time")
@@ -301,6 +304,9 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
                     2,
                 )
                 flag = 1  # 发生降价标记
+            # 没有发生降价的场景
+            else:
+                pass
         
         base_row_cp = base_row.copy()
         base_row_cp["end_batch_dt"] = batch_dt
@@ -308,6 +314,13 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
         base_row_cp["drop_price"] = drop_price
         base_row_cp["price_diff"] = price_diff
         base_row_cp["time_diff_hours"] = time_diff_hours
+        if pd.notna(base_row_cp.get("end_batch_dt")) and pd.notna(base_row_cp.get("_batch_dt")):
+            base_row_cp["time_diff_hours_2"] = round(
+                float((base_row_cp["end_batch_dt"] - base_row_cp["_batch_dt"]) / pd.Timedelta(hours=1)),
+                2,
+            )
+        else:
+            base_row_cp["time_diff_hours_2"] = pd.NA
         base_row_cp["flag"] = flag
         list_base_row.append(base_row_cp)
 
@@ -324,6 +337,6 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
     return
 
 if __name__ == "__main__":
-    verify_process("202604151100", "202604161400")
-    # verify_process_2("202604151100", "202604161400")
+    # verify_process("202604151100", "202604161400")
+    verify_process_2("202604290900", "202604291500")
     pass