node04 преди 2 седмици
родител
ревизия
18262b531d
променени са 1 файла, в които са добавени 30 реда и са изтрити 13 реда
  1. 30 13
      result_keep_verify.py

+ 30 - 13
result_keep_verify.py

@@ -260,27 +260,29 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
         flight_number_1 = df_gid["flight_number_1"].iloc[0]
         flight_number_2 = df_gid["flight_number_2"].iloc[0]
         into_update_hour = df_gid["into_update_hour"].iloc[0]
-        valid_end_hour = df_gid["valid_end_hour"].iloc[0]
+        valid_end_hour = df_gid["valid_end_hour"].iloc[0]   
 
         into_update_dt = pd.to_datetime(
             df_gid.get("into_update_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
-        ).min()
+        ).min()    # 进入序列的小时数 
         batch_dt = pd.to_datetime(
             df_gid.get("batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
-        ).max()
+        ).max()    # 离开序列的小时数
 
-        valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")
+        valid_end_dt = pd.to_datetime(valid_end_hour, format="%Y-%m-%d %H:%M:%S", errors="coerce")  # 距离起飞72小时的节点
         
-        flag = 0   # 等待(弹出)标记
+        flag = 0     # 等待标记
         if batch_dt >= valid_end_dt:
-            flag = 2     # 超时标记      
+            flag = 2     # (距离起飞前72小时)超时标记
+        elif batch_dt < max_batch_dt:
+            flag = 3     # 弹出标记    
 
         if pd.isna(into_update_dt) or pd.isna(batch_dt):
             print(f"gid={gid} 时间字段解析失败,跳过")
             continue
 
-        crawl_date_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")
-        crawl_date_end = (batch_dt + pd.Timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
+        crawl_date_begin = (batch_dt + pd.Timedelta(hours=0)).strftime("%Y-%m-%d %H:%M:%S")  # 出序列的那个时间段
+        crawl_date_end = (batch_dt + pd.Timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")    # 出序列的那个时间段往后延申8小时
 
         if city_pair in vj_flight_route_list_hot:
             table_name_far = CLEAN_VJ_HOT_FAR_INFO_TAB
@@ -293,6 +295,7 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
             continue
 
         baggage = 0
+        # 查远期表
         df_query_far = validate_keep_one_line(
             db,
             table_name_far,
@@ -304,6 +307,7 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
             crawl_date_begin,
             crawl_date_end,
         )
+        # 查近期表
         df_query_near = validate_keep_one_line(
             db,
             table_name_near,
@@ -328,8 +332,8 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
         df_last_price = df_g1[df_g1["adult_total_price"] == last_price]
         base_row = df_last_price.iloc[0]
         # base_pos = int(df_last_price.index[0])
-        base_dt = base_row["_batch_dt"]
-        base_price = float(base_row["adult_total_price"])
+        base_dt = base_row["_batch_dt"]   # 出现最后价格的第一个批次
+        base_price = float(base_row["adult_total_price"])  # 最后价格
        
         # drop_pos = pd.NA
         drop_crawl_date = pd.NA
@@ -340,7 +344,8 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
         if not df_g2.empty:
             df_g2["crawl_dt"] = pd.to_datetime(df_g2.get("crawl_date"), errors="coerce")
             mask_drop = df_g2["adult_total_price"] < base_price
-            if mask_drop.any():
+            # 发生降价的场景
+            if mask_drop.any():  
                 drop_row = df_g2.loc[mask_drop].iloc[0]
                 # drop_pos = int(drop_row.name)
                 drop_crawl_date = drop_row.get("crawl_date")
@@ -351,14 +356,25 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
                     2,
                 )
                 flag = 1  # 发生降价标记
-        
+            # 没有发生降价的场景
+            else: 
+                pass
+
         base_row_cp = base_row.copy()
         base_row_cp["end_batch_dt"] = batch_dt
         base_row_cp["drop_crawl_date"] = drop_crawl_date
         base_row_cp["drop_price"] = drop_price
         base_row_cp["price_diff"] = price_diff
         base_row_cp["time_diff_hours"] = time_diff_hours
+        if pd.notna(base_row_cp.get("end_batch_dt")) and pd.notna(base_row_cp.get("_batch_dt")):
+            base_row_cp["time_diff_hours_2"] = round(
+                float((base_row_cp["end_batch_dt"] - base_row_cp["_batch_dt"]) / pd.Timedelta(hours=1)),
+                2,
+            )
+        else:
+            base_row_cp["time_diff_hours_2"] = pd.NA
         base_row_cp["flag"] = flag
+
         list_base_row.append(base_row_cp)
 
         del df_g1
@@ -377,4 +393,5 @@ def verify_process_2(min_batch_time_str, max_batch_time_str):
 
 if __name__ == "__main__":
     # verify_process("202604071700", "202604090900")
-    verify_process_2("202604171500", "202604200800")
+    # verify_process_2("202604211700", "202604220900")
+    verify_process_2("202604281500", "202604291300")