import threading
import json
import time

from my_logger import Logger
from flights_utils import time_format_conversion
from flights_redis import RedisHelper
from flights_mongodb import mongo_con_parse
from settings import CRAWL_DATE_CONF_STATUS_TAB
from insert_flights_gk_route import get_airport_data, find_route_airport_pair
from gk_flights_spider import GKSpider
from clean_gk_flights_datas import CleanGKDatas


class GKSpidersQueWork(GKSpider, CleanGKDatas):
    """Queue worker that crawls GK flights and feeds them into the cleaning pipeline."""

    def __init__(self):
        super().__init__()
        self.website = "gk"
        # Redis queue key holding the crawl tasks for this website
        self.que_key = "flight_website_{}".format(self.website)
        self.redis_ = RedisHelper()
        self.db = mongo_con_parse()
        # Raw and cleaned MongoDB collections for this website
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.clean_info_tab = "clean_{}".format(self.info_tab)
        # self.thread_numbers = 5
        self.thread_numbers = 1
        self.logger = Logger(
            './logs/{}_flights_spiders_que_work.log'.format(self.website),
            'debug').logger

    def get_clean_1_crawl_date(self):
        """Return the latest crawl_date whose crawl_status and clean_status are both 1."""
        website_crawl_date = None
        website_crawl_dates = self.db.get_collection(
            CRAWL_DATE_CONF_STATUS_TAB).distinct(
            "crawl_date",
            {
                "website": self.website,
                "clean_status": 1,
                "crawl_status": 1
            }
        )
        if len(website_crawl_dates) > 0:
            website_crawl_dates.sort()
            website_crawl_date = website_crawl_dates[-1]
        return website_crawl_date
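
    # The worker pops JSON task payloads from the Redis queue. Based on the keys
    # read below, a task looks like the following (city codes are illustrative):
    #   {"from_city_code": "NRT", "to_city_code": "HKG", "search_date": "20240101"}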
    def thread_task(self, thread_number):
        while True:
            que_data = self.redis_.get_nowait(self.que_key)
            if que_data:
                task_json = json.loads(que_data)
                from_city_code = task_json.get("from_city_code")
                to_city_code = task_json.get("to_city_code")
                search_date = task_json.get("search_date")
                # Convert the search date format
                search_date = time_format_conversion(
                    search_date,
                    in_strftime_str="%Y%m%d",
                    out_strftime_str="%Y-%m-%d"
                )
                self.logger.info(f"Crawling: {search_date}:{from_city_code}-{to_city_code}")

                # Fetch the flight data
                flights_json = self.get_flights_with_verify_price(
                    from_city_code, to_city_code, search_date, thread_number)
                if flights_json:
                    self.logger.info(
                        "thread_number_name:{0}, fetched flight info: {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code,
                            to_city_code))
                    self.logger.info(
                        "thread_number_name:{0}, {1}:{2}-{3}, resp json: {4}".format(
                            thread_number, search_date, from_city_code, to_city_code,
                            json.dumps(flights_json)[:60])
                    )
                    # Cleaning
                    flights_json["website"] = self.website
                    # Attach the crawl date
                    crawl_date = self.get_clean_1_crawl_date()
                    flights_json["crawl_date"] = crawl_date
                    flights_json["search_from_city_code"] = from_city_code
                    flights_json["search_to_city_code"] = to_city_code
                    flights_json["search_date"] = search_date
                    # Go straight into the cleaning flow (with price verification enabled;
                    # verification first deletes all of that day's data for this segment,
                    # then fetches it again)
                    self.processing_data_one_search(flights_json, thread_number, True)
                    self.logger.info(
                        "thread_number_name:{0}, finished saving flight info: {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
                else:
                    self.logger.info(
                        "thread_number:{0}, failed to fetch flight info {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
            time.sleep(0.5)

    def run_threading(self):
        threads = []
        for thread_number in range(1, self.thread_numbers + 1):
            t = threading.Thread(
                target=self.thread_task,
                args=(thread_number,)
            )
            t.start()
            threads.append(t)
        # The original called self.task_queue.join(), but no task_queue is defined in
        # this module (tasks come from Redis); joining the worker threads keeps the
        # main thread alive instead.
        for t in threads:
            t.join()


if __name__ == "__main__":
    R = GKSpidersQueWork()
    R.run_threading()
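
# Illustrative producer sketch (an assumption, not part of this module): if the
# queue key is a plain Redis list consumed by RedisHelper.get_nowait(), a task
# could be enqueued with a bare redis-py client, using whichever push direction
# matches how RedisHelper pops:
#
#   import redis
#   client = redis.Redis()
#   task = {"from_city_code": "NRT", "to_city_code": "HKG", "search_date": "20240101"}
#   client.lpush("flight_website_gk", json.dumps(task))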