目的:
- 实现一个评论系统,允许用户对报告进行评论。
- 当新评论创建时,生成通知并存储到队列中。
- 通过邮件和实时 SSE 向相关用户发送通知。
- 确保通知系统的可靠性、实时性和可扩展性。
- 处理用户在线/离线状态,确保通知能够及时送达在线用户。
设计说明:
- 数据模型:使用 PostgreSQL 数据库,通过 Bun ORM 进行数据操作。
- Web 框架:使用 Chi 作为 HTTP 路由器。
- 通知机制:实现了一个基于队列的通知系统,支持邮件和 SSE 实时通知。
- 异步处理:使用 Go 协程处理通知,避免阻塞主请求。
- 实时通信:使用 Server-Sent Events (SSE) 实现实时通知。
- 会话管理:实现了基本的用户认证和会话跟踪机制。
架构图:
![[Pasted image 20240719115045.png]]
详细说明:
- 数据模型:
- 使用 Bun ORM 定义
Comment
,Report
,User
, 和NotificationQueue
模型。 NotificationQueue
用于存储待处理的通知。
- 使用 Bun ORM 定义
- 评论创建与通知触发:
- 使用 Chi 中间件在评论创建后触发通知处理。
- 通知处理器将通知信息插入
NotificationQueue
。
- 通知处理:
- 定时任务定期检查
NotificationQueue
,处理待发送的通知。 - 对于邮件通知,直接发送邮件。
- 对于 SSE 通知,检查用户是否在线,如果在线则发送,否则保持待发送状态。
- 定时任务定期检查
- SSE 实现:
- 为每个在线用户维护一个 SSE 连接。
- 实现用户认证,确保只有登录用户可以建立 SSE 连接。
- 使用心跳机制保持连接活跃,并跟踪用户活动状态。
- 定期清理不活跃的连接。
- 会话管理:
- 实现基本的用户认证机制。
- 使用内存映射跟踪用户会话和最后活动时间。
详细代码:
- 数据模型 (models.go):
package main
import (
"time"
"github.com/uptrace/bun"
)
type User struct {
ID int64 `bun:"id,pk,autoincrement"`
Email string `bun:"email,notnull,unique"`
}
type Report struct {
ID int64 `bun:"id,pk,autoincrement"`
UserID int64 `bun:"user_id,notnull"`
}
type Comment struct {
ID int64 `bun:"id,pk,autoincrement"`
UserID int64 `bun:"user_id,notnull"`
ReportID int64 `bun:"report_id,notnull"`
Content string `bun:"content,notnull"`
CreatedAt time.Time `bun:"created_at,notnull,default:current_timestamp"`
}
type NotificationQueue struct {
ID int64 `bun:"id,pk,autoincrement"`
UserID int64 `bun:"user_id,notnull"`
ReportID int64 `bun:"report_id,notnull"`
Message string `bun:"message,notnull"`
Type string `bun:"type,notnull"` // 'email' 或 'sse'
Status string `bun:"status,notnull"` // 'pending', 'sent', 'failed'
CreatedAt time.Time `bun:"created_at,notnull,default:current_timestamp"`
SentAt time.Time `bun:"sent_at,nullzero"`
}
- 数据库配置 (database.go):
package main
import (
"database/sql"
"log"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/driver/pgdriver"
)
var db *bun.DB
func initDB() {
sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN("postgres://user:pass@localhost:5432/dbname?sslmode=disable")))
db = bun.NewDB(sqldb, pgdialect.New())
err := db.Ping()
if err != nil {
log.Fatal(err)
}
}
- 评论处理 (comment_handler.go):
package main
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
)
func createCommentHandler(w http.ResponseWriter, r *http.Request) {
var comment Comment
err := json.NewDecoder(r.Body).Decode(&comment)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err = db.NewInsert().Model(&comment).Exec(r.Context())
if err != nil {
http.Error(w, "Failed to create comment", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(comment)
}
- 通知中间件 (notification_middleware.go):
package main
import (
"net/http"
"github.com/go-chi/chi/v5/middleware"
)
func notificationMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
next.ServeHTTP(ww, r)
if ww.Status() == http.StatusCreated {
var comment Comment
json.NewDecoder(ww.Body()).Decode(&comment)
go handleNotification(&comment)
}
})
}
func handleNotification(comment *Comment) {
var report Report
err := db.NewSelect().Model(&report).Where("id = ?", comment.ReportID).Scan(r.Context())
if err != nil {
log.Printf("Error fetching report: %v", err)
return
}
notifications := []NotificationQueue{
{
UserID: report.UserID,
ReportID: report.ID,
Message: "New comment on your report",
Type: "email",
Status: "pending",
},
{
UserID: report.UserID,
ReportID: report.ID,
Message: "New comment on your report",
Type: "sse",
Status: "pending",
},
}
_, err = db.NewInsert().Model(¬ifications).Exec(context.Background())
if err != nil {
log.Printf("Error inserting notifications: %v", err)
}
}
- SSE 处理 (sse_handler.go):
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
var (
clients = make(map[int64]chan string)
clientsMux sync.RWMutex
lastActivity = make(map[int64]time.Time)
activityMux sync.RWMutex
)
func sseHandler(w http.ResponseWriter, r *http.Request) {
userID, err := authenticateUser(r)
if err != nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
clientsMux.Lock()
if _, exists := clients[userID]; exists {
close(clients[userID])
}
clients[userID] = make(chan string)
clientsMux.Unlock()
updateLastActivity(userID)
defer func() {
clientsMux.Lock()
if ch, exists := clients[userID]; exists {
close(ch)
delete(clients, userID)
}
clientsMux.Unlock()
activityMux.Lock()
delete(lastActivity, userID)
activityMux.Unlock()
}()
sendPendingSSENotifications(userID, w)
clientGone := w.(http.CloseNotifier).CloseNotify()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case msg, ok := <-clients[userID]:
if !ok {
return
}
fmt.Fprintf(w, "data: %s\n\n", msg)
w.(http.Flusher).Flush()
updateLastActivity(userID)
case <-ticker.C:
fmt.Fprintf(w, "event: heartbeat\ndata: ping\n\n")
w.(http.Flusher).Flush()
updateLastActivity(userID)
case <-clientGone:
return
}
}
}
func updateLastActivity(userID int64) {
activityMux.Lock()
lastActivity[userID] = time.Now()
activityMux.Unlock()
}
func sendPendingSSENotifications(userID int64, w http.ResponseWriter) {
var notifications []NotificationQueue
err := db.NewSelect().Model(¬ifications).
Where("user_id = ? AND type = ? AND status = ?", userID, "sse", "pending").
Scan(context.Background())
if err != nil {
log.Printf("Error fetching pending SSE notifications: %v", err)
return
}
for _, notification := range notifications {
fmt.Fprintf(w, "data: %s\n\n", notification.Message)
w.(http.Flusher).Flush()
notification.Status = "sent"
notification.SentAt = time.Now()
_, err := db.NewUpdate().Model(¬ification).WherePK().Exec(context.Background())
if err != nil {
log.Printf("Error updating notification status: %v", err)
}
}
}
- 通知处理器 (notification_processor.go):
package main
import (
"context"
"log"
"time"
)
func startNotificationProcessor() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
processNotifications()
}
}
}
func processNotifications() {
var notifications []NotificationQueue
err := db.NewSelect().Model(¬ifications).Where("status = ?", "pending").Limit(100).Scan(context.Background())
if err != nil {
log.Printf("Error fetching notifications: %v", err)
return
}
for _, notification := range notifications {
switch notification.Type {
case "email":
sendEmail(notification)
case "sse":
sendSSE(notification)
}
}
}
func sendEmail(notification NotificationQueue) {
// 实现发送邮件的逻辑
// ...
notification.Status = "sent"
notification.SentAt = time.Now()
_, err := db.NewUpdate().Model(¬ification).WherePK().Exec(context.Background())
if err != nil {
log.Printf("Error updating notification status: %v", err)
}
}
func sendSSE(notification NotificationQueue) {
clientsMux.RLock()
ch, isOnline := clients[notification.UserID]
clientsMux.RUnlock()
if isOnline {
select {
case ch <- notification.Message:
notification.Status = "sent"
notification.SentAt = time.Now()
default:
// 通道已满或关闭,保持状态为 pending
}
_, err := db.NewUpdate().Model(¬ification).WherePK().Exec(context.Background())
if err != nil {
log.Printf("Error updating notification status: %v", err)
}
}
// 如果用户不在线,保持状态为 pending,等待下次处理
}
- 主函数 (main.go):
package main
import (
"log"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
func main() {
initDB()
defer db.Close()
r := chi.NewRouter()
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
r.With(notificationMiddleware).Post("/comments", createCommentHandler)
r.Get("/sse", sseHandler)
go startNotificationProcessor()
go cleanInactiveConnections()
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", r))
}
func cleanInactiveConnections() {
for {
time.Sleep(5 * time.Minute)
now := time.Now()
activityMux.RLock()
clientsMux.Lock()
for userID, lastActiveTime := range lastActivity {
if now.Sub(lastActiveTime) > 10*time.Minute {
if ch, exists := clients[userID]; exists {
close(ch)
delete(clients, userID)
}
delete(lastActivity, userID)
}
}
clientsMux.Unlock()
activityMux.RUnlock()
}
}
总结
以上实现了评论创建、通知生成、SSE 处理、邮件发送等功能。 主要特点包括:
- 使用 Chi 作为 HTTP 路由器。
- 使用 Bun ORM 进行数据库操作。
- 实现了中间件来处理评论创建后的通知生成。
- 使用 SSE 进行实时通知。
- 实现了定时任务来处理待发送的通知。
- 包含了用户会话管理和连接清理的逻辑。
架构图
graph TD
A[客户端] -->|HTTP/SSE| B(Chi 路由器)
B --> C{Go 应用逻辑}
C --> D[评论创建钩子]
C --> E[通知队列处理器]
C --> F[通知处理器<br>定时任务]
D --> E
E --> G[(PostgreSQL<br>Bun ORM)]
F --> G
F --> H[邮件服务]
F --> I[SSE 管理器]
I --> A
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style C fill:#dfd,stroke:#333,stroke-width:2px
style D fill:#ddf,stroke:#333,stroke-width:2px
style E fill:#ddf,stroke:#333,stroke-width:2px
style F fill:#ddf,stroke:#333,stroke-width:2px
style G fill:#fdd,stroke:#333,stroke-width:2px
style H fill:#dff,stroke:#333,stroke-width:2px
style I fill:#dff,stroke:#333,stroke-width:2px
graph TD
A[Client] <--> B[Chi Router<br>HTTP + SSE Endpoints]
B --> C[Go Application Logic]
C --> D[Comment Creation Hook]
D --> E[Notification Queue Handler]
E --> F[Notification Processor Timer]
C --> G[(PostgreSQL DB<br>Bun ORM)]
C --> H[Email Service]
C --> I[SSE Manager]
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style C fill:#dfd,stroke:#333,stroke-width:2px
style D fill:#ffe,stroke:#333,stroke-width:2px
style E fill:#ffe,stroke:#333,stroke-width:2px
style F fill:#ffe,stroke:#333,stroke-width:2px
style G fill:#fdd,stroke:#333,stroke-width:2px
style H fill:#dff,stroke:#333,stroke-width:2px
style I fill:#dff,stroke:#333,stroke-width:2px
┌─────────────┐ ┌─────────────────────────────────────┐
│ Client │◄────┤ Chi Router (HTTP + SSE Endpoints) │
└─────────────┘ └───────────────────┬─────────────────┘
│
┌───────────────────▼─────────────────┐
│ Go Application Logic │
│ ┌────────────────────────────────┐ │
│ │ Comment Creation Hook │ │
│ └────────────────┬───────────────┘ │
│ ┌────────────────▼───────────────┐ │
│ │ Notification Queue Handler │ │
│ └────────────────┬───────────────┘ │
│ ┌────────────────▼───────────────┐ │
│ │ Notification Processor (Timer) │ │
│ └────────────────┬───────────────┘ │
└───────────────────┬─────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
┌─────────▼─────────┐ ┌─────────▼─────────┐ ┌─────────▼─────────┐
│ PostgreSQL DB │ │ Email Service │ │ SSE Manager │
│ (Bun ORM) │ │ │ │ │
└───────────────────┘ └───────────────────┘ └───────────────────┘