import json
import threading
import time

from my_logger import Logger
from flights_utils import time_format_conversion
from flights_redis import RedisHelper
from flights_mongodb import mongo_con_parse
from settings import CRAWL_DATE_CONF_STATUS_TAB
from insert_flights_gk_route import get_airport_data, find_route_airport_pair
from gk_flights_spider import GKSpider
from clean_gk_flights_datas import CleanGKDatas


class GKSpidersQueWork(GKSpider, CleanGKDatas):

    def __init__(self):
        super().__init__()
        self.website = "gk"
        self.que_key = "flight_website_{}".format(self.website)
        self.redis_ = RedisHelper()
        self.db = mongo_con_parse()
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.clean_info_tab = "clean_{}".format(self.info_tab)
        # self.thread_numbers = 5
        self.thread_numbers = 1
        self.logger = Logger(
            './logs/{}_flights_spiders_que_work.log'.format(self.website),
            'debug').logger

    def get_clean_1_crawl_date(self):
        """Return the latest crawl_date whose crawl and clean statuses are both 1."""
        website_crawl_date = None
        website_crawl_dates = self.db.get_collection(
            CRAWL_DATE_CONF_STATUS_TAB).distinct(
            "crawl_date",
            {
                "website": self.website,
                "clean_status": 1,
                "crawl_status": 1
            }
        )
        if len(website_crawl_dates) > 0:
            website_crawl_dates.sort()
            website_crawl_date = website_crawl_dates[-1]
        return website_crawl_date

    def thread_task(self, thread_number):
        while True:
            que_data = self.redis_.get_nowait(self.que_key)
            if que_data:
                task_json = json.loads(que_data)
                from_city_code = task_json.get("from_city_code")
                to_city_code = task_json.get("to_city_code")
                search_date = task_json.get("search_date")
                # Convert the search date from %Y%m%d to %Y-%m-%d
                search_date = time_format_conversion(
                    search_date,
                    in_strftime_str="%Y%m%d",
                    out_strftime_str="%Y-%m-%d"
                )
                self.logger.info(f"Crawling: {search_date}:{from_city_code}-{to_city_code}")
                # Fetch flight data (with price verification)
                flights_json = self.get_flights_with_verify_price(
                    from_city_code, to_city_code, search_date, thread_number)
                if flights_json:
                    self.logger.info(
                        "thread_number_name:{0}, fetched flight info: {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
                    self.logger.info(
                        "thread_number_name:{0}, {1}:{2}-{3}, resp json: {4}".format(
                            thread_number, search_date, from_city_code,
                            to_city_code, json.dumps(flights_json)[:60])
                    )
                    # Cleaning
                    flights_json["website"] = self.website
                    # Attach the crawl date
                    crawl_date = self.get_clean_1_crawl_date()
                    flights_json["crawl_date"] = crawl_date
                    flights_json["search_from_city_code"] = from_city_code
                    flights_json["search_to_city_code"] = to_city_code
                    flights_json["search_date"] = search_date
                    # Go straight into the cleaning pipeline (with price
                    # verification enabled; verification first deletes all of
                    # this leg's data for that day, then re-fetches it)
                    self.processing_data_one_search(flights_json, thread_number, True)
                    self.logger.info(
                        "thread_number_name:{0}, finished saving flight info: {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
                else:
                    self.logger.info(
                        "thread_number:{0}, failed to fetch flight info {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
            # Throttle the loop; also avoids hot-spinning when the queue is empty
            time.sleep(0.5)

    def run_threading(self):
        threads = []
        for thread_number in range(1, self.thread_numbers + 1):
            t = threading.Thread(
                target=self.thread_task,
                args=(thread_number,)
            )
            t.start()
            threads.append(t)
        # Block until all worker threads finish (they loop indefinitely,
        # so this keeps the main thread alive)
        for t in threads:
            t.join()


if __name__ == "__main__":
    R = GKSpidersQueWork()
    R.run_threading()
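
# For reference: a minimal sketch of how a task could be seeded onto the
# Redis queue this worker drains. RedisHelper's real enqueue method lives in
# flights_redis and is not shown here, so the `put` name below is an
# assumption for illustration only; the field names match what thread_task()
# reads, and the city codes are placeholders.
#
#   task = {
#       "from_city_code": "NRT",
#       "to_city_code": "OKA",
#       "search_date": "20240115",  # %Y%m%d; converted to %Y-%m-%d before crawling
#   }
#   RedisHelper().put("flight_website_gk", json.dumps(task))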