light 6 месяцев назад
Родитель
Сommit
c653acfc97
6 измененных файлов с 86 добавлено и 33 удалено
  1. 8 5
      config/config.go
  2. 19 2
      main.go
  3. 6 1
      models/task.go
  4. 2 2
      services/hangguan.go
  5. 14 18
      services/processor.go
  6. 37 5
      utils/init_service.go

+ 8 - 5
config/config.go

@@ -7,11 +7,14 @@ import (
 
 type Config struct {
 	// 数据库配置
-	DBHost     string `env:"DB_HOST" envDefault:"localhost"`
-	DBPort     int    `env:"DB_PORT" envDefault:"3306"`
-	DBUser     string `env:"DB_USER" envDefault:"root"`
-	DBPassword string `env:"DB_PASSWORD" envDefault:"password"`
-	DBName     string `env:"DB_NAME" envDefault:"tasks"`
+	DBHost        string `env:"DB_HOST" envDefault:"localhost"`
+	DBPort        int    `env:"DB_PORT" envDefault:"3306"`
+	DBUser        string `env:"DB_USER" envDefault:"root"`
+	DBPassword    string `env:"DB_PASSWORD" envDefault:"password"`
+	DBName        string `env:"DB_NAME" envDefault:"tasks"`
+	DBMaxIdle     int    `env:"DB_MAX_IDLE" envDefault:"10"`
+	DBMaxOpen     int    `env:"DB_MAX_OPEN" envDefault:"100"`
+	DBMaxLifetime int    `env:"DB_MAX_LIFETIME" envDefault:"30"`
 	//服务器配置
 	Concurrency  int    `env:"CONCURRENCY" envDefault:"20"`
 	Interval     string `env:"INTERVAL" envDefault:"10m"`

+ 19 - 2
main.go

@@ -2,13 +2,20 @@
 package main
 
 import (
+	"context"
 	"go-policy-service/config"
 	"go-policy-service/services"
 	"go-policy-service/utils"
+	"os"
+	"os/signal"
+	"syscall"
 	"time"
 )
 
 func main() {
+	ctx, stop := context.WithCancel(context.Background())
+	defer stop()
+
 	// 初始化配置
 	cfg, err := config.LoadConfig()
 	if err != nil {
@@ -28,6 +35,16 @@ func main() {
 	processor.Run(cfg)
 
 	utils.Logger.Info("Service started successfully")
-	// 保持主进程运行
-	select {}
+
+	// 注册优雅关闭
+	sigCh := make(chan os.Signal, 1)
+	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+
+	select {
+	case <-sigCh:
+		utils.Logger.Info("Received shutdown signal")
+		stop()
+		utils.CloseResources()
+	case <-ctx.Done():
+	}
 }

+ 6 - 1
models/task.go

@@ -34,7 +34,12 @@ type ProcessedData struct {
 
 func GetPendingTasks(ctx context.Context, limit int) ([]HgFlightSearchTask, error) {
 	var hgShFlNotasks []HgFlightSearchTask
-	result := utils.Db.WithContext(ctx).Model(&HgFlightSearchTask{}).Where("status = ?", 1).Limit(limit).Find(&hgShFlNotasks)
+	result := utils.Db.WithContext(ctx).Model(&HgFlightSearchTask{}).Where("status = ?", 1)
+	if limit > 0 {
+		result = result.Limit(limit).Find(&hgShFlNotasks)
+	} else {
+		result = result.Find(&hgShFlNotasks)
+	}
 	return hgShFlNotasks, result.Error
 }
 

+ 2 - 2
services/hangguan.go

@@ -148,7 +148,7 @@ func (h *HangguanService) processResponse(ctx context.Context, task *models.HgFl
 					PrintPrice:      cabinPrice.AdtPrice.Price,
 					Stock:           cabinPrice.Left,
 					FlightNo:        fltData.FlightInfo.FlyNo,
-					Cabin:           cabinPrice.BaseCabin,
+					Cabin:           cabinPrice.AdtPrice.Cabin,
 					FlightEndDate:   utils.DateFormmat(fltData.FlightInfo.ArrDateTime, "2006-01-02"),
 				}
 			}
@@ -187,7 +187,7 @@ func (h *HangguanService) PushPolicyData(ctx context.Context, data structs.PushD
 	}
 
 	url := h.cfg.PushUrl
-	resp, err := h.httpClient.RequestJSON(ctx, "POST", url, pushData)
+	resp, err := h.httpClient.RequestJSON(ctx, "POST", url+"/api/policy/batch-policy-import", pushData)
 	if err != nil {
 		return fmt.Errorf("推送请求失败: %w", err)
 	}

+ 14 - 18
services/processor.go

@@ -3,15 +3,12 @@ package services
 
 import (
 	"context"
-	"fmt"
 	"go-policy-service/config"
 	"go-policy-service/models"
 	"go-policy-service/utils"
 	"math"
 	"sync"
 	"time"
-
-	"github.com/robfig/cron"
 )
 
 var HgConfig *models.HgConfig
@@ -64,29 +61,28 @@ func (p *TaskProcessor) Run(cfg *config.Config) {
 	// 初始化HangguanService
 	p.hgService = NewHangguanService(cfg, p.httpClient, tokenMgr)
 
-	// 初始化cron调度器,无自定义logger
-	c := cron.New()
-	scheduleSpec := fmt.Sprintf("@every %s", cfg.Interval)
-	// AddFunc 只返回 EntryID
-	entryID := c.AddFunc(scheduleSpec, func() {
-		utils.Logger.Info("Starting scheduled task processing...")
+	interval, err := time.ParseDuration(cfg.Interval)
+	if err != nil {
+		utils.Logger.Fatal("无效的时间间隔配置: ", err)
+	}
+
+	for {
+		utils.Logger.Info("开始任务处理循环...")
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
-		defer cancel()
 
 		if err := p.ProcessTasks(ctx); err != nil {
-			utils.Logger.Error("Task processing failed: ", err)
+			utils.Logger.Error("任务处理失败: ", err)
 		}
-	})
-	utils.Logger.Info(fmt.Sprintf("Scheduled task with entry ID: %v", entryID))
 
-	// 开启调度器
-	c.Start()
-	utils.Logger.Info("Service started successfully")
+		cancel()
+		utils.Logger.Infof("任务处理完成,等待 %v 后进行下一轮处理", interval)
+		time.Sleep(interval)
+	}
 }
 
 // ProcessTasks方法改进
 func (p *TaskProcessor) ProcessTasks(ctx context.Context) error {
-	tasks, err := models.GetPendingTasks(ctx, 1000)
+	tasks, err := models.GetPendingTasks(ctx, 0) // 每次取所有任务
 	if err != nil {
 		return err
 	}
@@ -128,7 +124,7 @@ func (p *TaskProcessor) worker(ctx context.Context, tasks <-chan models.HgFlight
 			// 可以访问p.cfg获取配置
 			if err := validateTask(task); err != nil {
 				// 处理无效任务
-				utils.Logger.WithField("task_id", task.ID).Errorf("Invalid task parameters:%v", err)
+				utils.Logger.WithField("task_id", task.ID).Errorf("任务参数有误,请检查任务:%v", err)
 				continue
 			}
 			// 请求数据

+ 37 - 5
utils/init_service.go

@@ -21,11 +21,17 @@ var Mongo *mongo.Client
 
 func InitDB(cfg *config.Config) error {
 	dsn := getDSN(cfg)
-	conn, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
+	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
 	if err != nil {
 		return err
 	}
-	Db = conn
+
+	sqlDB, _ := db.DB()
+	sqlDB.SetMaxIdleConns(cfg.DBMaxIdle)
+	sqlDB.SetMaxOpenConns(cfg.DBMaxOpen)
+	sqlDB.SetConnMaxLifetime(time.Duration(cfg.DBMaxLifetime) * time.Minute)
+
+	Db = db
 	return nil
 }
 
@@ -41,9 +47,10 @@ func getDSN(cfg *config.Config) string {
 // 初始化 Redis
 func InitRedis(cfg *config.Config) error {
 	Rdb = redis.NewClient(&redis.Options{
-		Addr:     fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort),
-		Password: cfg.RedisPassword,
-		DB:       cfg.RedisDB,
+		Addr:        fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort),
+		Password:    cfg.RedisPassword,
+		DB:          cfg.RedisDB,
+		DialTimeout: 5 * time.Second,
 	})
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -86,3 +93,28 @@ func InitMongoDB(cfg *config.Config) {
 
 	fmt.Println("Connected to MongoDB!")
 }
+
+func CloseResources() {
+	if Db != nil {
+		sqlDB, err := Db.DB()
+		if err != nil {
+			Logger.Error("Failed to get SQL DB: ", err)
+			return
+		}
+		if err := sqlDB.Close(); err != nil {
+			Logger.Error("Failed to close MySQL connection: ", err)
+		}
+	}
+
+	if Rdb != nil {
+		if err := Rdb.Close(); err != nil {
+			Logger.Error("Failed to close Redis connection: ", err)
+		}
+	}
+
+	if Mongo != nil {
+		if err := Mongo.Disconnect(context.TODO()); err != nil {
+			Logger.Error("Failed to close MongoDB connection: ", err)
+		}
+	}
+}