| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- // services/processor.go
- package services
- import (
- "context"
- "go-policy-service/models"
- "go-policy-service/utils"
- "sync"
- )
- type TaskProcessor struct {
- httpClient utils.HTTPClient
- concurrency int
- }
- func NewTaskProcessor(httpClient utils.HTTPClient, concurrency int) *TaskProcessor {
- return &TaskProcessor{
- httpClient: httpClient,
- concurrency: concurrency,
- }
- }
- func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
- // 获取待处理任务
- tasks, err := models.GetPendingTasks(ctx, 1000) // 每次最多处理1000个任务
- if err != nil {
- return err
- }
- // 创建任务通道
- taskChan := make(chan models.Task)
- var wg sync.WaitGroup
- // 启动worker
- for i := 0; i < p.concurrency; i++ {
- wg.Add(1)
- go p.worker(ctx, &wg, taskChan)
- }
- // 分发任务
- for _, task := range tasks {
- select {
- case taskChan <- task:
- case <-ctx.Done():
- close(taskChan)
- wg.Wait()
- return ctx.Err()
- }
- }
- close(taskChan)
- wg.Wait()
- return nil
- }
- func (p *TaskProcessor) worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan models.Task) {
- defer wg.Done()
- for task := range tasks {
- select {
- case <-ctx.Done():
- return
- default:
- p.processTask(ctx, task)
- }
- }
- }
- func (p *TaskProcessor) processTask(ctx context.Context, task models.Task) {
- // 调用第三方接口
- resp, err := p.httpClient.PostJSON(ctx, "https://partner.huoli.com/distribution/api/shopping/flight/list?token=", task.Params)
- if err != nil {
- utils.Logger.WithField("task_id", task.ID).Error("API request failed: ", err)
- //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
- return
- }
- // 处理响应数据
- processedData, err := processResponse(resp)
- if err != nil {
- utils.Logger.WithField("task_id", task.ID).Error("Response processing failed: ", err)
- //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
- return
- }
- // 保存处理后的数据
- if err := models.SaveProcessedData(ctx, &models.ProcessedData{
- TaskID: task.ID,
- Data: processedData,
- }); err != nil {
- utils.Logger.WithField("task_id", task.ID).Error("Failed to save processed data: ", err)
- return
- }
- // 更新任务状态
- // if err := models.UpdateTaskStatus(ctx, task.ID, "completed", task.Attempts+1); err != nil {
- // utils.Logger.WithField("task_id", task.ID).Error("Failed to update task status: ", err)
- // }
- }
// processResponse converts a raw API response into the string that gets
// stored for the task.
//
// Placeholder: the real response-processing logic is not implemented yet, so
// the response bytes are returned unchanged as a string and the error is
// always nil.
func processResponse(response []byte) (string, error) {
	raw := string(response)
	return raw, nil
}
|