更加通用和可扩展的设计方案,包括架构图、详细说明和核心代码。
- 架构图
┌─────────────┐ ┌─────────────────────────────────────┐
│ Client │◄────┤ Chi Router (HTTP + SSE Endpoints) │
└─────────────┘ └───────────────────┬─────────────────┘
│
┌───────────────────▼─────────────────┐
│ Go Application Logic │
│ ┌────────────────────────────────┐ │
│ │ Event Trigger (e.g. Comment)│ │
│ └────────────────┬───────────────┘ │
│ ┌────────────────▼───────────────┐ │
│ │ Notification Queue Handler │ │
│ └────────────────┬───────────────┘ │
│ ┌────────────────▼───────────────┐ │
│ │ Notification Processor (Timer) │ │
│ └────────────────┬───────────────┘ │
└───────────────────┬─────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
┌─────────▼─────────┐ ┌─────────▼─────────┐ ┌─────────▼─────────┐
│ PostgreSQL DB │ │ Notification │ │ Configuration │
│ (Bun ORM) │ │ Services │ │ Manager │
└───────────────────┘ │ - Email │ └───────────────────┘
│ - SSE │
│ - Push │
└───────────────────┘
- 详细说明
- Client: 用户界面或其他服务,与系统交互。
- Chi Router: 处理 HTTP 请求和 SSE 连接。
- Go Application Logic:
- Event Trigger: 处理触发通知的事件(如评论创建)。
- Notification Queue Handler: 将通知添加到队列中。
- Notification Processor: 定时处理队列中的通知。
- PostgreSQL DB: 使用 Bun ORM 存储应用数据和通知队列。
- Notification Services:
- 实现不同类型的通知发送(邮件、SSE、推送等)。
- 使用接口设计,便于添加新的通知类型。
- Configuration Manager:
- 管理通知渠道的配置。
- 允许动态调整通知行为。
- 核心代码
- 数据模型
// models.go
package main
import (
"time"
"github.com/uptrace/bun"
)
type NotificationQueue struct {
ID int64 `bun:"id,pk,autoincrement"`
UserID int64 `bun:"user_id,notnull"`
Email string `bun:"email,notnull"`
Title string `bun:"title,notnull"`
Message string `bun:"message,notnull"`
Type string `bun:"type,notnull"`
Channel string `bun:"channel,notnull"`
Priority int `bun:"priority,notnull,default:0"`
Metadata JSON `bun:"metadata,type:jsonb"`
Status string `bun:"status,notnull,default:'pending'"`
Attempts int `bun:"attempts,notnull,default:0"`
CreatedAt time.Time `bun:"created_at,notnull,default:current_timestamp"`
UpdatedAt time.Time `bun:"updated_at,notnull"`
SentAt time.Time `bun:"sent_at,nullzero"`
}
改进后的结构有以下优点:
1. 添加了 `Email` 字段,使得通知系统不仅依赖于 UserID。
2. 增加了 `Title` 字段,可用于电子邮件主题或通知标题。
3. `Type` 字段保留,但可以扩展到更多类型的通知,如推送通知。
4. 新增 `Channel` 字段,用于分类不同类型的通知源。
5. 添加 `Priority` 字段,允许对通知进行优先级排序。
6. 新增 `Metadata` 字段(JSON 类型),可以存储额外的、与特定通知类型相关的信息。
7. 增加 `Attempts` 字段,用于跟踪发送尝试次数,有助于实现重试逻辑。
8. 添加 `UpdatedAt` 字段,用于跟踪记录的最后更新时间。
2. 通知接口和实现
// notifier.go
package main
import "fmt"
type Notifier interface {
Send(notification NotificationQueue) error
}
type EmailNotifier struct{}
func (e *EmailNotifier) Send(notification NotificationQueue) error {
// 实现邮件发送逻辑
fmt.Printf("Sending email to %s: %s\n", notification.Email, notification.Title)
return nil
}
type SSENotifier struct{}
func (s *SSENotifier) Send(notification NotificationQueue) error {
// 实现 SSE 发送逻辑
fmt.Printf("Sending SSE to user %d: %s\n", notification.UserID, notification.Title)
return nil
}
func GetNotifier(notificationType string) (Notifier, error) {
switch notificationType {
case "email":
return &EmailNotifier{}, nil
case "sse":
return &SSENotifier{}, nil
default:
return nil, fmt.Errorf("unsupported notification type: %s", notificationType)
}
}
3. 配置管理
// config.go
package main
import (
"encoding/json"
"io/ioutil"
)
type ChannelConfig struct {
Types []string `json:"types"`
Template string `json:"template"`
Priority int `json:"priority"`
}
var channelConfigs map[string]ChannelConfig
func loadChannelConfigs() error {
data, err := ioutil.ReadFile("channel_configs.json")
if err != nil {
return err
}
return json.Unmarshal(data, &channelConfigs)
}
- 通知处理
// notification_processor.go
package main
import (
"context"
"log"
"time"
"github.com/uptrace/bun"
)
func startNotificationProcessor(db *bun.DB) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
processNotifications(db)
}
}
}
func processNotifications(db *bun.DB) {
var notifications []NotificationQueue
err := db.NewSelect().Model(¬ifications).Where("status = ?", "pending").Limit(100).Scan(context.Background())
if err != nil {
log.Printf("Error fetching notifications: %v", err)
return
}
for _, notification := range notifications {
err := handleNotification(db, ¬ification)
if err != nil {
log.Printf("Error processing notification %d: %v", notification.ID, err)
notification.Attempts++
notification.Status = "failed"
} else {
notification.Status = "sent"
notification.SentAt = time.Now()
}
notification.UpdatedAt = time.Now()
_, err = db.NewUpdate().Model(¬ification).WherePK().Exec(context.Background())
if err != nil {
log.Printf("Error updating notification %d: %v", notification.ID, err)
}
}
}
func handleNotification(db *bun.DB, notification *NotificationQueue) error {
config, exists := channelConfigs[notification.Channel]
if !exists {
return fmt.Errorf("no configuration for channel: %s", notification.Channel)
}
notification.Type = config.Types[0] // 简化处理,总是使用第一个类型
notification.Priority = config.Priority
// 这里可以添加模板渲染逻辑
// notification.Message = renderTemplate(config.Template, notification.Metadata)
notifier, err := GetNotifier(notification.Type)
if err != nil {
return err
}
return notifier.Send(*notification)
}
- 主应用逻辑
// main.go
package main
import (
"log"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/driver/pgdriver"
)
func main() {
// 初始化数据库连接
sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN("postgres://user:pass@localhost:5432/dbname?sslmode=disable")))
db := bun.NewDB(sqldb, pgdialect.New())
defer db.Close()
// 加载通道配置
err := loadChannelConfigs()
if err != nil {
log.Fatalf("Failed to load channel configs: %v", err)
}
// 启动通知处理器
go startNotificationProcessor(db)
// 设置 Chi 路由
r := chi.NewRouter()
r.Post("/comments", createCommentHandler(db))
r.Get("/sse", sseHandler(db))
// 启动服务器
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", r))
}
func createCommentHandler(db *bun.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 处理评论创建逻辑
// ...
// 创建通知
notification := NotificationQueue{
UserID: 123, // 示例用户 ID
Email: "[email protected]",
Title: "New Comment",
Message: "Someone commented on your post",
Channel: "comment_notification",
}
_, err := db.NewInsert().Model(¬ification).Exec(r.Context())
if err != nil {
http.Error(w, "Failed to create notification", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
}
func sseHandler(db *bun.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 实现 SSE 处理逻辑
// ...
}
}
设计说明
- 可扩展性:
- 通过
Notifier
接口,可以轻松添加新的通知类型。 ChannelConfig
允许通过配置文件定制不同通知渠道的行为。
- 通过
- 解耦:
- 通知生成(如评论创建)与通知处理逻辑分离。
- 通知队列作为中间层,提高了系统的可靠性和可扩展性。
- 灵活性:
- 通知类型、优先级等可以通过配置动态调整。
- 元数据字段允许存储额外的通知相关信息。
- 可靠性:
- 使用数据库存储通知队列,确保即使在系统崩溃时也不会丢失通知。
- 通过
Attempts
字段实现重试机制。
- 性能:
- 批量处理通知,减少数据库操作。
- 使用定时器异步处理通知,避免阻塞主应用逻辑。
- 可维护性:
- 清晰的代码结构和模块化设计。
- 使用接口和依赖注入,便于单元测试。
这个设计提供了一个强大、灵活的通知系统框架,可以处理各种类型的通知,并且易于扩展和维护。它适用于多种场景,不仅限于评论系统,还可以用于其他需要通知功能的应用。
- 可扩展性: