// services/processor.go package services import ( "context" "go-policy-service/models" "go-policy-service/utils" "sync" ) type TaskProcessor struct { httpClient utils.HTTPClient concurrency int } func NewTaskProcessor(httpClient utils.HTTPClient, concurrency int) *TaskProcessor { return &TaskProcessor{ httpClient: httpClient, concurrency: concurrency, } } func (p *TaskProcessor) ProcessTasks(ctx context.Context) error { // 获取待处理任务 tasks, err := models.GetPendingTasks(ctx, 1000) // 每次最多处理1000个任务 if err != nil { return err } // 创建任务通道 taskChan := make(chan models.Task) var wg sync.WaitGroup // 启动worker for i := 0; i < p.concurrency; i++ { wg.Add(1) go p.worker(ctx, &wg, taskChan) } // 分发任务 for _, task := range tasks { select { case taskChan <- task: case <-ctx.Done(): close(taskChan) wg.Wait() return ctx.Err() } } close(taskChan) wg.Wait() return nil } func (p *TaskProcessor) worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan models.Task) { defer wg.Done() for task := range tasks { select { case <-ctx.Done(): return default: p.processTask(ctx, task) } } } func (p *TaskProcessor) processTask(ctx context.Context, task models.Task) { // 调用第三方接口 resp, err := p.httpClient.PostJSON(ctx, "https://partner.huoli.com/distribution/api/shopping/flight/list?token=", task.Params) if err != nil { utils.Logger.WithField("task_id", task.ID).Error("API request failed: ", err) //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1) return } // 处理响应数据 processedData, err := processResponse(resp) if err != nil { utils.Logger.WithField("task_id", task.ID).Error("Response processing failed: ", err) //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1) return } // 保存处理后的数据 if err := models.SaveProcessedData(ctx, &models.ProcessedData{ TaskID: task.ID, Data: processedData, }); err != nil { utils.Logger.WithField("task_id", task.ID).Error("Failed to save processed data: ", err) return } // 更新任务状态 // if err := models.UpdateTaskStatus(ctx, task.ID, "completed", task.Attempts+1); err != nil { // utils.Logger.WithField("task_id", task.ID).Error("Failed to update task status: ", err) // } } func processResponse(response []byte) (string, error) { // 实现具体的响应处理逻辑 // 示例:直接返回原始响应 return string(response), nil }