目的:

  1. 实现一个评论系统,允许用户对报告进行评论。
  2. 当新评论创建时,生成通知并存储到队列中。
  3. 通过邮件和实时 SSE 向相关用户发送通知。
  4. 确保通知系统的可靠性、实时性和可扩展性。
  5. 处理用户在线/离线状态,确保通知能够及时送达在线用户。

设计说明:

  1. 数据模型:使用 PostgreSQL 数据库,通过 Bun ORM 进行数据操作。
  2. Web 框架:使用 Chi 作为 HTTP 路由器。
  3. 通知机制:实现了一个基于队列的通知系统,支持邮件和 SSE 实时通知。
  4. 异步处理:使用 Go 协程处理通知,避免阻塞主请求。
  5. 实时通信:使用 Server-Sent Events (SSE) 实现实时通知。
  6. 会话管理:实现了基本的用户认证和会话跟踪机制。

架构图:

![[Pasted image 20240719115045.png]]

详细说明:

  1. 数据模型:
    • 使用 Bun ORM 定义 Comment、Report、User 和 NotificationQueue 四个模型。
    • NotificationQueue 用于存储待处理的通知。
  2. 评论创建与通知触发:
    • 使用 Chi 中间件在评论创建后触发通知处理。
    • 通知处理器将通知信息插入 NotificationQueue。
  3. 通知处理:
    • 定时任务定期检查 NotificationQueue,处理待发送的通知。
    • 对于邮件通知,直接发送邮件。
    • 对于 SSE 通知,检查用户是否在线,如果在线则发送,否则保持待发送状态。
  4. SSE 实现:
    • 为每个在线用户维护一个 SSE 连接。
    • 实现用户认证,确保只有登录用户可以建立 SSE 连接。
    • 使用心跳机制保持连接活跃,并跟踪用户活动状态。
    • 定期清理不活跃的连接。
  5. 会话管理:
    • 实现基本的用户认证机制。
    • 使用内存中的 map(映射表)跟踪用户会话和最后活动时间。

详细代码:

  1. 数据模型 (models.go):
package main

import (
    "time"

    "github.com/uptrace/bun"
)

// User is the Bun model for a user row.
type User struct {
    // ID is the auto-incrementing primary key.
    ID    int64  `bun:"id,pk,autoincrement"`
    // Email is required and unique. Presumably the address email
    // notifications go to — email delivery is not implemented in this file.
    Email string `bun:"email,notnull,unique"`
}

// Report is the Bun model for a report that users can comment on.
type Report struct {
    // ID is the auto-incrementing primary key.
    ID     int64 `bun:"id,pk,autoincrement"`
    // UserID is the user that handleNotification notifies about new comments;
    // presumably the report's owner — confirm against the schema.
    UserID int64 `bun:"user_id,notnull"`
}

// Comment is the Bun model for a comment left by a user on a report.
type Comment struct {
    // ID is the auto-incrementing primary key.
    ID        int64     `bun:"id,pk,autoincrement"`
    // UserID is the author of the comment.
    UserID    int64     `bun:"user_id,notnull"`
    // ReportID is the report the comment belongs to.
    ReportID  int64     `bun:"report_id,notnull"`
    // Content is the comment text.
    Content   string    `bun:"content,notnull"`
    // CreatedAt carries nullzero so that an unset (zero) value is inserted as
    // the column DEFAULT (current_timestamp). Without nullzero Bun writes the
    // Go zero time (0001-01-01) into the row instead of the database default.
    CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
}

// NotificationQueue is the Bun model for one queued notification. Each row is
// a single delivery attempt over one channel (email or SSE) to one user.
type NotificationQueue struct {
    // ID is the auto-incrementing primary key.
    ID        int64     `bun:"id,pk,autoincrement"`
    // UserID is the recipient of the notification.
    UserID    int64     `bun:"user_id,notnull"`
    // ReportID is the report the notification refers to.
    ReportID  int64     `bun:"report_id,notnull"`
    // Message is the text delivered to the recipient.
    Message   string    `bun:"message,notnull"`
    Type      string    `bun:"type,notnull"` // 'email' or 'sse'
    Status    string    `bun:"status,notnull"` // 'pending', 'sent', 'failed'
    // CreatedAt carries nullzero so that an unset (zero) value is inserted as
    // the column DEFAULT (current_timestamp). Without nullzero Bun writes the
    // Go zero time (0001-01-01) into the row instead of the database default.
    CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
    // SentAt stays NULL (nullzero) until the notification is marked sent.
    SentAt    time.Time `bun:"sent_at,nullzero"`
}
  2. 数据库配置 (database.go):
