Browse Source

提交近期修改 改进判定方案与验证方式

node04 cách đây 1 tuần
mục cha
commit
7091cdd4fa
6 tập tin đã thay đổi với 74 bổ sung và 175 xóa
  1. 12 10
      data_loader.py
  2. 15 152
      data_preprocess.py
  3. 2 1
      evaluate_validate_pnl.py
  4. 1 1
      main_tr_0.py
  5. 26 6
      result_keep_verify.py
  6. 18 5
      result_validate_0.py

+ 12 - 10
data_loader.py

@@ -898,23 +898,25 @@ def query_all_flight_number(db, table_name):
     return list_flight_number
 
 
-def validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour, 
+def validate_one_line(db, table_name, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour, 
                       limit=0, max_retries=3, base_sleep=1.0):
     """验证预测结果的一行"""
     
-    if city_pair in vj_flight_route_list_hot:
-        table_name = CLEAN_VJ_HOT_NEAR_INFO_TAB
-    elif city_pair in vj_flight_route_list_nothot: 
-        table_name = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
-    else:
-        print(f"城市对{city_pair}不在热门航线与冷门航线, 返回")
-        return pd.DataFrame()
+    # if city_pair in vj_flight_route_list_hot:
+    #     table_name = CLEAN_VJ_HOT_NEAR_INFO_TAB
+    # elif city_pair in vj_flight_route_list_nothot: 
+    #     table_name = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
+    # else:
+    #     print(f"城市对{city_pair}不在热门航线与冷门航线, 返回")
+    #     return pd.DataFrame()
         
     city_pair_split = city_pair.split('-')
     from_city_code = city_pair_split[0]
     to_city_code = city_pair_split[1]
     flight_day_str = datetime.strptime(flight_day, "%Y-%m-%d").strftime("%Y%m%d") 
     baggage_str = f"1-{baggage}"
+    if baggage == 0:
+        baggage_str = "-;-;-;-"
 
     for attempt in range(1, max_retries + 1):
         try:
@@ -998,7 +1000,7 @@ def validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_
             time.sleep(sleep_time)
 
 
-def validate_keep_one_line(db, table_name, city_pair, flight_day, flight_number_1, flight_number_2, baggage, update_hour_str,
+def validate_keep_one_line(db, table_name, city_pair, flight_day, flight_number_1, flight_number_2, baggage, update_hour_str, del_batch_std_str,
                            limit=0, max_retries=3, base_sleep=1.0):
     """验证keep_info的一行"""
     city_pair_split = city_pair.split('-')
@@ -1018,7 +1020,7 @@ def validate_keep_one_line(db, table_name, city_pair, flight_day, flight_number_
                 "to_city_code": to_city_code,
                 "search_dep_time": flight_day_str,
                 "segments.baggage": baggage_str,
-                "crawl_date": {"$gte": update_hour_str},
+                "crawl_date": {"$gte": update_hour_str, "$lt": del_batch_std_str},
                 "segments.0.flight_number": flight_number_1,
             }
             # 如果有第二段

+ 15 - 152
data_preprocess.py

@@ -1244,6 +1244,7 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
         # seats_remaining_change_amount = row['seats_remaining_change_amount']
         price_amount = row['adult_total_price']
         seats_remaining = row['seats_remaining']
+        # envelope_position = row['envelope_position']
 
         length_drop = 0
         length_rise = 0
@@ -1287,8 +1288,10 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                 df_drop_gap['price_gap'] = high_price_vals - price_base
                 df_drop_gap['price_abs_gap'] = df_drop_gap['price_gap'].abs()
 
-                df_drop_gap = df_drop_gap.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
-                df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 10.0)].copy()
+                # df_drop_gap = df_drop_gap.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
+                # df_match = df_drop_gap[(df_drop_gap['pct_abs_gap'] <= pct_threshold) & (df_drop_gap['price_abs_gap'] <= 10.0)].copy()
+                df_drop_gap = df_drop_gap.sort_values(['price_abs_gap'], ascending=[True])
+                df_match = df_drop_gap[(df_drop_gap['price_abs_gap'] <= 5.0)].copy()
 
                 # 历史上出现的极近似的增长幅度后的降价场景
                 if not df_match.empty:
@@ -1350,34 +1353,6 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                 # 历史上未出现的极近似的增长幅度后的降价场景
                 else:
                     pass
