| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package websocket
- import (
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "iot_manager_service/util/cache"
- "log"
- "net/http"
- "sync"
- "time"
- )
- var (
- // websocket客户端链接池
- client = make(map[string]*websocket.Conn)
- // 用于保护用户连接信息的互斥锁
- mutex sync.Mutex
- upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- HandshakeTimeout: 5 * time.Second,
- // 取消ws跨域校验
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- )
- // 建立连接
- func HandleWebSocket(c *gin.Context) {
- // 升级 WebSocket 连接
- conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
- if err != nil {
- http.Error(c.Writer, "Could not upgrade to WebSocket", http.StatusInternalServerError)
- return
- }
- // 读取 userID
- userID := c.Request.URL.Query().Get("userID")
- if userID == "" {
- http.Error(c.Writer, "userID not provided", http.StatusBadRequest)
- return
- }
- // 存储用户连接信息
- mutex.Lock()
- client[userID] = conn
- mutex.Unlock()
- log.Println("User connected:", userID)
- //从redis中获取离线缓存的信息
- messages := cache.GetStoreMessages(userID)
- if len(messages) != 0 {
- for _, message := range messages {
- SendMessage(userID, message)
- }
- //发完后清空redis
- cache.Redis.Del("offline:" + userID)
- }
- }
- // 关闭对应的用户连接
- func CloseConn(id string) {
- mutex.Lock()
- conn, _ := client[id]
- mutex.Unlock()
- conn.Close()
- }
- // 发送消息给指定用户
- func SendMessage(id, msg string) {
- // 查找用户连接
- mutex.Lock()
- conn, ok := client[id]
- mutex.Unlock()
- if !ok {
- // 用户不在线,存储为离线消息
- StoreOfflineMessage(id, msg)
- return
- }
- // 发送消息
- err := conn.WriteJSON(msg)
- //连接关闭
- if err != nil {
- // 发送失败,存储为离线消息
- StoreOfflineMessage(id, msg)
- }
- }
- func StoreOfflineMessage(id, msg string) error {
- err := cache.StoreMessages(id, msg)
- if err != nil {
- return err
- }
- return nil
- }
|