// Package websocket keeps a registry of per-user WebSocket connections and
// delivers messages to them, handing messages to the offline cache when the
// user has no registered connection or a write fails.
package websocket

import (
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"

	"iot_manager_service/util/cache"
)

var (
	// client is the pool of registered connections, keyed by userID.
	client = make(map[string]*websocket.Conn)
	// mutex guards every access to client.
	mutex sync.Mutex

	upgrader = websocket.Upgrader{
		ReadBufferSize:   1024,
		WriteBufferSize:  1024,
		HandshakeTimeout: 5 * time.Second,
		// Cross-origin checking is disabled: every Origin is accepted.
		// NOTE(review): this allows any website to open a socket on behalf of
		// a visitor; confirm this is intended for the deployment.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

// HandleWebSocket upgrades the request to a WebSocket connection and registers
// it under the "userID" query parameter. Messages cached while the user was
// offline are then replayed through SendMessage.
//
// The userID is validated before the upgrade: once the connection has been
// upgraded the HTTP response writer can no longer carry an error reply, and
// the upgraded socket would otherwise be left open and unregistered.
func HandleWebSocket(c *gin.Context) {
	userID := c.Request.URL.Query().Get("userID")
	if userID == "" {
		http.Error(c.Writer, "userID not provided", http.StatusBadRequest)
		return
	}

	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so a
		// second http.Error here would be a superfluous write.
		log.Println("Could not upgrade to WebSocket:", err)
		return
	}

	// Register the new connection, remembering any connection it replaces so
	// that a reconnecting user does not leak the previous socket.
	mutex.Lock()
	previous, replaced := client[userID]
	client[userID] = conn
	mutex.Unlock()

	if replaced && previous != conn {
		if err := previous.Close(); err != nil {
			log.Println("closing replaced connection for", userID, "failed:", err)
		}
	}
	log.Println("SysUser connected:", userID)

	// Fetch the messages cached while the user was offline.
	messages := cache.GetStoreMessages(userID)
	if len(messages) == 0 {
		return
	}

	// Clear the cached list BEFORE replaying it. SendMessage re-queues a
	// message when its write fails; clearing afterwards would erase those
	// re-queued messages and lose them.
	cache.Redis.Del("offline:" + userID)
	for _, message := range messages {
		SendMessage(userID, message)
	}
}

// CloseConn closes the connection registered for id and removes it from the
// pool. It does nothing when no connection is registered for id.
func CloseConn(id string) {
	mutex.Lock()
	conn, ok := client[id]
	if ok {
		// Drop the entry so later sends go straight to the offline cache
		// instead of writing to a closed socket.
		delete(client, id)
	}
	mutex.Unlock()

	if !ok {
		return
	}
	if err := conn.Close(); err != nil {
		log.Println("closing connection for", id, "failed:", err)
	}
}

// SendMessage writes msg as JSON to the connection registered for id. When no
// connection is registered, or the write fails, msg is handed to
// StoreOfflineMessage instead; a connection whose write failed is closed and
// removed from the pool.
//
// NOTE(review): the write happens outside mutex, and gorilla/websocket allows
// only one concurrent writer per connection. If SendMessage can be called for
// the same id from several goroutines, the writes need to be serialized.
// NOTE(review): nothing in this file reads from the connection, so a peer
// disconnect is only noticed on the next failed write — confirm whether a
// read loop exists elsewhere.
func SendMessage(id, msg string) {
	mutex.Lock()
	conn, ok := client[id]
	mutex.Unlock()

	if !ok {
		// User is offline: keep the message for later delivery.
		queueOffline(id, msg)
		return
	}

	if err := conn.WriteJSON(msg); err != nil {
		log.Println("sending message to", id, "failed:", err)
		dropConn(id, conn)
		queueOffline(id, msg)
	}
}

// StoreOfflineMessage saves msg in the offline cache for user id and returns
// any error reported by the cache.
func StoreOfflineMessage(id, msg string) error {
	if err := cache.StoreMessages(id, msg); err != nil {
		return err
	}
	return nil
}

// queueOffline stores msg for id and logs the failure if the cache rejects it.
func queueOffline(id, msg string) {
	if err := StoreOfflineMessage(id, msg); err != nil {
		log.Println("storing offline message for", id, "failed:", err)
	}
}

// dropConn closes conn and unregisters it, but only if it is still the
// connection registered for id (a newer connection must not be removed).
func dropConn(id string, conn *websocket.Conn) {
	mutex.Lock()
	if client[id] == conn {
		delete(client, id)
	}
	mutex.Unlock()

	if err := conn.Close(); err != nil {
		log.Println("closing broken connection for", id, "failed:", err)
	}
}