cableGuardianHandler.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/jinzhu/gorm"
  5. "runtime"
  6. "runtime/debug"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/sirupsen/logrus"
  11. "lc/common/models"
  12. "lc/common/mqtt"
  13. "lc/common/protocol"
  14. "lc/common/util"
  15. )
// Numeric cable status codes; getSmsStr maps each one to its display string.
const (
	CableStatusNormal            = iota // cable status: normal
	CableStatusBeStolen                 // cable status: stolen
	CableStatusOpened                   // cable status: opened
	CableStatusBeStolenAndOpened        // cable status: stolen and opened
)
// Display strings for the cable status codes, returned by getSmsStr.
const (
	CableStatusNormalStr            = "正常"     // cable status: normal
	CableStatusBeStolenStr          = "被盗"     // cable status: stolen
	CableStatusOpenedStr            = "被打开"    // cable status: opened
	CableStatusBeStolenAndOpenedStr = "被盗、被打开" // cable status: stolen and opened
)
// cableGuardianDataPrefix is the fmt layout of the cache key under which the
// latest value of one data entry is stored. Handler fills the three verbs with
// ret.Gid, DID and the index of the entry in ret.Data.Data, in that order.
const cableGuardianDataPrefix = "cable_guardian_data_%s_%s_%d"
  29. // 电缆防盗 mqtt消息处理
  30. var _cableGuardianHandlerOnce sync.Once
  31. var _cableGuardianHandlerSingle *cableGuardianHandler
  32. func GetCableGuardianHandler() *cableGuardianHandler {
  33. _cableGuardianHandlerOnce.Do(func() {
  34. _cableGuardianHandlerSingle = &cableGuardianHandler{
  35. queue: util.NewQueue(10000),
  36. }
  37. })
  38. return _cableGuardianHandlerSingle
  39. }
// cableGuardianHandler receives cable-guardian MQTT messages: HandlerData puts
// them into queue and Handler takes them out again for processing.
type cableGuardianHandler struct {
	queue *util.MlQueue // pending messages, stored as *mqtt.Message
}
  43. func (o *cableGuardianHandler) SubscribeTopics() {
  44. //电缆防盗
  45. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CableGuardian, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
  46. }
  47. func (o *cableGuardianHandler) HandlerData(m mqtt.Message) {
  48. for {
  49. ok, cnt := o.queue.Put(&m)
  50. if ok {
  51. break
  52. } else {
  53. logrus.Errorf("cableGuardianHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  54. runtime.Gosched()
  55. }
  56. }
  57. }
  58. func (o *cableGuardianHandler) Handler(args ...interface{}) interface{} {
  59. defer func() {
  60. if err := recover(); err != nil {
  61. gopool.Add(o.Handler, args)
  62. logrus.Errorf("cableGuardianHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
  63. }
  64. }()
  65. for {
  66. msg, ok, quantity := o.queue.Get()
  67. if !ok {
  68. time.Sleep(10 * time.Millisecond)
  69. continue
  70. } else if quantity > 1000 {
  71. logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  72. }
  73. m, ok := msg.(*mqtt.Message)
  74. if !ok {
  75. continue
  76. }
  77. _, _, DID, topic, err := ParseTopic(m.Topic())
  78. if err != nil {
  79. continue
  80. }
  81. switch topic {
  82. case protocol.TP_MODBUS_DATA:
  83. var ret protocol.Pack_UploadData
  84. if err := ret.DeCode(m.PayloadString()); err == nil {
  85. if ret.Data.State == protocol.FAILED {
  86. logrus.Warningf("电缆防盗数据不正确 %+v", ret)
  87. continue
  88. }
  89. t, _ := util.MlParseTime(ret.Time)
  90. for id, value := range ret.Data.Data {
  91. key := fmt.Sprintf(cableGuardianDataPrefix, ret.Gid, DID, id)
  92. tId := strconv.Itoa(int(id))
  93. old := getCableData(key)
  94. if value != old {
  95. cableGuardianStatus := &models.CableGuardianStatus{
  96. GID: ret.Gid,
  97. DID: DID,
  98. TerminalID: tId,
  99. Status: int(value),
  100. }
  101. err := cableGuardianStatus.Get()
  102. if err != nil {
  103. if !gorm.IsRecordNotFoundError(err) {
  104. logrus.Warnf("CableGuardianStatus get fail = %v", err)
  105. continue
  106. }
  107. gateway := models.Gateway{ID: ret.Gid}
  108. err = gateway.Get()
  109. if err != nil {
  110. logrus.Warnf("CableGuardianStatus get gateway fail = %v", err)
  111. continue
  112. }
  113. cableGuardianStatus.GatewayName = gateway.Name
  114. }
  115. if cableGuardianStatus.ID > 0 {
  116. cableGuardianStatus.UpdateAt = t
  117. cableGuardianStatus.CreatedAt = t
  118. cableGuardianStatus.Status = int(value)
  119. err = cableGuardianStatus.Update()
  120. } else {
  121. cableGuardianStatus.CreatedAt = t
  122. cableGuardianStatus.UpdateAt = t
  123. err = cableGuardianStatus.Save()
  124. }
  125. cacheCableData(key, value)
  126. if old != -1 {
  127. err = sendSms([]string{cableGuardianStatus.GatewayName, tId, getSmsStr(value)})
  128. }
  129. }
  130. }
  131. }
  132. default:
  133. logrus.Warnf("cableGuardianHandler.Handler:收到暂不支持的主题:%s", topic)
  134. }
  135. }
  136. }
  137. func getSmsStr(status float64) string {
  138. switch status {
  139. case CableStatusNormal:
  140. return CableStatusNormalStr
  141. case CableStatusBeStolen:
  142. return CableStatusBeStolenStr
  143. case CableStatusOpened:
  144. return CableStatusOpenedStr
  145. case CableStatusBeStolenAndOpened:
  146. return CableStatusBeStolenAndOpenedStr
  147. }
  148. return ""
  149. }
  150. // 缓存最新数据到redis
  151. func cacheCableData(key string, data float64) {
  152. if err := redisCltRawData.Set(key, data, 0).Err(); err != nil {
  153. logrus.Errorf("cacheCableData err = ", err.Error())
  154. }
  155. }
  156. // 获取缓存的redis数据
  157. func getCableData(key string) float64 {
  158. var value float64
  159. if err := redisCltRawData.Get(key).Scan(&value); err != nil {
  160. logrus.Warningf("getCableData err = %s", err.Error())
  161. return -1
  162. }
  163. return value
  164. }