| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- // services/processor.go
- package services
- import (
- "context"
- "go-policy-service/config"
- "go-policy-service/models"
- "go-policy-service/utils"
- "math"
- "sync"
- "time"
- )
- var HgConfig *models.HgConfig
- func Initialize(cfg *config.Config) {
- // 初始化数据库连接
- if err := utils.InitDB(cfg); err != nil {
- utils.Logger.Fatal("Failed to connect database: ", err)
- }
- // 初始化Redis
- err := utils.InitRedis(cfg)
- if err != nil {
- utils.Logger.Fatal(err)
- }
- // 获取航管配置(添加上下文和错误处理)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- // 获取航管账号配置
- HgConfig, err = models.GetHGConf(ctx)
- if err != nil {
- utils.Logger.Fatal("获取航管配置失败: ", err)
- }
- // 获取航管服务
- // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig)
- }
- type TaskProcessor struct {
- httpClient utils.HTTPClient
- cfg *config.Config // 添加配置字段
- hgService *HangguanService
- }
- func NewTaskProcessor(
- httpClient utils.HTTPClient,
- cfg *config.Config,
- ) *TaskProcessor {
- return &TaskProcessor{
- httpClient: httpClient,
- cfg: cfg,
- }
- }
- func (p *TaskProcessor) Run(cfg *config.Config) {
- // 启动Token刷新任务
- tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl)
- // 初始化HangguanService
- p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
- interval, err := time.ParseDuration(cfg.Interval)
- if err != nil {
- utils.Logger.Fatal("无效的时间间隔配置: ", err)
- }
- for {
- utils.Logger.Info("开始任务处理循环...")
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
- if err := p.ProcessTasks(ctx); err != nil {
- utils.Logger.Error("任务处理失败: ", err)
- }
- cancel()
- utils.Logger.Infof("任务处理完成,等待 %v 后进行下一轮处理", interval)
- time.Sleep(interval)
- }
- }
- // ProcessTasks方法改进
- func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
- tasks, err := models.GetPendingTasks(ctx, 0) // 每次取所有任务
- if err != nil {
- return err
- }
- taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道
- var wg sync.WaitGroup
- // 动态调整worker数量
- workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks))))
- for i := 0; i < workerCount; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- p.worker(ctx, taskChan)
- }()
- }
- // 分发任务优化
- DISTRIBUTE:
- for _, task := range tasks {
- select {
- case taskChan <- task:
- case <-ctx.Done():
- break DISTRIBUTE
- }
- }
- close(taskChan)
- wg.Wait()
- return nil
- }
- func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) {
- for task := range tasks {
- select {
- case <-ctx.Done():
- return
- default:
- // 可以访问p.cfg获取配置
- if err := validateTask(task); err != nil {
- // 处理无效任务
- utils.Logger.WithField("task_id", task.ID).Errorf("任务参数有误,请检查任务:%v", err)
- continue
- }
- // 请求数据
- p.hgService.RequestSFlightData(ctx, task)
- }
- }
- }
|