
  1. 架构图

┌─────────────┐     ┌─────────────────────────────────────┐
│   Client    │◄────┤  Chi Router (HTTP + SSE Endpoints)  │
└─────────────┘     └───────────────────┬─────────────────┘
                    │       Go Application Logic          │
                    │  ┌────────────────────────────────┐ │
                    │  │    Event Trigger (e.g. Comment)│ │
                    │  └────────────────┬───────────────┘ │
                    │  ┌────────────────▼───────────────┐ │
                    │  │   Notification Queue Handler   │ │
                    │  └────────────────┬───────────────┘ │
                    │  ┌────────────────▼───────────────┐ │
                    │  │ Notification Processor (Timer) │ │
                    │  └────────────────┬───────────────┘ │
              │                         │                         │
    ┌─────────▼─────────┐     ┌─────────▼─────────┐     ┌─────────▼─────────┐
    │  PostgreSQL DB    │     │ Notification      │     │ Configuration     │
    │ (Bun ORM)         │     │ Services          │     │ Manager           │
    └───────────────────┘     │ - Email           │     └───────────────────┘
                              │ - SSE             │
                              │ - Push            │
  1. 详细说明
    • Client: 用户界面或其他服务,与系统交互。
    • Chi Router: 处理 HTTP 请求和 SSE 连接。
    • Go Application Logic:
      • Event Trigger: 处理触发通知的事件(如评论创建)。
      • Notification Queue Handler: 将通知添加到队列中。
      • Notification Processor: 定时处理队列中的通知。
    • PostgreSQL DB: 使用 Bun ORM 存储应用数据和通知队列。
    • Notification Services:
      • 实现不同类型的通知发送(邮件、SSE、推送等)。
      • 使用接口设计,便于添加新的通知类型。
    • Configuration Manager:
      • 管理通知渠道的配置。
      • 允许动态调整通知行为。
  2. 核心代码
    1. 数据模型
// models.go
package main

