processor.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // services/processor.go
  2. package services
  3. import (
  4. "context"
  5. "fmt"
  6. "go-policy-service/config"
  7. "go-policy-service/models"
  8. "go-policy-service/utils"
  9. "math"
  10. "sync"
  11. "time"
  12. "github.com/robfig/cron"
  13. )
  14. var HgConfig *models.HgConfig
  15. func Initialize(cfg *config.Config) {
  16. // 初始化数据库连接
  17. if err := utils.InitDB(cfg); err != nil {
  18. utils.Logger.Fatal("Failed to connect database: ", err)
  19. }
  20. // 初始化Redis
  21. err := utils.InitRedis(cfg)
  22. if err != nil {
  23. utils.Logger.Fatal(err)
  24. }
  25. // 获取航管配置(添加上下文和错误处理)
  26. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  27. defer cancel()
  28. // 获取航管账号配置
  29. HgConfig, err = models.GetHGConf(ctx)
  30. if err != nil {
  31. utils.Logger.Fatal("获取航管配置失败: ", err)
  32. }
  33. // 获取航管服务
  34. // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig)
  35. }
  36. type TaskProcessor struct {
  37. httpClient utils.HTTPClient
  38. cfg *config.Config // 添加配置字段
  39. hgService *HangguanService
  40. }
  41. func NewTaskProcessor(
  42. httpClient utils.HTTPClient,
  43. cfg *config.Config,
  44. ) *TaskProcessor {
  45. return &TaskProcessor{
  46. httpClient: httpClient,
  47. cfg: cfg,
  48. }
  49. }
  50. func (p *TaskProcessor) Run(cfg *config.Config) {
  51. // 启动Token刷新任务
  52. tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl)
  53. // 初始化HangguanService
  54. p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
  55. // 初始化cron调度器,无自定义logger
  56. c := cron.New()
  57. scheduleSpec := fmt.Sprintf("@every %s", cfg.Interval)
  58. // AddFunc 只返回 EntryID
  59. entryID := c.AddFunc(scheduleSpec, func() {
  60. utils.Logger.Info("Starting scheduled task processing...")
  61. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
  62. defer cancel()
  63. if err := p.ProcessTasks(ctx); err != nil {
  64. utils.Logger.Error("Task processing failed: ", err)
  65. }
  66. })
  67. utils.Logger.Info(fmt.Sprintf("Scheduled task with entry ID: %v", entryID))
  68. // 开启调度器
  69. c.Start()
  70. utils.Logger.Info("Service started successfully")
  71. }
  72. // ProcessTasks方法改进
  73. func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
  74. tasks, err := models.GetPendingTasks(ctx, 1000)
  75. if err != nil {
  76. return err
  77. }
  78. taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道
  79. var wg sync.WaitGroup
  80. // 动态调整worker数量
  81. workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks))))
  82. for i := 0; i < workerCount; i++ {
  83. wg.Add(1)
  84. go func() {
  85. defer wg.Done()
  86. p.worker(ctx, taskChan)
  87. }()
  88. }
  89. // 分发任务优化
  90. DISTRIBUTE:
  91. for _, task := range tasks {
  92. select {
  93. case taskChan <- task:
  94. case <-ctx.Done():
  95. break DISTRIBUTE
  96. }
  97. }
  98. close(taskChan)
  99. wg.Wait()
  100. return nil
  101. }
  102. func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) {
  103. for task := range tasks {
  104. select {
  105. case <-ctx.Done():
  106. return
  107. default:
  108. // 可以访问p.cfg获取配置
  109. if err := validateTask(task); err != nil {
  110. // 处理无效任务
  111. utils.Logger.WithField("task_id", task.ID).Errorf("Invalid task parameters:%v", err)
  112. continue
  113. }
  114. // 请求数据
  115. p.hgService.RequestSFlightData(ctx, task)
  116. }
  117. }
  118. }