// models/task.go package models import ( "context" "fmt" "go-policy-service/config" "time" "gorm.io/driver/mysql" "gorm.io/gorm" ) var db *gorm.DB type Task struct { gorm.Model Params string `gorm:"type:text"` Status string `gorm:"type:varchar(20)"` Result string `gorm:"type:text"` Attempts int `gorm:"default:0"` NextTryAt time.Time } type ProcessedData struct { gorm.Model TaskID uint Data string `gorm:"type:text"` } func InitDB(cfg *config.Config) error { dsn := getDSN(cfg) conn, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { return err } db = conn // 自动迁移表结构 if err := db.AutoMigrate(&Task{}, &ProcessedData{}); err != nil { return err } return nil } func getDSN(cfg *config.Config) string { return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", cfg.DBUser, cfg.DBPassword, cfg.DBHost, cfg.DBPort, cfg.DBName) } func GetPendingTasks(ctx context.Context, limit int) ([]Task, error) { var tasks []Task result := db.WithContext(ctx). Where("status = ? AND next_try_at <= ?", "pending", time.Now()). Limit(limit). Find(&tasks) return tasks, result.Error } func SaveProcessedData(ctx context.Context, data *ProcessedData) error { return db.WithContext(ctx).Create(data).Error } func UpdateTaskStatus(ctx context.Context, taskID uint, status string, attempts int) error { return db.WithContext(ctx). Model(&Task{}). Where("id = ?", taskID). Updates(map[string]interface{}{ "status": status, "attempts": attempts, "next_try_at": time.Now().Add(5 * time.Minute), }).Error }