import (

type NotificationQueue struct {
    ID        int64     `bun:"id,pk,autoincrement"`
    UserID    int64     `bun:"user_id,notnull"`
    Email     string    `bun:"email,notnull"`
    Title     string    `bun:"title,notnull"`
    Message   string    `bun:"message,notnull"`
    Type      string    `bun:"type,notnull"`
    Channel   string    `bun:"channel,notnull"`
    Priority  int       `bun:"priority,notnull,default:0"`
    Metadata  JSON      `bun:"metadata,type:jsonb"`
    Status    string    `bun:"status,notnull,default:'pending'"`
    Attempts  int       `bun:"attempts,notnull,default:0"`
    CreatedAt time.Time `bun:"created_at,notnull,default:current_timestamp"`
    UpdatedAt time.Time `bun:"updated_at,notnull"`
    SentAt    time.Time `bun:"sent_at,nullzero"`
	1. 添加了 `Email` 字段,使得通知系统不仅依赖于 UserID。
	2. 增加了 `Title` 字段,可用于电子邮件主题或通知标题。
	3. `Type` 字段保留,但可以扩展到更多类型的通知,如推送通知。
	4. 新增 `Channel` 字段,用于分类不同类型的通知源。
	5. 添加 `Priority` 字段,允许对通知进行优先级排序。
	6. 新增 `Metadata` 字段(JSON 类型),可以存储额外的、与特定通知类型相关的信息。
	7. 增加 `Attempts` 字段,用于跟踪发送尝试次数,有助于实现重试逻辑。
	8. 添加 `UpdatedAt` 字段,用于跟踪记录的最后更新时间。

2. 通知接口和实现
// notifier.go
package main

import "fmt"

type Notifier interface {
    Send(notification NotificationQueue) error

type EmailNotifier struct{}

func (e *EmailNotifier) Send(notification NotificationQueue) error {
    // 实现邮件发送逻辑
    fmt.Printf("Sending email to %s: %s\n", notification.Email, notification.Title)
    return nil

type SSENotifier struct{}

func (s *SSENotifier) Send(notification NotificationQueue) error {
    // 实现 SSE 发送逻辑
    fmt.Printf("Sending SSE to user %d: %s\n", notification.UserID, notification.Title)
    return nil

func GetNotifier(notificationType string) (Notifier, error) {
    switch notificationType {
    case "email":
        return &EmailNotifier{}, nil
    case "sse":
        return &SSENotifier{}, nil
        return nil, fmt.Errorf("unsupported notification type: %s", notificationType)
3. 配置管理
// config.go
package main

import (

type ChannelConfig struct {
    Types    []string `json:"types"`
    Template string   `json:"template"`
    Priority int      `json:"priority"`

var channelConfigs map[string]ChannelConfig

func loadChannelConfigs() error {
    data, err := ioutil.ReadFile("channel_configs.json")
    if err != nil {
        return err
    return json.Unmarshal(data, &channelConfigs)
  1. 通知处理
// notification_processor.go
package main

import (


func startNotificationProcessor(db *bun.DB) {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:

func processNotifications(db *bun.DB) {
    var notifications []NotificationQueue
    err := db.NewSelect().Model(&notifications).Where("status = ?", "pending").Limit(100).Scan(context.Background())
    if err != nil {
        log.Printf("Error fetching notifications: %v", err)

    for _, notification := range notifications {
        err := handleNotification(db, &notification)
        if err != nil {
            log.Printf("Error processing notification %d: %v", notification.ID, err)
            notification.Status = "failed"
        } else {
            notification.Status = "sent"
            notification.SentAt = time.Now()
        notification.UpdatedAt = time.Now()
        _, err = db.NewUpdate().Model(&notification).WherePK().Exec(context.Background())
        if err != nil {
            log.Printf("Error updating notification %d: %v", notification.ID, err)

func handleNotification(db *bun.DB, notification *NotificationQueue) error {
    config, exists := channelConfigs[notification.Channel]
    if !exists {
        return fmt.Errorf("no configuration for channel: %s", notification.Channel)

    notification.Type = config.Types[0] // 简化处理,总是使用第一个类型
    notification.Priority = config.Priority

    // 这里可以添加模板渲染逻辑
    // notification.Message = renderTemplate(config.Template, notification.Metadata)

    notifier, err := GetNotifier(notification.Type)
    if err != nil {
        return err
    return notifier.Send(*notification)
  1. 主应用逻辑
// main.go
package main

import (


func main() {
    // 初始化数据库连接
    sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN("postgres://user:pass@localhost:5432/dbname?sslmode=disable")))
    db := bun.NewDB(sqldb, pgdialect.New())
    defer db.Close()

    // 加载通道配置
    err := loadChannelConfigs()
    if err != nil {
        log.Fatalf("Failed to load channel configs: %v", err)

    // 启动通知处理器
    go startNotificationProcessor(db)

    // 设置 Chi 路由
    r := chi.NewRouter()
    r.Post("/comments", createCommentHandler(db))
    r.Get("/sse", sseHandler(db))

    // 启动服务器
    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", r))

func createCommentHandler(db *bun.DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // 处理评论创建逻辑
        // ...

        // 创建通知
        notification := NotificationQueue{
            UserID:  123, // 示例用户 ID
            Email:   "[email protected]",
            Title:   "New Comment",
            Message: "Someone commented on your post",
            Channel: "comment_notification",
        _, err := db.NewInsert().Model(&notification).Exec(r.Context())
        if err != nil {
            http.Error(w, "Failed to create notification", http.StatusInternalServerError)


func sseHandler(db *bun.DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // 实现 SSE 处理逻辑
        // ...
  1. 设计说明

    1. 可扩展性
      • 通过 Notifier 接口,可以轻松添加新的通知类型。
      • ChannelConfig 允许通过配置文件定制不同通知渠道的行为。
    2. 解耦
      • 通知生成(如评论创建)与通知处理逻辑分离。
      • 通知队列作为中间层,提高了系统的可靠性和可扩展性。
    3. 灵活性
      • 通知类型、优先级等可以通过配置动态调整。
      • 元数据字段允许存储额外的通知相关信息。
    4. 可靠性
      • 使用数据库存储通知队列,确保即使在系统崩溃时也不会丢失通知。
      • 通过 Attempts 字段实现重试机制。
    5. 性能
      • 批量处理通知,减少数据库操作。
      • 使用定时器异步处理通知,避免阻塞主应用逻辑。
    6. 可维护性
      • 清晰的代码结构和模块化设计。
      • 使用接口和依赖注入,便于单元测试。
