mqtt_handle.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package service
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "runtime"
  7. "runtime/debug"
  8. "smart_tunnel_edge/mqtt"
  9. "smart_tunnel_edge/util/config"
  10. "smart_tunnel_edge/util/logger"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. func InitMqtt() {
  17. MqttService = GetHandler()
  18. MqttService.SubscribeTopics()
  19. go MqttService.Handler()
  20. }
  21. var MqttService *MqttHandler
  22. type MqttHandler struct {
  23. queue *mqtt.MlQueue
  24. }
  25. var _handlerOnce sync.Once
  26. var _handlerSingle *MqttHandler
  27. func GetHandler() *MqttHandler {
  28. _handlerOnce.Do(func() {
  29. _handlerSingle = &MqttHandler{
  30. queue: mqtt.NewQueue(10000),
  31. }
  32. })
  33. return _handlerSingle
  34. }
  35. func (o *MqttHandler) SubscribeTopics() {
  36. mqtt.GetMQTTMgr().Subscribe("smart_tunnel/#", mqtt.AtLeastOnce, o.HandlerData)
  37. }
  38. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  39. for {
  40. ok, cnt := o.queue.Put(&m)
  41. if ok {
  42. break
  43. } else {
  44. logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  45. runtime.Gosched()
  46. }
  47. }
  48. }
  49. func (o *MqttHandler) Handler() interface{} {
  50. defer func() {
  51. if err := recover(); err != nil {
  52. go GetHandler().Handler()
  53. logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  54. }
  55. }()
  56. for {
  57. msg, ok, quantity := o.queue.Get()
  58. if !ok {
  59. time.Sleep(10 * time.Millisecond)
  60. continue
  61. } else if quantity > 1000 {
  62. logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  63. }
  64. m, ok := msg.(*mqtt.Message)
  65. if !ok {
  66. continue
  67. }
  68. tunnelId, topic, err := ParseTopic(m.Topic())
  69. if err != nil {
  70. logger.Logger.Errorf("parseTopic err")
  71. continue
  72. }
  73. if tunnelId != config.Instance().TunnelId {
  74. continue
  75. }
  76. switch topic {
  77. case TopicLampControl:
  78. data := m.PayloadString()
  79. data1 := data[0:1]
  80. data2 := data[1:]
  81. radarId, _ := strconv.Atoi(data1)
  82. brightNess, _ := strconv.Atoi(data2)
  83. SetLampBright(int8(radarId), brightNess)
  84. case TopicSwitchControl:
  85. data := m.PayloadString()
  86. data1 := data[0:1]
  87. data2 := data[1:2]
  88. data3 := data[2:3]
  89. radarId, _ := strconv.Atoi(data1)
  90. way, _ := strconv.Atoi(data2)
  91. turnOf, _ := strconv.Atoi(data3)
  92. SetSwitchRelay(int8(radarId), way, turnOf)
  93. case TopicTunnelTactics:
  94. // 修改策略
  95. num, _ := strconv.Atoi(m.PayloadString())
  96. updates := map[string]interface{}{
  97. "id": num,
  98. }
  99. if !updateConfigAndReload(updates) {
  100. continue
  101. }
  102. case TopicTunnelTiming:
  103. // 修改时控
  104. var data map[string]interface{}
  105. if err := json.Unmarshal([]byte(m.PayloadString()), &data); err != nil {
  106. logger.Logger.Errorf("Error unmarshalling JSON: %v", err)
  107. continue
  108. }
  109. updates := map[string]interface{}{
  110. "startTime": data["startTime"],
  111. "endTime": data["endTime"],
  112. }
  113. if !updateConfigAndReload(updates) {
  114. continue
  115. }
  116. }
  117. }
  118. }
  119. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  120. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  121. }
  122. func (o *MqttHandler) GetTopic(operation string) string {
  123. tunnelId := config.Instance().TunnelId
  124. return fmt.Sprintf("smart_tunnel/%s/%s", tunnelId, operation)
  125. }
  // ParseTopic splits a topic of the form "<prefix>/<sn>/<operate>" on "/" and
  // returns its second and third segments.
  //
  // It returns an error, with empty sn and operate, when the topic does not
  // have exactly three segments. The segment count is checked before any
  // segment is indexed, so short topics produce the error instead of an
  // index-out-of-range panic.
  func ParseTopic(topic string) (sn string, operate string, err error) {
  	strList := strings.Split(topic, "/")
  	if len(strList) != 3 {
  		return "", "", errors.New("不支持的topic")
  	}
  	return strList[1], strList[2], nil
  }
  135. func updateConfigAndReload(updates map[string]interface{}) bool {
  136. if err := config.UpdateYAMLConfig(updates); err != nil {
  137. logger.Logger.Errorf("修改yaml失败 err: %v", err)
  138. return false
  139. }
  140. config.ReloadConfig()
  141. return true
  142. }
  // Operation names that form the last segment of a
  // "smart_tunnel/<tunnelId>/<operation>" topic.
  const (
  	// Data-gathering operations (not dispatched by Handler in this file;
  	// presumably used when publishing — confirm against callers).
  	TopicGatherDataEnv = "gatherDataEnv"
  	TopicGatherDataOpt = "gatherDataOpt"

  	// Configuration operations handled by Handler.
  	TopicTunnelTactics = "tunnelTactics"
  	TopicTunnelTiming  = "tunnelTiming"

  	// Device-control operations handled by Handler.
  	TopicLampControl   = "lampControl"
  	TopicSwitchControl = "switchControl"
  )