| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- // models/task.go
- package models
- import (
- "context"
- "fmt"
- "go-policy-service/config"
- "time"
- "gorm.io/driver/mysql"
- "gorm.io/gorm"
- )
- var db *gorm.DB
- type Task struct {
- gorm.Model
- Params string `gorm:"type:text"`
- Status string `gorm:"type:varchar(20)"`
- Result string `gorm:"type:text"`
- Attempts int `gorm:"default:0"`
- NextTryAt time.Time
- }
- type ProcessedData struct {
- gorm.Model
- TaskID uint
- Data string `gorm:"type:text"`
- }
- func InitDB(cfg *config.Config) error {
- dsn := getDSN(cfg)
- conn, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
- if err != nil {
- return err
- }
- db = conn
- // 自动迁移表结构
- if err := db.AutoMigrate(&Task{}, &ProcessedData{}); err != nil {
- return err
- }
- return nil
- }
- func getDSN(cfg *config.Config) string {
- return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4",
- cfg.DBUser,
- cfg.DBPassword,
- cfg.DBHost,
- cfg.DBPort,
- cfg.DBName)
- }
- func GetPendingTasks(ctx context.Context, limit int) ([]Task, error) {
- var tasks []Task
- result := db.WithContext(ctx).
- Where("status = ? AND next_try_at <= ?", "pending", time.Now()).
- Limit(limit).
- Find(&tasks)
- return tasks, result.Error
- }
- func SaveProcessedData(ctx context.Context, data *ProcessedData) error {
- return db.WithContext(ctx).Create(data).Error
- }
- func UpdateTaskStatus(ctx context.Context, taskID uint, status string, attempts int) error {
- return db.WithContext(ctx).
- Model(&Task{}).
- Where("id = ?", taskID).
- Updates(map[string]interface{}{
- "status": status,
- "attempts": attempts,
- "next_try_at": time.Now().Add(5 * time.Minute),
- }).Error
- }
|