|
|
@@ -3,48 +3,116 @@ package services
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "fmt"
|
|
|
+ "go-policy-service/config"
|
|
|
"go-policy-service/models"
|
|
|
"go-policy-service/utils"
|
|
|
+ "math"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/robfig/cron"
|
|
|
)
|
|
|
|
|
|
+var HgConfig *models.HgConfig
|
|
|
+
|
|
|
+func Initialize(cfg *config.Config) {
|
|
|
+ // 初始化数据库连接
|
|
|
+ if err := utils.InitDB(cfg); err != nil {
|
|
|
+ utils.Logger.Fatal("Failed to connect database: ", err)
|
|
|
+ }
|
|
|
+ // 初始化Redis
|
|
|
+ err := utils.InitRedis(cfg)
|
|
|
+ if err != nil {
|
|
|
+ utils.Logger.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取航管配置(添加上下文和错误处理)
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ // 获取航管账号配置
|
|
|
+ HgConfig, err = models.GetHGConf(ctx)
|
|
|
+ if err != nil {
|
|
|
+ utils.Logger.Fatal("获取航管配置失败: ", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取航管服务
|
|
|
+ // hgService := services.NewHangguanService(cfg, utils.NewHttpClient(30*time.Second), rdb, hgConfig)
|
|
|
+}
|
|
|
+
|
|
|
type TaskProcessor struct {
|
|
|
- httpClient utils.HTTPClient
|
|
|
- concurrency int
|
|
|
+ httpClient utils.HTTPClient
|
|
|
+ cfg *config.Config // 添加配置字段
|
|
|
+ hgService *HangguanService
|
|
|
}
|
|
|
|
|
|
-func NewTaskProcessor(httpClient utils.HTTPClient, concurrency int) *TaskProcessor {
|
|
|
+func NewTaskProcessor(
|
|
|
+ httpClient utils.HTTPClient,
|
|
|
+ cfg *config.Config,
|
|
|
+) *TaskProcessor {
|
|
|
return &TaskProcessor{
|
|
|
- httpClient: httpClient,
|
|
|
- concurrency: concurrency,
|
|
|
+ httpClient: httpClient,
|
|
|
+ cfg: cfg,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (p *TaskProcessor) Run(cfg *config.Config) {
|
|
|
+ // 启动Token刷新任务
|
|
|
+ tokenMgr := NewTokenManager(utils.Rdb, HgConfig, p.httpClient, cfg.HgApiUrl)
|
|
|
+ go func() {
|
|
|
+ tokenMgr.RefreshAccessToken()
|
|
|
+ }()
|
|
|
+ // 初始化HangguanService
|
|
|
+ p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
|
|
|
+
|
|
|
+ // 初始化cron调度器,无自定义logger
|
|
|
+ c := cron.New()
|
|
|
+ scheduleSpec := fmt.Sprintf("@every %s", cfg.Interval)
|
|
|
+ // AddFunc 只返回 EntryID
|
|
|
+ entryID := c.AddFunc(scheduleSpec, func() {
|
|
|
+ utils.Logger.Info("Starting scheduled task processing...")
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 9*time.Minute)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ if err := p.ProcessTasks(ctx); err != nil {
|
|
|
+ utils.Logger.Error("Task processing failed: ", err)
|
|
|
+ }
|
|
|
+ })
|
|
|
+ utils.Logger.Info(fmt.Sprintf("Scheduled task with entry ID: %v", entryID))
|
|
|
+
|
|
|
+ // 开启调度器
|
|
|
+ c.Start()
|
|
|
+ utils.Logger.Info("Service started successfully")
|
|
|
+}
|
|
|
+
|
|
|
+// ProcessTasks方法改进
|
|
|
func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
|
|
|
- // 获取待处理任务
|
|
|
- tasks, err := models.GetPendingTasks(ctx, 1000) // 每次最多处理1000个任务
|
|
|
+ tasks, err := models.GetPendingTasks(ctx, 1000)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- // 创建任务通道
|
|
|
- taskChan := make(chan models.Task)
|
|
|
+ taskChan := make(chan models.HgFlightSearchTask, len(tasks)) // 带缓冲通道
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
- // 启动worker
|
|
|
- for i := 0; i < p.concurrency; i++ {
|
|
|
+ // 动态调整worker数量
|
|
|
+ workerCount := int(math.Min(float64(p.cfg.Concurrency), float64(len(tasks))))
|
|
|
+ for i := 0; i < workerCount; i++ {
|
|
|
wg.Add(1)
|
|
|
- go p.worker(ctx, &wg, taskChan)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ p.worker(ctx, taskChan)
|
|
|
+ }()
|
|
|
}
|
|
|
|
|
|
- // 分发任务
|
|
|
+ // 分发任务优化
|
|
|
+DISTRIBUTE:
|
|
|
for _, task := range tasks {
|
|
|
select {
|
|
|
case taskChan <- task:
|
|
|
case <-ctx.Done():
|
|
|
- close(taskChan)
|
|
|
- wg.Wait()
|
|
|
- return ctx.Err()
|
|
|
+ break DISTRIBUTE
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -53,54 +121,20 @@ func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (p *TaskProcessor) worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan models.Task) {
|
|
|
- defer wg.Done()
|
|
|
-
|
|
|
+func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlightSearchTask) {
|
|
|
for task := range tasks {
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
return
|
|
|
default:
|
|
|
- p.processTask(ctx, task)
|
|
|
+ // 可以访问p.cfg获取配置
|
|
|
+ if err := validateTask(task, p.cfg); !valid {
|
|
|
+ // 处理无效任务
|
|
|
+ utils.Logger.WithField("task_id", task.ID).Error("Invalid task parameters")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // 请求数据
|
|
|
+ p.hgService.RequestSFlightData(ctx, task)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-func (p *TaskProcessor) processTask(ctx context.Context, task models.Task) {
|
|
|
- // 调用第三方接口
|
|
|
- resp, err := p.httpClient.PostJSON(ctx, "https://partner.huoli.com/distribution/api/shopping/flight/list?token=", task.Params)
|
|
|
- if err != nil {
|
|
|
- utils.Logger.WithField("task_id", task.ID).Error("API request failed: ", err)
|
|
|
- //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // 处理响应数据
|
|
|
- processedData, err := processResponse(resp)
|
|
|
- if err != nil {
|
|
|
- utils.Logger.WithField("task_id", task.ID).Error("Response processing failed: ", err)
|
|
|
- //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // 保存处理后的数据
|
|
|
- if err := models.SaveProcessedData(ctx, &models.ProcessedData{
|
|
|
- TaskID: task.ID,
|
|
|
- Data: processedData,
|
|
|
- }); err != nil {
|
|
|
- utils.Logger.WithField("task_id", task.ID).Error("Failed to save processed data: ", err)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // 更新任务状态
|
|
|
- // if err := models.UpdateTaskStatus(ctx, task.ID, "completed", task.Attempts+1); err != nil {
|
|
|
- // utils.Logger.WithField("task_id", task.ID).Error("Failed to update task status: ", err)
|
|
|
- // }
|
|
|
-}
|
|
|
-
|
|
|
-func processResponse(response []byte) (string, error) {
|
|
|
- // 实现具体的响应处理逻辑
|
|
|
-
|
|
|
- // 示例:直接返回原始响应
|
|
|
- return string(response), nil
|
|
|
-}
|