1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package websocket
- import (
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "iot_manager_service/util/cache"
- "log"
- "net/http"
- "sync"
- "time"
- )
- var (
- // websocket客户端链接池
- client = make(map[string]*websocket.Conn)
- // 用于保护用户连接信息的互斥锁
- mutex sync.Mutex
- upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- HandshakeTimeout: 5 * time.Second,
- // 取消ws跨域校验
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- )
- // 建立连接
- func HandleWebSocket(c *gin.Context) {
- // 升级 WebSocket 连接
- conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
- if err != nil {
- http.Error(c.Writer, "Could not upgrade to WebSocket", http.StatusInternalServerError)
- return
- }
- // 读取 userID
- userID := c.Request.URL.Query().Get("userID")
- if userID == "" {
- http.Error(c.Writer, "userID not provided", http.StatusBadRequest)
- return
- }
- // 存储用户连接信息
- mutex.Lock()
- client[userID] = conn
- mutex.Unlock()
- log.Println("User connected:", userID)
- //从redis中获取离线缓存的信息
- messages := cache.GetStoreMessages(userID)
- if len(messages) != 0 {
- for _, message := range messages {
- SendMessage(userID, message)
- }
- //发完后清空redis
- cache.Redis.Del("offline:" + userID)
- }
- }
- // 关闭对应的用户连接
- func CloseConn(id string) {
- mutex.Lock()
- conn, ok := client[id]
- mutex.Unlock()
- if ok {
- conn.Close()
- }
- }
- // 发送消息给指定用户
- func SendMessage(id, msg string) {
- // 查找用户连接
- mutex.Lock()
- conn, ok := client[id]
- mutex.Unlock()
- if !ok {
- // 用户不在线,存储为离线消息
- StoreOfflineMessage(id, msg)
- return
- }
- // 发送消息
- err := conn.WriteJSON(msg)
- //连接关闭
- if err != nil {
- // 发送失败,存储为离线消息
- StoreOfflineMessage(id, msg)
- }
- }
- func StoreOfflineMessage(id, msg string) error {
- err := cache.StoreMessages(id, msg)
- if err != nil {
- return err
- }
- return nil
- }
|