| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- // services/processor.go
- package services
- import (
- "context"
- "fmt"
- "go-policy-service/config"
- "go-policy-service/models"
- "go-policy-service/utils"
- "math"
- "sync"
- "time"
- "github.com/robfig/cron"
- )
- var HgConfig *models.HgConfig
- func Initialize(cfg *config.Config) {
- // 初始化数据库连接
- if err := utils.InitDB(cfg); err != nil {
- utils.Logger.Fatal("Failed to connect database: ", err)
- }
- // 初始化Redis
- err := utils.InitRedis(cfg)
- if err != nil {
- utils.Logger.Fatal(err)
- }
- // 获取航管配置(添加上下文和错误处理)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- // 获取航管账号配置
- HgConfig, err = models.GetHGConf(ctx)
- if err != nil {
- utils.Logger.Fatal("获取航管配置失败: ", err)
- }
- // 获取航管服务
- // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig)
- }
- type TaskProcessor struct {
- httpClient utils.HTTPClient
- cfg *config.Config // 添加配置字段
- hgService *HangguanService
- }
- func NewTaskProcessor(
- httpClient utils.HTTPClient,
- cfg *config.Config,
- ) *TaskProcessor {
- return &TaskProcessor{
- httpClient: httpClient,
- cfg: cfg,
- }
- }
- func (p *TaskProcessor) Run(cfg *config.Config) {
- // 启动Token刷新任务
- tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl)
- // 初始化HangguanService
- p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
- // 初始化cron调度器,无自定义logger
- c := cron.New()
- scheduleSpec := fmt.Sprintf("@every %s", cfg.Interval)
- // AddFunc 只返回 EntryID
- entryID := c.AddFunc(scheduleSpec, func() {
- utils.Logger.Info("Starting scheduled task processing...")
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
- defer cancel()
- if err := p.ProcessTasks(ctx); err != nil {
- utils.Logger.Error("Task processing failed: ", err)
- }
- })
- utils.Logger.Info(fmt.Sprintf("Scheduled task with entry ID: %v", entryID))
- // 开启调度器
- c.Start()
- utils.Logger.Info("Service started successfully")
- }
- // ProcessTasks方法改进
- func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
- tasks, err := models.GetPendingTasks(ctx, 1000)
- if err != nil {
- return err
- }
- taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道
- var wg sync.WaitGroup
- // 动态调整worker数量
- workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks))))
- for i := 0; i < workerCount; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- p.worker(ctx, taskChan)
- }()
- }
- // 分发任务优化
- DISTRIBUTE:
- for _, task := range tasks {
- select {
- case taskChan <- task:
- case <-ctx.Done():
- break DISTRIBUTE
- }
- }
- close(taskChan)
- wg.Wait()
- return nil
- }
- func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) {
- for task := range tasks {
- select {
- case <-ctx.Done():
- return
- default:
- // 可以访问p.cfg获取配置
- if err := validateTask(task); err != nil {
- // 处理无效任务
- utils.Logger.WithField("task_id", task.ID).Errorf("Invalid task parameters:%v", err)
- continue
- }
- // 请求数据
- p.hgService.RequestSFlightData(ctx, task)
- }
- }
- }
|