websocket.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package websocket
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "github.com/gorilla/websocket"
  5. "iot_manager_service/util/cache"
  6. "log"
  7. "net/http"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. // websocket客户端链接池
  13. client = make(map[string]*websocket.Conn)
  14. // 用于保护用户连接信息的互斥锁
  15. mutex sync.Mutex
  16. upgrader = websocket.Upgrader{
  17. ReadBufferSize: 1024,
  18. WriteBufferSize: 1024,
  19. HandshakeTimeout: 5 * time.Second,
  20. // 取消ws跨域校验
  21. CheckOrigin: func(r *http.Request) bool {
  22. return true
  23. },
  24. }
  25. )
  26. // 建立连接
  27. func HandleWebSocket(c *gin.Context) {
  28. // 升级 WebSocket 连接
  29. conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
  30. if err != nil {
  31. http.Error(c.Writer, "Could not upgrade to WebSocket", http.StatusInternalServerError)
  32. return
  33. }
  34. // 读取 userID
  35. userID := c.Request.URL.Query().Get("userID")
  36. if userID == "" {
  37. http.Error(c.Writer, "userID not provided", http.StatusBadRequest)
  38. return
  39. }
  40. // 存储用户连接信息
  41. mutex.Lock()
  42. client[userID] = conn
  43. mutex.Unlock()
  44. log.Println("User connected:", userID)
  45. //从redis中获取离线缓存的信息
  46. messages := cache.GetStoreMessages(userID)
  47. if len(messages) != 0 {
  48. for _, message := range messages {
  49. SendMessage(userID, message)
  50. }
  51. //发完后清空redis
  52. cache.Redis.Del("offline:" + userID)
  53. }
  54. }
  55. // 关闭对应的用户连接
  56. func CloseConn(id string) {
  57. mutex.Lock()
  58. conn, _ := client[id]
  59. mutex.Unlock()
  60. conn.Close()
  61. }
  62. // 发送消息给指定用户
  63. func SendMessage(id, msg string) {
  64. // 查找用户连接
  65. mutex.Lock()
  66. conn, ok := client[id]
  67. mutex.Unlock()
  68. if !ok {
  69. // 用户不在线,存储为离线消息
  70. StoreOfflineMessage(id, msg)
  71. return
  72. }
  73. // 发送消息
  74. err := conn.WriteJSON(msg)
  75. //连接关闭
  76. if err != nil {
  77. // 发送失败,存储为离线消息
  78. StoreOfflineMessage(id, msg)
  79. }
  80. }
  81. func StoreOfflineMessage(id, msg string) error {
  82. err := cache.StoreMessages(id, msg)
  83. if err != nil {
  84. return err
  85. }
  86. return nil
  87. }