processor.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. // services/processor.go
  2. package services
  3. import (
  4. "context"
  5. "go-policy-service/models"
  6. "go-policy-service/utils"
  7. "sync"
  8. )
  9. type TaskProcessor struct {
  10. httpClient utils.HTTPClient
  11. concurrency int
  12. }
  13. func NewTaskProcessor(httpClient utils.HTTPClient, concurrency int) *TaskProcessor {
  14. return &TaskProcessor{
  15. httpClient: httpClient,
  16. concurrency: concurrency,
  17. }
  18. }
  19. func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
  20. // 获取待处理任务
  21. tasks, err := models.GetPendingTasks(ctx, 1000) // 每次最多处理1000个任务
  22. if err != nil {
  23. return err
  24. }
  25. // 创建任务通道
  26. taskChan := make(chan models.Task)
  27. var wg sync.WaitGroup
  28. // 启动worker
  29. for i := 0; i < p.concurrency; i++ {
  30. wg.Add(1)
  31. go p.worker(ctx, &wg, taskChan)
  32. }
  33. // 分发任务
  34. for _, task := range tasks {
  35. select {
  36. case taskChan <- task:
  37. case <-ctx.Done():
  38. close(taskChan)
  39. wg.Wait()
  40. return ctx.Err()
  41. }
  42. }
  43. close(taskChan)
  44. wg.Wait()
  45. return nil
  46. }
  47. func (p *TaskProcessor) worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan models.Task) {
  48. defer wg.Done()
  49. for task := range tasks {
  50. select {
  51. case <-ctx.Done():
  52. return
  53. default:
  54. p.processTask(ctx, task)
  55. }
  56. }
  57. }
  58. func (p *TaskProcessor) processTask(ctx context.Context, task models.Task) {
  59. // 调用第三方接口
  60. resp, err := p.httpClient.PostJSON(ctx, "https://partner.huoli.com/distribution/api/shopping/flight/list?token=", task.Params)
  61. if err != nil {
  62. utils.Logger.WithField("task_id", task.ID).Error("API request failed: ", err)
  63. //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
  64. return
  65. }
  66. // 处理响应数据
  67. processedData, err := processResponse(resp)
  68. if err != nil {
  69. utils.Logger.WithField("task_id", task.ID).Error("Response processing failed: ", err)
  70. //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
  71. return
  72. }
  73. // 保存处理后的数据
  74. if err := models.SaveProcessedData(ctx, &models.ProcessedData{
  75. TaskID: task.ID,
  76. Data: processedData,
  77. }); err != nil {
  78. utils.Logger.WithField("task_id", task.ID).Error("Failed to save processed data: ", err)
  79. return
  80. }
  81. // 更新任务状态
  82. // if err := models.UpdateTaskStatus(ctx, task.ID, "completed", task.Attempts+1); err != nil {
  83. // utils.Logger.WithField("task_id", task.ID).Error("Failed to update task status: ", err)
  84. // }
  85. }
  86. func processResponse(response []byte) (string, error) {
  87. // 实现具体的响应处理逻辑
  88. // 示例:直接返回原始响应
  89. return string(response), nil
  90. }