mqtt.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package admin
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "regexp"
  7. "runtime"
  8. "server/dao"
  9. "server/model/common"
  10. "server/utils/logger"
  11. "server/utils/mqtt"
  12. "server/utils/protocol"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func InitMqtt() {
  18. MqttService = GetHandler()
  19. MqttService.SubscribeTopics()
  20. go MqttService.Handler()
  21. }
  22. var MqttService *MqttHandler
  23. var timeoutReg = regexp.MustCompile("Client .* has exceeded timeout")
  24. var connectReg = regexp.MustCompile(`New client connected from .* as .*\(`)
  25. var disconnectReg = regexp.MustCompile("Client mqttx_893e4b7d disconnected")
  26. type MqttHandler struct {
  27. queue *mqtt.MlQueue
  28. }
  29. var _handlerOnce sync.Once
  30. var _handlerSingle *MqttHandler
  31. func GetHandler() *MqttHandler {
  32. _handlerOnce.Do(func() {
  33. _handlerSingle = &MqttHandler{
  34. queue: mqtt.NewQueue(10000),
  35. }
  36. })
  37. return _handlerSingle
  38. }
  39. func (o *MqttHandler) SubscribeTopics() {
  40. mqtt.GetMQTTMgr().Subscribe("smart_tunnel/#", mqtt.AtLeastOnce, o.HandlerData)
  41. }
  42. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  43. for {
  44. ok, cnt := o.queue.Put(&m)
  45. if ok {
  46. break
  47. } else {
  48. logger.Get().Error("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  49. runtime.Gosched()
  50. }
  51. }
  52. }
  53. func (o *MqttHandler) Handler() interface{} {
  54. defer func() {
  55. if err := recover(); err != nil {
  56. go GetHandler().Handler()
  57. //logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  58. }
  59. }()
  60. for {
  61. msg, ok, quantity := o.queue.Get()
  62. if !ok {
  63. time.Sleep(10 * time.Millisecond)
  64. continue
  65. } else if quantity > 1000 {
  66. logger.Get().Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  67. }
  68. m, ok := msg.(*mqtt.Message)
  69. if !ok {
  70. continue
  71. }
  72. sn, operate, err := ParseTopic(m.Topic())
  73. if err != nil {
  74. //logger.Logger.Errorf("parseTopic err")
  75. continue
  76. }
  77. switch operate {
  78. case protocol.TopicGatherDataEnv:
  79. var data dao.EnvData
  80. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  81. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  82. continue
  83. }
  84. data.TunnelSn = sn
  85. if data.Sn == "" {
  86. continue
  87. }
  88. err = data.CreateEnvData()
  89. if err != nil {
  90. logger.Get().Errorf("sn:%s device:%s 新增环境信息失败: %v", sn, data.Sn, err)
  91. continue
  92. }
  93. case protocol.TopicGatherDataOpt:
  94. var data dao.OpticalData
  95. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  96. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  97. continue
  98. }
  99. data.TunnelSn = sn
  100. if data.Sn == "" {
  101. continue
  102. }
  103. err = data.CreateOpticalData()
  104. if err != nil {
  105. logger.Get().Errorf("sn:%s device:%s 新增光感信息失败: %v", sn, data.Sn, err)
  106. continue
  107. }
  108. case protocol.TopicGatherDataDG:
  109. var data common.InductanceData
  110. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  111. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  112. continue
  113. }
  114. dao.InductanceDetails{
  115. DeviceSn: data.Sn,
  116. TotalActivePower: data.TotalPower,
  117. UploadTime: time.Now().Format("2006-01-02 15:04:05"),
  118. }.CreateInductanceDetails()
  119. case protocol.TopicSwitchStates:
  120. var data common.SwitchData
  121. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  122. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  123. continue
  124. }
  125. if data.TurnOf == 1 {
  126. dao.UpdateRelayState(data.Sn, data.Way, true)
  127. } else {
  128. dao.UpdateRelayState(data.Sn, data.Way, false)
  129. }
  130. case protocol.TopicLampBrightNess:
  131. var data common.LampData
  132. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  133. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  134. continue
  135. }
  136. if data.Way == 1 {
  137. dao.UpdateTunnelLamp1(sn, data.BrightNess)
  138. } else {
  139. dao.UpdateTunnelLamp2(sn, data.BrightNess)
  140. }
  141. }
  142. }
  143. }
  144. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  145. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  146. }
  147. func (o *MqttHandler) GetTopic(sn, protocol string) string {
  148. return fmt.Sprintf("smart_tunnel/%s/%s", sn, protocol)
  149. }
  150. // ParseTopic 获取设备SN, topic
  151. // "mini/*****/switch_control/ack"
  152. func ParseTopic(topic string) (sn string, operate string, err error) {
  153. strList := strings.Split(topic, "/")
  154. sn = strList[1]
  155. operate = strList[2]
  156. if len(strList) != 3 {
  157. return "", "", errors.New("不支持的topic")
  158. }
  159. return
  160. }