使用 Go 实现一个基于 Gin 框架和 Redis 的分布式 WebSocket 系统需要以下几个步骤:
实现架构
- Gin 处理 HTTP/WebSocket 请求
- Gin 用于启动 HTTP 服务并处理 WebSocket 请求。
- Redis Pub/Sub
- Redis 用于跨节点消息分发。
- WebSocket 连接管理
- 在服务内维护 WebSocket 连接池。
代码实现
以下是一个简单的示例代码:
1. 安装依赖
使用 go mod
初始化项目并安装依赖:
go mod init websocket-example go get -u github.com/gin-gonic/gin go get -u github.com/go-redis/redis/v8
2. 服务端代码
package main import ( "context" "fmt" "log" "net/http" "sync" "github.com/gin-gonic/gin" "github.com/go-redis/redis/v8" "github.com/gorilla/websocket" ) // Redis context and client var ctx = context.Background() var redisClient = redis.NewClient(&redis.Options{ Addr: "localhost:6379", // Redis 地址 }) // WebSocket upgrader var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // 允许所有来源连接 }, } // Connection pool to manage active WebSocket connections var connections = make(map[string]*websocket.Conn) var connLock sync.Mutex func main() { r := gin.Default() // WebSocket endpoint r.GET("/ws/:channel", func(c *gin.Context) { channel := c.Param("channel") // 升级为 WebSocket conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Println("Upgrade error:", err) return } defer conn.Close() // 将连接加入连接池 connLock.Lock() connections[channel] = conn connLock.Unlock() // 启动 Redis 订阅 go subscribeToRedis(channel, conn) // 监听客户端发送的消息 for { _, message, err := conn.ReadMessage() if err != nil { log.Println("Read error:", err) break } // 发布消息到 Redis err = redisClient.Publish(ctx, channel, string(message)).Err() if err != nil { log.Println("Redis publish error:", err) } } // 连接断开时,从连接池中移除 connLock.Lock() delete(connections, channel) connLock.Unlock() }) // 启动服务器 r.Run(":8080") } // Subscribe to Redis channel func subscribeToRedis(channel string, conn *websocket.Conn) { sub := redisClient.Subscribe(ctx, channel) defer sub.Close() ch := sub.Channel() for msg := range ch { connLock.Lock() err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload)) connLock.Unlock() if err != nil { log.Println("Write error:", err) break } } }
代码说明
-
Gin 路由:
GET /ws/:channel
用于处理 WebSocket 请求,channel
参数用于标识消息主题(类似于聊天室或订阅主题)。 -
Redis Pub/Sub:
- Publish:当 WebSocket 客户端发送消息时,服务端将消息发布到 Redis 的指定频道。
- Subscribe:服务端订阅 Redis 频道,当有新消息时将其发送到 WebSocket 客户端。
-
WebSocket 连接管理: 使用
map[string]*websocket.Conn
存储活动连接,方便在断开时清理。 -
并发安全:
使用sync.Mutex
确保对共享连接池的并发访问是安全的。
测试
-
-
启动 Redis 服务器:
-
redis-server
-
启动 WebSocket 服务:
-
使用 WebSocket 客户端(如
wscat
或网页工具)测试:- 连接服务:
- 发送消息:
在一个客户端发送消息,另一个客户端会收到通过 Redis 分发的消息。
优化方向
-
- 连接池优化:可以使用库如
gorilla/websocket
提供的连接管理工具。 - 日志记录:增加更详细的日志以调试问题。
- Redis 配置:生产环境中使用 Redis 集群和持久化机制。
- 扩展:通过负载均衡器(如 Nginx 或 Traefik)支持多实例部署。
-
- 连接池优化:可以使用库如
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容