// services/processor.go package services import ( "context" "go-policy-service/config" "go-policy-service/models" "go-policy-service/utils" "math" "sync" "time" ) var HgConfig *models.HgConfig func Initialize(cfg *config.Config) { // 初始化数据库连接 if err := utils.InitDB(cfg); err != nil { utils.Logger.Fatal("Failed to connect database: ", err) } // 初始化Redis err := utils.InitRedis(cfg) if err != nil { utils.Logger.Fatal(err) } // 获取航管配置(添加上下文和错误处理) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 获取航管账号配置 HgConfig, err = models.GetHGConf(ctx) if err != nil { utils.Logger.Fatal("获取航管配置失败: ", err) } // 获取航管服务 // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig) } type TaskProcessor struct { httpClient utils.HTTPClient cfg *config.Config // 添加配置字段 hgService *HangguanService } func NewTaskProcessor( httpClient utils.HTTPClient, cfg *config.Config, ) *TaskProcessor { return &TaskProcessor{ httpClient: httpClient, cfg: cfg, } } func (p *TaskProcessor) Run(cfg *config.Config) { // 启动Token刷新任务 tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl) // 初始化HangguanService p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr) interval, err := time.ParseDuration(cfg.Interval) if err != nil { utils.Logger.Fatal("无效的时间间隔配置: ", err) } for { utils.Logger.Info("开始任务处理循环...") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) if err := p.ProcessTasks(ctx); err != nil { utils.Logger.Error("任务处理失败: ", err) } cancel() utils.Logger.Infof("任务处理完成,等待 %v 后进行下一轮处理", interval) time.Sleep(interval) } } // ProcessTasks方法改进 func (p *TaskProcessor) ProcessTasks(ctx context.Context) error { tasks, err := models.GetPendingTasks(ctx, 0) // 每次取所有任务 if err != nil { return err } taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道 var wg sync.WaitGroup // 动态调整worker数量 workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks)))) for i := 0; i < workerCount; i++ { wg.Add(1) go func() { defer wg.Done() p.worker(ctx, taskChan) }() } // 分发任务优化 DISTRIBUTE: for _, task := range tasks { select { case taskChan <- task: case <-ctx.Done(): break DISTRIBUTE } } close(taskChan) wg.Wait() return nil } func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) { for task := range tasks { select { case <-ctx.Done(): return default: // 可以访问p.cfg获取配置 if err := validateTask(task); err != nil { // 处理无效任务 utils.Logger.WithField("task_id", task.ID).Errorf("任务参数有误,请检查任务:%v", err) continue } // 请求数据 p.hgService.RequestSFlightData(ctx, task) } } }