-                    # if pd.notna(price_duration_hours) and price_change_percent >= 0.1:
-                    #     pct_vals = pd.to_numeric(
-                    #         df_drop_nodes_part['high_price_change_percent'],
-                    #         errors='coerce'
-                    #     ).replace([np.inf, -np.inf], np.nan).dropna()
-                    #     dur_vals = pd.to_numeric(
-                    #         df_drop_nodes_part['high_price_duration_hours'],
-                    #         errors='coerce'
-                    #     ).replace([np.inf, -np.inf], np.nan).dropna()
-
-                    #     if not pct_vals.empty and not dur_vals.empty:
-                    #         pct_min = float(pct_vals.min())
-                    #         pct_max = float(pct_vals.max())
-                    #         dur_min = float(dur_vals.min())
-                    #         dur_max = float(dur_vals.max())
-
-                    #         if (pct_min <= float(price_change_percent) <= pct_max) and (dur_min <= float(price_duration_hours) <= dur_max):
-                    #             df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.5'
-                    #             continue  # 已经判定降价 后面不再做
-                    #         elif (pct_min <= float(price_change_percent)) and (dur_min <= float(price_duration_hours)):
-                    #             df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.3
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = '0h->0.3'
-                    #             continue  # 已经判定降价 后面不再做
                             
         # 针对历史上发生的 连续涨价 
         if not df_rise_nodes.empty:
@@ -1395,76 +1370,6 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                 ]
 
             if not df_rise_nodes_part.empty and pd.notna(price_change_percent):
-                # pct_vals_1 = df_keep_nodes_part['keep_price_change_percent'].replace([np.inf, -np.inf], np.nan).dropna()
-                # # 保留百分位 10% ~ 90% 之间的 数据
-                # if not pct_vals_1.empty:
-                #     q10_1 = float(pct_vals_1.quantile(0.10))
-                #     q90_1 = float(pct_vals_1.quantile(0.90))
-                #     df_keep_nodes_part = df_keep_nodes_part[
-                #         df_keep_nodes_part['keep_price_change_percent'].between(q10_1, q90_1)
-                #     ]
-                # if df_keep_nodes_part.empty:
-                #     continue
-                
-                # 特殊判定场景
-                # if price_change_percent < 0:
-
-                #     df_tmp = df_keep_nodes_part.copy()
-                #     # 确保组内顺序正确(如果前面已经排过,这行可省略)
-                #     df_tmp = df_tmp.sort_values(
-                #         by=["flight_day", "keep_hours_until_departure"],
-                #         ascending=[True, False]
-                #     )
-                #     # 是否为负值
-                #     df_tmp["is_negative"] = df_tmp["keep_price_change_percent"] < 0
-                    
-                #     if df_tmp["is_negative"].any():
-                #         # 标记“负值段”的开始
-                #         # 当 is_negative 为 True 且 前一行不是负值时,认为是一个新段
-                #         df_tmp["neg_block_id"] = (
-                #             df_tmp["is_negative"]
-                #             & ~df_tmp.groupby("flight_day")["is_negative"].shift(fill_value=False)
-                #         ).groupby(df_tmp["flight_day"]).cumsum()
-                #         # 在每个负值段内计数(第几个负值)
-                #         df_tmp["neg_rank_in_block"] = (
-                #             df_tmp.groupby(["flight_day", "neg_block_id"])
-                #             .cumcount() + 1
-                #         )
-                #         # 每个连续负值段的长度
-                #         df_tmp["neg_block_size"] = (
-                #             df_tmp.groupby(["flight_day", "neg_block_id"])["is_negative"]
-                #             .transform("sum")
-                #         )
-                #         # 只保留:
-                #         # 1) 是负值
-                #         # 2) 且不是该连续负值段的最后一个
-                #         df_continuous_price_drop = df_tmp[
-                #             (df_tmp["is_negative"]) &
-                #             (df_tmp["neg_rank_in_block"] < df_tmp["neg_block_size"])
-                #         ].drop(
-                #             columns=[
-                #                 "is_negative",
-                #                 "neg_block_id",
-                #                 "neg_rank_in_block",
-                #                 "neg_block_size",
-                #             ]
-                #         )
-                #         pct_diff_c = (df_continuous_price_drop['keep_price_change_percent'] - float(price_change_percent)).abs()
-                #         df_match_c = df_continuous_price_drop.loc[pct_diff_c <= pct_threshold_c, ['flight_day', 'keep_hours_until_departure', 'keep_price_duration_hours', 'keep_price_change_percent']].copy()
-
-                #         # 符合连续降价条件
-                #         if not df_match_c.empty and pd.notna(price_duration_hours):
-                #             vals_c = df_match_c['keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna()
-                #             if not vals_c.empty:
-                #                 min_val_c = vals_c.min()
-                #                 if min_val_c <= float(price_duration_hours):
-                #                     df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
-                #                     df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                #                     df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
-                #                     df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'c1'
-                #                     length_drop = df_match_c.shape[0]        
-                #                     # continue   # 已经判定降价 后面不再做
-
                 # 一般判定场景
                 pct_base_1 = float(price_change_percent)
                 pct_vals_1 = pd.to_numeric(df_rise_nodes_part['prev_rise_change_percent'], errors='coerce')
