|
@@ -0,0 +1,106 @@
|
|
|
|
|
+// services/processor.go
|
|
|
|
|
+package services
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "context"
|
|
|
|
|
+ "go-policy-service/models"
|
|
|
|
|
+ "go-policy-service/utils"
|
|
|
|
|
+ "sync"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+type TaskProcessor struct {
|
|
|
|
|
+ httpClient utils.HTTPClient
|
|
|
|
|
+ concurrency int
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func NewTaskProcessor(httpClient utils.HTTPClient, concurrency int) *TaskProcessor {
|
|
|
|
|
+ return &TaskProcessor{
|
|
|
|
|
+ httpClient: httpClient,
|
|
|
|
|
+ concurrency: concurrency,
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
|
|
|
|
|
+ // 获取待处理任务
|
|
|
|
|
+ tasks, err := models.GetPendingTasks(ctx, 1000) // 每次最多处理1000个任务
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 创建任务通道
|
|
|
|
|
+ taskChan := make(chan models.Task)
|
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
|
+
|
|
|
|
|
+ // 启动worker
|
|
|
|
|
+ for i := 0; i < p.concurrency; i++ {
|
|
|
|
|
+ wg.Add(1)
|
|
|
|
|
+ go p.worker(ctx, &wg, taskChan)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 分发任务
|
|
|
|
|
+ for _, task := range tasks {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case taskChan <- task:
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ close(taskChan)
|
|
|
|
|
+ wg.Wait()
|
|
|
|
|
+ return ctx.Err()
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ close(taskChan)
|
|
|
|
|
+ wg.Wait()
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (p *TaskProcessor) worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan models.Task) {
|
|
|
|
|
+ defer wg.Done()
|
|
|
|
|
+
|
|
|
|
|
+ for task := range tasks {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ return
|
|
|
|
|
+ default:
|
|
|
|
|
+ p.processTask(ctx, task)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (p *TaskProcessor) processTask(ctx context.Context, task models.Task) {
|
|
|
|
|
+ // 调用第三方接口
|
|
|
|
|
+ resp, err := p.httpClient.PostJSON(ctx, "https://partner.huoli.com/distribution/api/shopping/flight/list?token=", task.Params)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ utils.Logger.WithField("task_id", task.ID).Error("API request failed: ", err)
|
|
|
|
|
+ //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 处理响应数据
|
|
|
|
|
+ processedData, err := processResponse(resp)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ utils.Logger.WithField("task_id", task.ID).Error("Response processing failed: ", err)
|
|
|
|
|
+ //models.UpdateTaskStatus(ctx, task.ID, "failed", task.Attempts+1)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 保存处理后的数据
|
|
|
|
|
+ if err := models.SaveProcessedData(ctx, &models.ProcessedData{
|
|
|
|
|
+ TaskID: task.ID,
|
|
|
|
|
+ Data: processedData,
|
|
|
|
|
+ }); err != nil {
|
|
|
|
|
+ utils.Logger.WithField("task_id", task.ID).Error("Failed to save processed data: ", err)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 更新任务状态
|
|
|
|
|
+ // if err := models.UpdateTaskStatus(ctx, task.ID, "completed", task.Attempts+1); err != nil {
|
|
|
|
|
+ // utils.Logger.WithField("task_id", task.ID).Error("Failed to update task status: ", err)
|
|
|
|
|
+ // }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func processResponse(response []byte) (string, error) {
|
|
|
|
|
+ // 实现具体的响应处理逻辑
|
|
|
|
|
+
|
|
|
|
|
+ // 示例:直接返回原始响应
|
|
|
|
|
+ return string(response), nil
|
|
|
|
|
+}
|