package main

import (
    "database/sql"
    "log"

    "github.com/uptrace/bun"
    "github.com/uptrace/bun/dialect/pgdialect"
    "github.com/uptrace/bun/driver/pgdriver"
)

// db is the package-wide Bun database handle. It is nil until initDB runs.
var db *bun.DB

// initDB connects to PostgreSQL through pgdriver, stores the resulting Bun
// handle in db, and verifies the connection with a ping. A failed ping
// terminates the process via log.Fatal.
//
// NOTE(review): the DSN, including credentials, is hard-coded here; consider
// loading it from configuration.
func initDB() {
    connector := pgdriver.NewConnector(
        pgdriver.WithDSN("postgres://user:pass@localhost:5432/dbname?sslmode=disable"),
    )
    db = bun.NewDB(sql.OpenDB(connector), pgdialect.New())

    if err := db.Ping(); err != nil {
        log.Fatal(err)
    }
}
  3. 评论处理 (comment_handler.go):
package main

import (
    "encoding/json"
    "net/http"

    "github.com/go-chi/chi/v5"
)

// createCommentHandler decodes a Comment from the JSON request body, inserts
// it, and answers 201 Created with the stored comment encoded as JSON.
//
// Responses:
//   - 400 when the body is not valid JSON, exceeds 1 MiB, or lacks a positive
//     UserID/ReportID or a non-empty Content;
//   - 500 when the insert fails;
//   - 201 with the comment as JSON on success. notificationMiddleware parses
//     this body, so its shape must stay a JSON-encoded Comment.
//
// NOTE(review): UserID and CreatedAt are still taken from the request body, so
// a client can post on behalf of another user. They should come from the
// authenticated session once that is wired into this handler.
func createCommentHandler(w http.ResponseWriter, r *http.Request) {
    // Cap the body so a client cannot make the server read an unbounded payload.
    r.Body = http.MaxBytesReader(w, r.Body, 1<<20)

    var comment Comment
    if err := json.NewDecoder(r.Body).Decode(&comment); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    if comment.UserID <= 0 || comment.ReportID <= 0 || comment.Content == "" {
        http.Error(w, "UserID, ReportID and Content are required", http.StatusBadRequest)
        return
    }

    // The primary key is assigned by the database, never by the client.
    comment.ID = 0

    if _, err := db.NewInsert().Model(&comment).Exec(r.Context()); err != nil {
        http.Error(w, "Failed to create comment", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    // The status line is already sent; an encode error here means the client
    // went away and there is nothing left to report to it.
    _ = json.NewEncoder(w).Encode(comment)
}
  4. 通知中间件 (notification_middleware.go):
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "log"
    "net/http"

    "github.com/go-chi/chi/v5/middleware"
)

// notificationMiddleware wraps a comment-creating handler. When the wrapped
// handler answers 201 Created, the response body is parsed as a Comment and
// notification rows are queued for it in a background goroutine, so the
// request itself is never delayed by notification work.
func notificationMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // chi's WrapResponseWriter does not expose the written body; Tee
        // mirrors every write into body so it can be inspected afterwards.
        var body bytes.Buffer
        ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
        ww.Tee(&body)

        next.ServeHTTP(ww, r)

        if ww.Status() != http.StatusCreated {
            return
        }

        var comment Comment
        if err := json.Unmarshal(body.Bytes(), &comment); err != nil {
            // Without a decoded comment there is no report to notify about.
            log.Printf("Error decoding created comment: %v", err)
            return
        }

        go handleNotification(&comment)
    })
}

// handleNotification queues notifications for a newly created comment. It
// loads the report the comment belongs to and inserts two pending
// NotificationQueue rows for the report's UserID: one of type "email" and one
// of type "sse". Failures are logged; nothing is returned because the function
// runs in its own goroutine.
//
// NOTE(review): the commenter is not excluded, so a user commenting on their
// own report is notified too — confirm this is intended.
func handleNotification(comment *Comment) {
    // This runs after the HTTP response has been written, so there is no
    // request in scope and a request context would already be cancelled.
    ctx := context.Background()

    var report Report
    err := db.NewSelect().Model(&report).Where("id = ?", comment.ReportID).Scan(ctx)
    if err != nil {
        log.Printf("Error fetching report: %v", err)
        return
    }

    notifications := []NotificationQueue{
        {
            UserID:   report.UserID,
            ReportID: report.ID,
            Message:  "New comment on your report",
            Type:     "email",
            Status:   "pending",
        },
        {
            UserID:   report.UserID,
            ReportID: report.ID,
            Message:  "New comment on your report",
            Type:     "sse",
            Status:   "pending",
        },
    }

    if _, err := db.NewInsert().Model(&notifications).Exec(ctx); err != nil {
        log.Printf("Error inserting notifications: %v", err)
    }
}
  5. SSE 处理 (sse_handler.go):
