websocket.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package websocket
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "github.com/gorilla/websocket"
  5. "iot_manager_service/util/cache"
  6. "log"
  7. "net/http"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. // websocket客户端链接池
  13. client = make(map[string]*websocket.Conn)
  14. // 用于保护用户连接信息的互斥锁
  15. mutex sync.Mutex
  16. upgrader = websocket.Upgrader{
  17. ReadBufferSize: 1024,
  18. WriteBufferSize: 1024,
  19. HandshakeTimeout: 5 * time.Second,
  20. // 取消ws跨域校验
  21. CheckOrigin: func(r *http.Request) bool {
  22. return true
  23. },
  24. }
  25. )
  26. // 建立连接
  27. func HandleWebSocket(c *gin.Context) {
  28. // 升级 WebSocket 连接
  29. conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
  30. if err != nil {
  31. http.Error(c.Writer, "Could not upgrade to WebSocket", http.StatusInternalServerError)
  32. return
  33. }
  34. // 读取 userID
  35. userID := c.Request.URL.Query().Get("userID")
  36. if userID == "" {
  37. http.Error(c.Writer, "userID not provided", http.StatusBadRequest)
  38. return
  39. }
  40. // 存储用户连接信息
  41. mutex.Lock()
  42. client[userID] = conn
  43. mutex.Unlock()
  44. log.Println("SysUser connected:", userID)
  45. //从redis中获取离线缓存的信息
  46. messages := cache.GetStoreMessages(userID)
  47. if len(messages) != 0 {
  48. for _, message := range messages {
  49. SendMessage(userID, message)
  50. }
  51. //发完后清空redis
  52. cache.Redis.Del("offline:" + userID)
  53. }
  54. }
  55. // 关闭对应的用户连接
  56. func CloseConn(id string) {
  57. mutex.Lock()
  58. conn, ok := client[id]
  59. mutex.Unlock()
  60. if ok {
  61. conn.Close()
  62. }
  63. }
  64. // 发送消息给指定用户
  65. func SendMessage(id, msg string) {
  66. // 查找用户连接
  67. mutex.Lock()
  68. conn, ok := client[id]
  69. mutex.Unlock()
  70. if !ok {
  71. // 用户不在线,存储为离线消息
  72. StoreOfflineMessage(id, msg)
  73. return
  74. }
  75. // 发送消息
  76. err := conn.WriteJSON(msg)
  77. //连接关闭
  78. if err != nil {
  79. // 发送失败,存储为离线消息
  80. StoreOfflineMessage(id, msg)
  81. }
  82. }
  83. func StoreOfflineMessage(id, msg string) error {
  84. err := cache.StoreMessages(id, msg)
  85. if err != nil {
  86. return err
  87. }
  88. return nil
  89. }