processor.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // services/processor.go
  2. package services
  3. import (
  4. "context"
  5. "fmt"
  6. "go-policy-service/config"
  7. "go-policy-service/models"
  8. "go-policy-service/utils"
  9. "math"
  10. "sync"
  11. "time"
  12. "github.com/robfig/cron"
  13. )
  14. var HgConfig *models.HgConfig
  15. func Initialize(cfg *config.Config) {
  16. // 初始化数据库连接
  17. if err := utils.InitDB(cfg); err != nil {
  18. utils.Logger.Fatal("Failed to connect database: ", err)
  19. }
  20. // 初始化Redis
  21. err := utils.InitRedis(cfg)
  22. if err != nil {
  23. utils.Logger.Fatal(err)
  24. }
  25. // 获取航管配置(添加上下文和错误处理)
  26. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  27. defer cancel()
  28. // 获取航管账号配置
  29. HgConfig, err = models.GetHGConf(ctx)
  30. if err != nil {
  31. utils.Logger.Fatal("获取航管配置失败: ", err)
  32. }
  33. // 获取航管服务
  34. // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig)
  35. }
  36. type TaskProcessor struct {
  37. httpClient utils.HTTPClient
  38. cfg *config.Config // 添加配置字段
  39. hgService *HangguanService
  40. }
  41. func NewTaskProcessor(
  42. httpClient utils.HTTPClient,
  43. cfg *config.Config,
  44. ) *TaskProcessor {
  45. return &TaskProcessor{
  46. httpClient: httpClient,
  47. cfg: cfg,
  48. }
  49. }
  50. func (p *TaskProcessor) Run(cfg *config.Config) {
  51. // 启动Token刷新任务
  52. tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl)
  53. go func() {
  54. tokenMgr.RefreshAccessToken()
  55. }()
  56. // 初始化HangguanService
  57. p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
  58. // 初始化cron调度器,无自定义logger
  59. c := cron.New()
  60. scheduleSpec := fmt.Sprintf("@every %s", cfg.Interval)
  61. // AddFunc 只返回 EntryID
  62. entryID := c.AddFunc(scheduleSpec, func() {
  63. utils.Logger.Info("Starting scheduled task processing...")
  64. ctx, cancel := context.WithTimeout(context.Background(), 9*time.Minute)
  65. defer cancel()
  66. if err := p.ProcessTasks(ctx); err != nil {
  67. utils.Logger.Error("Task processing failed: ", err)
  68. }
  69. })
  70. utils.Logger.Info(fmt.Sprintf("Scheduled task with entry ID: %v", entryID))
  71. // 开启调度器
  72. c.Start()
  73. utils.Logger.Info("Service started successfully")
  74. }
  75. // ProcessTasks方法改进
  76. func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
  77. tasks, err := models.GetPendingTasks(ctx, 1000)
  78. if err != nil {
  79. return err
  80. }
  81. taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道
  82. var wg sync.WaitGroup
  83. // 动态调整worker数量
  84. workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks))))
  85. for i := 0; i < workerCount; i++ {
  86. wg.Add(1)
  87. go func() {
  88. defer wg.Done()
  89. p.worker(ctx, taskChan)
  90. }()
  91. }
  92. // 分发任务优化
  93. DISTRIBUTE:
  94. for _, task := range tasks {
  95. select {
  96. case taskChan <- task:
  97. case <-ctx.Done():
  98. break DISTRIBUTE
  99. }
  100. }
  101. close(taskChan)
  102. wg.Wait()
  103. return nil
  104. }
  105. func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) {
  106. for task := range tasks {
  107. select {
  108. case <-ctx.Done():
  109. return
  110. default:
  111. // 可以访问p.cfg获取配置
  112. if err := validateTask(task, p.cfg); !valid {
  113. // 处理无效任务
  114. utils.Logger.WithField("task_id", task.ID).Error("Invalid task parameters")
  115. continue
  116. }
  117. // 请求数据
  118. p.hgService.RequestSFlightData(ctx, task)
  119. }
  120. }
  121. }