package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

// In-memory SSE connection registry shared by the SSE handler, the
// notification processor and the inactive-connection cleaner.
var (
    // clients maps a user ID to the channel that feeds that user's SSE stream.
    clients    = make(map[int64]chan string)
    // clientsMux guards clients.
    clientsMux sync.RWMutex
    // lastActivity maps a user ID to the time of the last recorded activity
    // on that user's SSE connection.
    lastActivity = make(map[int64]time.Time)
    // activityMux guards lastActivity.
    activityMux  sync.RWMutex
)

// sseHandler serves the Server-Sent Events stream for the authenticated user.
//
// It registers a fresh channel for the user in clients (closing any channel a
// previous connection of the same user registered), replays the user's pending
// SSE notifications, and then forwards every message received on the channel
// to the client. A heartbeat event is written every 30 seconds. The handler
// returns when the client disconnects, a write fails, or the channel is closed
// by a newer connection or by cleanInactiveConnections.
//
// Responses before streaming starts: 401 when authentication fails, 500 when
// the ResponseWriter cannot flush.
func sseHandler(w http.ResponseWriter, r *http.Request) {
    userID, err := authenticateUser(r)
    if err != nil {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }

    // Check once up front instead of risking a panicking type assertion on
    // every write.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Keep our own reference to the channel: the map entry may later be
    // replaced by a newer connection of the same user, and reading the map
    // without the lock would be a data race.
    ch := make(chan string)
    clientsMux.Lock()
    if old, exists := clients[userID]; exists {
        close(old)
    }
    clients[userID] = ch
    clientsMux.Unlock()

    updateLastActivity(userID)

    defer func() {
        // Lock order (activityMux, then clientsMux) matches
        // cleanInactiveConnections.
        activityMux.Lock()
        clientsMux.Lock()
        // Clean up only while this connection still owns the registration.
        // If the entry was replaced or already removed, the channel has been
        // closed by whoever did that and must not be closed again.
        if clients[userID] == ch {
            delete(clients, userID)
            close(ch)
            delete(lastActivity, userID)
        }
        clientsMux.Unlock()
        activityMux.Unlock()
    }()

    sendPendingSSENotifications(userID, w)

    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case msg, ok := <-ch:
            if !ok {
                return
            }
            if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
                return
            }
            flusher.Flush()
            updateLastActivity(userID)
        case <-ticker.C:
            if _, err := fmt.Fprint(w, "event: heartbeat\ndata: ping\n\n"); err != nil {
                return
            }
            flusher.Flush()
            updateLastActivity(userID)
        case <-r.Context().Done():
            // Replaces the deprecated http.CloseNotifier: the request context
            // is cancelled when the client connection goes away.
            return
        }
    }
}

// updateLastActivity stamps the current time as userID's most recent activity
// in lastActivity, holding activityMux for the duration of the write.
func updateLastActivity(userID int64) {
    activityMux.Lock()
    defer activityMux.Unlock()

    lastActivity[userID] = time.Now()
}

// sendPendingSSENotifications replays userID's pending SSE notifications onto
// w, oldest first, and marks each one as sent after it has been written and
// flushed.
//
// Delivery stops at the first failed write so that undelivered notifications
// stay pending and are retried on the next connection or processor run.
// Errors are logged; nothing is returned.
func sendPendingSSENotifications(userID int64, w http.ResponseWriter) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        log.Printf("Error sending pending SSE notifications: response writer cannot flush")
        return
    }

    ctx := context.Background()

    var notifications []NotificationQueue
    err := db.NewSelect().Model(&notifications).
        Where("user_id = ? AND type = ? AND status = ?", userID, "sse", "pending").
        Order("id ASC").
        Scan(ctx)
    if err != nil {
        log.Printf("Error fetching pending SSE notifications: %v", err)
        return
    }

    for i := range notifications {
        n := &notifications[i]

        if _, err := fmt.Fprintf(w, "data: %s\n\n", n.Message); err != nil {
            // The client is gone; leave this and the remaining rows pending.
            log.Printf("Error writing pending SSE notification: %v", err)
            return
        }
        flusher.Flush()

        n.Status = "sent"
        n.SentAt = time.Now()
        if _, err := db.NewUpdate().Model(n).WherePK().Exec(ctx); err != nil {
            log.Printf("Error updating notification status: %v", err)
        }
    }
}
  6. 通知处理器 (notification_processor.go):
