gk_flights_spider_que_work.py

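"""Queue worker for the GK flights spider.

Pulls search tasks (route and search date) off a Redis queue, crawls
flight data with price verification via GKSpider, and feeds each result
into the CleanGKDatas cleaning pipeline.
"""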
import threading
import json
import time

from my_logger import Logger
from flights_utils import time_format_conversion
from flights_redis import RedisHelper
from flights_mongodb import mongo_con_parse
from settings import CRAWL_DATE_CONF_STATUS_TAB
from insert_flights_gk_route import get_airport_data, find_route_airport_pair
from gk_flights_spider import GKSpider
from clean_gk_flights_datas import CleanGKDatas


class GKSpidersQueWork(GKSpider, CleanGKDatas):
    """Worker that consumes GK flight search tasks from a Redis queue."""

    def __init__(self):
        super().__init__()
        self.website = "gk"
        self.que_key = "flight_website_{}".format(self.website)
        self.redis_ = RedisHelper()
        self.db = mongo_con_parse()
        self.info_tab = "flights_{}_info_tab".format(self.website)
        self.clean_info_tab = "clean_{}".format(self.info_tab)
        # self.thread_numbers = 5
        self.thread_numbers = 1
        self.logger = Logger(
            './logs/{}_flights_spiders_que_work.log'.format(self.website),
            'debug').logger

    def get_clean_1_crawl_date(self):
        """Return the latest crawl_date whose crawl and clean statuses are both 1."""
        website_crawl_date = None
        website_crawl_dates = self.db.get_collection(
            CRAWL_DATE_CONF_STATUS_TAB).distinct(
            "crawl_date",
            {
                "website": self.website,
                "clean_status": 1,
                "crawl_status": 1
            }
        )
        if website_crawl_dates:
            website_crawl_dates.sort()
            website_crawl_date = website_crawl_dates[-1]
        return website_crawl_date
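
    # For reference, the query above assumes status documents shaped roughly
    # like this (field names come from the filter; the values are illustrative):
    #   {"website": "gk", "crawl_date": "2024-05-01",
    #    "crawl_status": 1, "clean_status": 1}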

    def thread_task(self, thread_number):
        """Poll the Redis queue and process one search task per iteration."""
        while True:
            que_data = self.redis_.get_nowait(self.que_key)
            if que_data:
                task_json = json.loads(que_data)
                from_city_code = task_json.get("from_city_code")
                to_city_code = task_json.get("to_city_code")
                search_date = task_json.get("search_date")
                # Convert the search date format
                search_date = time_format_conversion(
                    search_date,
                    in_strftime_str="%Y%m%d",
                    out_strftime_str="%Y-%m-%d"
                )
                self.logger.info(f"Crawling: {search_date}:{from_city_code}-{to_city_code}")
                # Crawl the flight data
                flights_json = self.get_flights_with_verify_price(
                    from_city_code, to_city_code, search_date, thread_number)
                if flights_json:
                    self.logger.info(
                        "thread_number:{0}, fetched flight info: {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code,
                            to_city_code))
                    self.logger.info(
                        "thread_number:{0}, {1}:{2}-{3}, resp json: {4}".format(
                            thread_number, search_date, from_city_code, to_city_code,
                            json.dumps(flights_json)[:60])
                    )
                    # Cleaning
                    flights_json["website"] = self.website
                    # Attach the crawl date
                    crawl_date = self.get_clean_1_crawl_date()
                    flights_json["crawl_date"] = crawl_date
                    flights_json["search_from_city_code"] = from_city_code
                    flights_json["search_to_city_code"] = to_city_code
                    flights_json["search_date"] = search_date
                    # Go straight into the cleaning pipeline (with price verification
                    # enabled; verification first deletes all of that day's data for
                    # the route, then fetches it again)
                    self.processing_data_one_search(flights_json, thread_number, True)
                    self.logger.info(
                        "thread_number:{0}, finished saving flight info: {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
                else:
                    self.logger.info(
                        "thread_number:{0}, failed to fetch flight info {1}:{2}-{3}".format(
                            thread_number, search_date, from_city_code, to_city_code))
            time.sleep(0.5)
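
    # Each queue entry consumed above is expected to be a JSON string such as
    #   '{"from_city_code": "NRT", "to_city_code": "OKA", "search_date": "20240101"}'
    # (the keys come from thread_task; the city codes and date are illustrative).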

    def run_threading(self):
        threads = []
        for thread_number in range(1, self.thread_numbers + 1):
            t = threading.Thread(
                target=self.thread_task,
                args=(thread_number,)
            )
            t.start()
            threads.append(t)
        # Keep the main thread alive while the workers poll the Redis queue
        for t in threads:
            t.join()


if __name__ == "__main__":
    R = GKSpidersQueWork()
    R.run_threading()
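
# A minimal sketch of seeding the queue this worker drains. RedisHelper's
# exact enqueue method is not shown in this file, so ``put`` below is an
# assumption -- substitute the real push call from flights_redis:
#
#   from flights_redis import RedisHelper
#   import json
#   RedisHelper().put("flight_website_gk", json.dumps({
#       "from_city_code": "NRT",
#       "to_city_code": "OKA",
#       "search_date": "20240101",
#   }))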