processor.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // services/processor.go
  2. package services
  3. import (
  4. "context"
  5. "go-policy-service/config"
  6. "go-policy-service/models"
  7. "go-policy-service/utils"
  8. "math"
  9. "sync"
  10. "time"
  11. )
  12. var HgConfig *models.HgConfig
  13. func Initialize(cfg *config.Config) {
  14. // 初始化数据库连接
  15. if err := utils.InitDB(cfg); err != nil {
  16. utils.Logger.Fatal("Failed to connect database: ", err)
  17. }
  18. // 初始化Redis
  19. err := utils.InitRedis(cfg)
  20. if err != nil {
  21. utils.Logger.Fatal(err)
  22. }
  23. // 获取航管配置(添加上下文和错误处理)
  24. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  25. defer cancel()
  26. // 获取航管账号配置
  27. HgConfig, err = models.GetHGConf(ctx)
  28. if err != nil {
  29. utils.Logger.Fatal("获取航管配置失败: ", err)
  30. }
  31. // 获取航管服务
  32. // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig)
  33. }
  34. type TaskProcessor struct {
  35. httpClient utils.HTTPClient
  36. cfg *config.Config // 添加配置字段
  37. hgService *HangguanService
  38. }
  39. func NewTaskProcessor(
  40. httpClient utils.HTTPClient,
  41. cfg *config.Config,
  42. ) *TaskProcessor {
  43. return &TaskProcessor{
  44. httpClient: httpClient,
  45. cfg: cfg,
  46. }
  47. }
  48. func (p *TaskProcessor) Run(cfg *config.Config) {
  49. // 启动Token刷新任务
  50. tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl)
  51. // 初始化HangguanService
  52. p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
  53. interval, err := time.ParseDuration(cfg.Interval)
  54. if err != nil {
  55. utils.Logger.Fatal("无效的时间间隔配置: ", err)
  56. }
  57. for {
  58. utils.Logger.Info("开始任务处理循环...")
  59. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
  60. if err := p.ProcessTasks(ctx); err != nil {
  61. utils.Logger.Error("任务处理失败: ", err)
  62. }
  63. cancel()
  64. utils.Logger.Infof("任务处理完成,等待 %v 后进行下一轮处理", interval)
  65. time.Sleep(interval)
  66. }
  67. }
  68. // ProcessTasks方法改进
  69. func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
  70. tasks, err := models.GetPendingTasks(ctx, 0) // 每次取所有任务
  71. if err != nil {
  72. return err
  73. }
  74. taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道
  75. var wg sync.WaitGroup
  76. // 动态调整worker数量
  77. workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks))))
  78. for i := 0; i < workerCount; i++ {
  79. wg.Add(1)
  80. go func() {
  81. defer wg.Done()
  82. p.worker(ctx, taskChan)
  83. }()
  84. }
  85. // 分发任务优化
  86. DISTRIBUTE:
  87. for _, task := range tasks {
  88. select {
  89. case taskChan <- task:
  90. case <-ctx.Done():
  91. break DISTRIBUTE
  92. }
  93. }
  94. close(taskChan)
  95. wg.Wait()
  96. return nil
  97. }
  98. func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) {
  99. for task := range tasks {
  100. select {
  101. case <-ctx.Done():
  102. return
  103. default:
  104. // 可以访问p.cfg获取配置
  105. if err := validateTask(task); err != nil {
  106. // 处理无效任务
  107. utils.Logger.WithField("task_id", task.ID).Errorf("任务参数有误,请检查任务:%v", err)
  108. continue
  109. }
  110. // 请求数据
  111. p.hgService.RequestSFlightData(ctx, task)
  112. }
  113. }
  114. }