package main

import (
    "context"
    "log"
    "time"
)

// startNotificationProcessor runs processNotifications once per minute,
// driven by a ticker. It never returns, so callers start it in its own
// goroutine.
func startNotificationProcessor() {
    tick := time.NewTicker(time.Minute)
    defer tick.Stop()

    for range tick.C {
        processNotifications()
    }
}

// processNotifications delivers one batch of pending notifications per
// channel type: up to 100 "email" rows through sendEmail and up to 100 "sse"
// rows through sendSSE, oldest first.
//
// The types are fetched separately on purpose. SSE rows for offline users stay
// pending indefinitely, so a single shared batch could fill up with
// undeliverable SSE rows and starve email delivery.
func processNotifications() {
    ctx := context.Background()

    for _, kind := range []string{"email", "sse"} {
        var notifications []NotificationQueue
        err := db.NewSelect().Model(&notifications).
            Where("status = ? AND type = ?", "pending", kind).
            Order("id ASC").
            Limit(100).
            Scan(ctx)
        if err != nil {
            log.Printf("Error fetching notifications: %v", err)
            // A failure for one type should not block the other.
            continue
        }

        for _, notification := range notifications {
            switch notification.Type {
            case "email":
                sendEmail(notification)
            case "sse":
                sendSSE(notification)
            }
        }
    }
}

// sendEmail handles one queued email notification. Actual mail delivery is
// still a placeholder; the function currently only records the row as sent,
// stamping SentAt with the current time, and logs a failed update.
func sendEmail(notification NotificationQueue) {
    // Email sending logic goes here.
    // ...

    notification.Status, notification.SentAt = "sent", time.Now()

    if _, err := db.NewUpdate().Model(&notification).WherePK().Exec(context.Background()); err != nil {
        log.Printf("Error updating notification status: %v", err)
    }
}

// sendSSE tries to hand one queued SSE notification to the recipient's live
// connection. On success the row is marked sent with the current time. When
// the user has no registered connection, or the connection is not ready to
// receive, the row is left untouched (still pending) for a later run.
func sendSSE(notification NotificationQueue) {
    delivered := false

    // Hold the read lock across the send. Channels in clients are closed
    // under the write lock, and sending on a closed channel panics even
    // inside a select with a default case, so the lock is what makes this
    // send safe. The send is non-blocking, so the lock is held only briefly.
    clientsMux.RLock()
    if ch, isOnline := clients[notification.UserID]; isOnline {
        select {
        case ch <- notification.Message:
            delivered = true
        default:
            // Receiver is not ready; keep the notification pending.
        }
    }
    clientsMux.RUnlock()

    if !delivered {
        // Nothing changed, so there is nothing to write back.
        return
    }

    notification.Status = "sent"
    notification.SentAt = time.Now()
    if _, err := db.NewUpdate().Model(&notification).WherePK().Exec(context.Background()); err != nil {
        log.Printf("Error updating notification status: %v", err)
    }
}
  7. 主函数 (main.go):
package main