@@ -1482,8 +1387,10 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                 df_rise_gap_1['price_gap'] = rise_price_vals_1 - price_base_1
                 df_rise_gap_1['price_abs_gap'] = df_rise_gap_1['price_gap'].abs()
 
-                df_rise_gap_1 = df_rise_gap_1.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
-                df_match_1 = df_rise_gap_1.loc[(df_rise_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_rise_gap_1['price_abs_gap'] <= 10.0)].copy()
+                # df_rise_gap_1 = df_rise_gap_1.sort_values(['pct_abs_gap', 'price_abs_gap'], ascending=[True, True])
+                # df_match_1 = df_rise_gap_1.loc[(df_rise_gap_1['pct_abs_gap'] <= pct_threshold_1) & (df_rise_gap_1['price_abs_gap'] <= 10.0)].copy()
+                df_rise_gap_1 = df_rise_gap_1.sort_values(['price_abs_gap'], ascending=[True])
+                df_match_1 = df_rise_gap_1.loc[(df_rise_gap_1['price_abs_gap'] <= 5.0)].copy()
 
                 # 历史上出现过近似变化幅度后继续涨价场景
                 if not df_match_1.empty:
@@ -1547,59 +1454,15 @@ def predict_data_simple(df_input, group_route_str, output_dir, predict_dir=".",
                                 # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
                                 df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = drop_prob
                                 
-                            # elif length_keep == length_drop:   # 不降价与降价相同, 取0.5概率
-
-                            #     df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
-                            #     df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                            #     df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.5
-                            #     df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
-                        
-                                # df_match_1['hours_delta'] = hours_until_departure - df_match_1['keep_hours_until_departure']
-                                # df_match_1['modify_keep_price_duration_hours'] = df_match_1['keep_price_duration_hours'] - df_match_1['hours_delta']
-                                # df_match_1 = df_match_1[df_match_1['modify_keep_price_duration_hours'] > 0]
-
-                                # 比较 price_duration_hours 在 modify_keep_price_duration_hours 的百分位                    
-                                # vals = df_match_1['modify_keep_price_duration_hours'].replace([np.inf, -np.inf], np.nan).dropna()
-                                # if not vals.empty:
-                                #     # q10_11 = float(vals.quantile(0.10))
-                                #     min_val = vals.min()
-                                #     if min_val <= float(price_duration_hours):
-                                #         df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
-                                #         df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                                #         df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
-                                #         df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'k1'
-
                 # 历史上未出现过近似变化幅度后保持低价场景
                 else:
                     pass
-                    # df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
-                    # df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                    # df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
-                    # df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n0'
-
-                    # if pd.notna(price_duration_hours) and price_change_percent <= 0.1:
-                    #     df_keep_nodes_part_1 = df_keep_nodes_part[df_keep_nodes_part['keep_price_change_percent'] <= 0.1]
-                    #     pct_vals_1 = pd.to_numeric(
-                    #         df_keep_nodes_part_1['keep_price_change_percent'],
-                    #         errors='coerce'
-                    #     ).replace([np.inf, -np.inf], np.nan).dropna()
-                    #     dur_vals_1 = pd.to_numeric(
-                    #         df_keep_nodes_part_1['keep_price_duration_hours'],
-                    #         errors='coerce'
-                    #     ).replace([np.inf, -np.inf], np.nan).dropna()
-
-                    #     if not pct_vals_1.empty and not dur_vals_1.empty:
-                    #         pct_min_1 = float(pct_vals_1.min())
-                    #         pct_max_1 = float(pct_vals_1.max())
-                    #         dur_min_1 = float(dur_vals_1.min())
-                    #         dur_max_1 = float(dur_vals_1.max())
-
-                    #         if (pct_min_1 <= float(price_change_percent) <= pct_max_1) and (dur_min_1 <= float(price_duration_hours) <= dur_max_1):
-                    #             df_min_hours.loc[idx, 'simple_will_price_drop'] = 0
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours'] = 0
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 0.0
-                    #             df_min_hours.loc[idx, 'simple_drop_in_hours_dist'] = 'n1'
-                pass
+        # 根据价格包络位置统一判定
+        # if envelope_position >= 0.97:  # 在0.97分位之上的认为必降价?
+        #     df_min_hours.loc[idx, 'simple_will_price_drop'] = 1
+        #     df_min_hours.loc[idx, 'flag_dist'] = 'dd'
+        #     df_min_hours.loc[idx, 'simple_drop_in_hours_prob'] = 1
+
     print("判定循环结束")
     # 按航班号统一其降价/涨价的上限与下限, 上限统一取最大, 下限统一取最小
     # _grp_cols = ['city_pair', 'flight_number_1', 'flight_number_2']

+ 2 - 1
evaluate_validate_pnl.py

@@ -117,8 +117,9 @@ if __name__ == "__main__":
         sys.argv = [
             sys.argv[0],
             # "/home/node04/yuzhou/jiangcang_vj/validate/node0205_zong/result_validate_node0205_zong_20260211100622.csv",  # 替换为实际路径
-            "/home/node04/yuzhou/jiangcang_vj/validate/node0211_zong/result_validate_node0211_zong_20260302104535.csv",  # 替换为实际路径
+            # "/home/node04/yuzhou/jiangcang_vj/validate/node0211_zong/result_validate_node0211_zong_20260302104535.csv",  # 替换为实际路径
             # "/home/node04/yuzhou/jiangcang_vj/validate/node0224_zong/result_validate_node0224_zong_20260227100242.csv",  # 替换为实际路径
+            "/home/node04/yuzhou/jiangcang_vj/validate/node0311_zong/result_validate_node0311_zong_20260316095704.csv",  # 替换为实际路径
             # "--output", "debug_output.csv"
         ]
 

+ 1 - 1
main_tr_0.py

@@ -50,7 +50,7 @@ def start_train():
     # date_end = datetime.today().strftime("%Y-%m-%d")
     date_end = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
     # date_begin = (datetime.today() - timedelta(days=32)).strftime("%Y-%m-%d")
-    date_begin = "2026-01-01"   # 2025-12-01 2026-02-11 2026-02-24 2026-03-02
+    date_begin = "2026-03-11"   # 2026-01-01 2026-03-11 2026-03-16
 
     print(f"训练时间范围: {date_begin} 到 {date_end}")
 

+ 26 - 6
result_keep_verify.py

@@ -24,10 +24,13 @@ def _validate_keep_info_df(df_keep_info_part):
         flight_number_1 = row['flight_number_1']
         flight_number_2 = row['flight_number_2']
         baggage = row['baggage']
-        update_hour = row['update_hour']
-        update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S')
+        # update_hour = row['update_hour']
+        # update_dt = pd.to_datetime(update_hour, format='%Y-%m-%d %H:%M:%S')
         into_update_hour = row['into_update_hour']
         into_update_dt = pd.to_datetime(into_update_hour, format='%Y-%m-%d %H:%M:%S')
+        del_batch_time_str = row['del_batch_time_str']
+        del_batch_dt = pd.to_datetime(del_batch_time_str, format='%Y%m%d%H%M')
+        del_batch_std_str = del_batch_dt.strftime('%Y-%m-%d %H:%M:%S')
 
         entry_price = pd.to_numeric(row.get('adult_total_price'), errors='coerce')
         if city_pair in vj_flight_route_list_hot:
@@ -38,8 +41,10 @@ def _validate_keep_info_df(df_keep_info_part):
             table_name_near = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
 
         # 分别从远期表和近期表里查询
-        df_query_far = validate_keep_one_line(db, table_name_far, city_pair, flight_day, flight_number_1, flight_number_2, baggage, into_update_hour)
-        df_query_near = validate_keep_one_line(db, table_name_near, city_pair, flight_day, flight_number_1, flight_number_2, baggage, into_update_hour)
+        df_query_far = validate_keep_one_line(db, table_name_far, city_pair, flight_day, flight_number_1, flight_number_2, 
+                                              baggage, into_update_hour, del_batch_std_str)
+        df_query_near = validate_keep_one_line(db, table_name_near, city_pair, flight_day, flight_number_1, flight_number_2, 
+                                               baggage, into_update_hour, del_batch_std_str)
         # 合并
         df_query = pd.concat([df_query_far, df_query_near]).reset_index(drop=True)
         if (not df_query.empty) and pd.notna(entry_price):
@@ -140,9 +145,24 @@ def verify_process(min_batch_time_str, max_batch_time_str):
             continue
         
         df_keep_info_del = df_keep_info[df_keep_info['keep_flag'] == -1].reset_index(drop=True)
-        df_keep_info_del = _validate_keep_info_df(df_keep_info_del)
         df_keep_info_del['del_batch_time_str'] = batch_time_str
+        df_keep_info_del = _validate_keep_info_df(df_keep_info_del)
 
+        # 根据价格变化情况, 移出时间与验证终点时间的对比, 计算 status_flag 状态
+        price_diff_num = pd.to_numeric(df_keep_info_del.get("price_diff"), errors="coerce").fillna(0)
+        del_batch_dt = pd.to_datetime(
+            df_keep_info_del.get("del_batch_time_str"), format="%Y%m%d%H%M", errors="coerce"
+        )
+        valid_end_dt = pd.to_datetime(
+            df_keep_info_del.get("valid_end_hour"), format="%Y-%m-%d %H:%M:%S", errors="coerce"
+        )
+        status_flag = pd.Series(2, index=df_keep_info_del.index, dtype="int64")
+        status_flag.loc[price_diff_num > 0] = 1
+        mask_zero = price_diff_num == 0
+        mask_time_ok = mask_zero & del_batch_dt.notna() & valid_end_dt.notna() & (del_batch_dt >= valid_end_dt)
+        status_flag.loc[mask_time_ok] = 0
+        df_keep_info_del["status_flag"] = status_flag
+        
         write_header = not os.path.exists(output_path)
         df_keep_info_del.to_csv(output_path, mode="a", header=write_header, index=False, encoding="utf-8-sig")
         del df_keep_info_del
@@ -153,5 +173,5 @@ def verify_process(min_batch_time_str, max_batch_time_str):
         
 
 if __name__ == "__main__":
-    verify_process("202603121800", "202603131600")
+    verify_process("202603121800", "202603160800")
     pass

+ 18 - 5
result_validate_0.py

@@ -3,6 +3,8 @@ import datetime
 import os
 import pandas as pd
 from data_loader import mongo_con_parse, validate_one_line, fill_hourly_crawl_date
+from config import vj_flight_route_list_hot, vj_flight_route_list_nothot, \
+    CLEAN_VJ_HOT_NEAR_INFO_TAB, CLEAN_VJ_HOT_FAR_INFO_TAB, CLEAN_VJ_NOTHOT_NEAR_INFO_TAB, CLEAN_VJ_NOTHOT_FAR_INFO_TAB
 
 
 def _validate_predict_df(df_predict):
@@ -26,7 +28,18 @@ def _validate_predict_df(df_predict):
             valid_begin_dt,
             update_dt
         ).strftime('%Y-%m-%d %H:%M:%S')
