task.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. // models/task.go
  2. package models
  3. import (
  4. "context"
  5. "fmt"
  6. "go-policy-service/config"
  7. "time"
  8. "gorm.io/driver/mysql"
  9. "gorm.io/gorm"
  10. )
  11. var db *gorm.DB
  12. type Task struct {
  13. gorm.Model
  14. Params string `gorm:"type:text"`
  15. Status string `gorm:"type:varchar(20)"`
  16. Result string `gorm:"type:text"`
  17. Attempts int `gorm:"default:0"`
  18. NextTryAt time.Time
  19. }
  20. type ProcessedData struct {
  21. gorm.Model
  22. TaskID uint
  23. Data string `gorm:"type:text"`
  24. }
  25. func InitDB(cfg *config.Config) error {
  26. dsn := getDSN(cfg)
  27. conn, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  28. if err != nil {
  29. return err
  30. }
  31. db = conn
  32. // 自动迁移表结构
  33. if err := db.AutoMigrate(&Task{}, &ProcessedData{}); err != nil {
  34. return err
  35. }
  36. return nil
  37. }
  38. func getDSN(cfg *config.Config) string {
  39. return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4",
  40. cfg.DBUser,
  41. cfg.DBPassword,
  42. cfg.DBHost,
  43. cfg.DBPort,
  44. cfg.DBName)
  45. }
  46. func GetPendingTasks(ctx context.Context, limit int) ([]Task, error) {
  47. var tasks []Task
  48. result := db.WithContext(ctx).
  49. Where("status = ? AND next_try_at <= ?", "pending", time.Now()).
  50. Limit(limit).
  51. Find(&tasks)
  52. return tasks, result.Error
  53. }
  54. func SaveProcessedData(ctx context.Context, data *ProcessedData) error {
  55. return db.WithContext(ctx).Create(data).Error
  56. }
  57. func UpdateTaskStatus(ctx context.Context, taskID uint, status string, attempts int) error {
  58. return db.WithContext(ctx).
  59. Model(&Task{}).
  60. Where("id = ?", taskID).
  61. Updates(map[string]interface{}{
  62. "status": status,
  63. "attempts": attempts,
  64. "next_try_at": time.Now().Add(5 * time.Minute),
  65. }).Error
  66. }