import (
    "log"
    "net/http"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

// main wires the application together: it opens the database, registers the
// comment and SSE routes on a Chi router, starts the notification processor
// and the inactive-connection cleaner in background goroutines, and serves
// HTTP on :8080 until the server fails.
func main() {
    initDB()

    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Use(middleware.Recoverer)

    r.With(notificationMiddleware).Post("/comments", createCommentHandler)
    r.Get("/sse", sseHandler)

    go startNotificationProcessor()
    go cleanInactiveConnections()

    log.Println("Server starting on :8080")
    err := http.ListenAndServe(":8080", r)

    // log.Fatal exits through os.Exit, which skips deferred calls, so the
    // database is closed explicitly before exiting.
    if cerr := db.Close(); cerr != nil {
        log.Printf("Error closing database: %v", cerr)
    }
    log.Fatal(err)
}

// cleanInactiveConnections wakes up every 5 minutes and drops every SSE
// connection whose last recorded activity is more than 10 minutes old: the
// user's channel is closed and removed from clients, and the user's entry is
// removed from lastActivity. It never returns, so callers start it in its own
// goroutine.
func cleanInactiveConnections() {
    for {
        time.Sleep(5 * time.Minute)
        now := time.Now()

        // The loop deletes from lastActivity, so the write lock is required;
        // mutating the map under a read lock is a data race.
        // Lock order: activityMux first, then clientsMux.
        activityMux.Lock()
        clientsMux.Lock()
        for userID, lastActiveTime := range lastActivity {
            if now.Sub(lastActiveTime) <= 10*time.Minute {
                continue
            }
            if ch, exists := clients[userID]; exists {
                close(ch)
                delete(clients, userID)
            }
            delete(lastActivity, userID)
        }
        clientsMux.Unlock()
        activityMux.Unlock()
    }
}

总结

以上实现了评论创建、通知生成、SSE 处理、邮件发送等功能。主要特点包括:

  1. 使用 Chi 作为 HTTP 路由器。
  2. 使用 Bun ORM 进行数据库操作。
  3. 实现了中间件来处理评论创建后的通知生成。
  4. 使用 SSE 进行实时通知。
  5. 实现了定时任务来处理待发送的通知。
  6. 包含了用户会话管理和连接清理的逻辑。
架构图
graph TD
    A[客户端] -->|HTTP/SSE| B(Chi 路由器)
    B --> C{Go 应用逻辑}
    C --> D[评论创建钩子]
    C --> E[通知队列处理器]
    C --> F[通知处理器<br>定时任务]
    D --> E
    E --> G[(PostgreSQL<br>Bun ORM)]
    F --> G
    F --> H[邮件服务]
    F --> I[SSE 管理器]
    I --> A
    
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px
    style C fill:#dfd,stroke:#333,stroke-width:2px
    style D fill:#ddf,stroke:#333,stroke-width:2px
    style E fill:#ddf,stroke:#333,stroke-width:2px
    style F fill:#ddf,stroke:#333,stroke-width:2px
    style G fill:#fdd,stroke:#333,stroke-width:2px
    style H fill:#dff,stroke:#333,stroke-width:2px
    style I fill:#dff,stroke:#333,stroke-width:2px
graph TD
    A[Client] <--> B[Chi Router<br>HTTP + SSE Endpoints]
    B --> C[Go Application Logic]
    C --> D[Comment Creation Hook]
    D --> E[Notification Queue Handler]
    E --> F[Notification Processor Timer]
    C --> G[(PostgreSQL DB<br>Bun ORM)]
    C --> H[Email Service]
    C --> I[SSE Manager]

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px
    style C fill:#dfd,stroke:#333,stroke-width:2px
    style D fill:#ffe,stroke:#333,stroke-width:2px
    style E fill:#ffe,stroke:#333,stroke-width:2px
    style F fill:#ffe,stroke:#333,stroke-width:2px
    style G fill:#fdd,stroke:#333,stroke-width:2px
    style H fill:#dff,stroke:#333,stroke-width:2px
    style I fill:#dff,stroke:#333,stroke-width:2px

┌─────────────┐     ┌─────────────────────────────────────┐
│   Client    │◄────┤  Chi Router (HTTP + SSE Endpoints)  │
└─────────────┘     └───────────────────┬─────────────────┘
                                        │
                    ┌───────────────────▼─────────────────┐
                    │       Go Application Logic          │
                    │  ┌────────────────────────────────┐ │
                    │  │     Comment Creation Hook      │ │
                    │  └────────────────┬───────────────┘ │
                    │  ┌────────────────▼───────────────┐ │
                    │  │   Notification Queue Handler   │ │
                    │  └────────────────┬───────────────┘ │
                    │  ┌────────────────▼───────────────┐ │
                    │  │ Notification Processor (Timer) │ │
                    │  └────────────────┬───────────────┘ │
                    └───────────────────┬─────────────────┘
                                        │
              ┌─────────────────────────┼─────────────────────────┐
              │                         │                         │
    ┌─────────▼─────────┐     ┌─────────▼─────────┐     ┌─────────▼─────────┐
    │  PostgreSQL DB    │     │    Email Service  │     │   SSE Manager     │
    │ (Bun ORM)         │     │                   │     │                   │
    └───────────────────┘     └───────────────────┘     └───────────────────┘