-        df_val= validate_one_line(db, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
+
+        if city_pair in vj_flight_route_list_hot:
+            table_name_far = CLEAN_VJ_HOT_FAR_INFO_TAB
+            table_name_near = CLEAN_VJ_HOT_NEAR_INFO_TAB
+        elif city_pair in vj_flight_route_list_nothot: 
+            table_name_far = CLEAN_VJ_NOTHOT_FAR_INFO_TAB
+            table_name_near = CLEAN_VJ_NOTHOT_NEAR_INFO_TAB
+
+        df_val_far = validate_one_line(db, table_name_far, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
+        df_val_near = validate_one_line(db, table_name_near, city_pair, flight_day, flight_number_1, flight_number_2, baggage, valid_begin_hour_modify)
+        # 合并
+        df_val = pd.concat([df_val_far, df_val_near]).reset_index(drop=True)
         
         entry_price = pd.to_numeric(row.get('adult_total_price'), errors='coerce')
         crawl_dt = pd.to_datetime(row.get('crawl_date'), errors='coerce')
@@ -587,12 +600,12 @@ if __name__ == "__main__":
         # validate_process_zong(node, True, "202602041100", "202602051400")  # 有条件汇总
         # node = "node0205"
         # validate_process_zong(node, True, "202602061000", "202602091000")  # 有条件汇总
-        node = "node0211"
-        validate_process_zong(node, True, "202602111100", None)    # 202602111100 -> 202602161000  202602161100 -> 202602211000
+        # node = "node0211"
+        # validate_process_zong(node, True, "202602111100", None)    # 202602111100 -> 202602161000  202602161100 -> 202602211000
         # node = "node0224"
         # validate_process_zong(node, True, "202602241600", None)  # 202602241600 -> 202602271400  202602271500  202602281700
-        # node = "node0302"
-        # validate_process_zong(node, True, "202603021500", None)
+        node = "node0311"
+        validate_process_zong(node, True, "202603121800", None)
     # 1 自动验证
     else:
         node = "node0127"