cltledmgr.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package main
  2. import (
  3. "github.com/sirupsen/logrus"
  4. "lc/common/models"
  5. "runtime"
  6. "runtime/debug"
  7. "sync"
  8. "time"
  9. "lc/common/mqtt"
  10. "lc/common/protocol"
  11. "lc/common/util"
  12. )
  13. var _CltLedMgrOnce sync.Once
  14. var _CltLedMgrSingle *CltLedMgr
  15. func GetCltLedMgr() *CltLedMgr {
  16. _CltLedMgrOnce.Do(func() {
  17. _CltLedMgrSingle = &CltLedMgr{
  18. queue: util.NewQueue(1000),
  19. mapCltledState: make(map[string]*StateInfo),
  20. }
  21. })
  22. return _CltLedMgrSingle
  23. }
  24. type CltLedMgr struct {
  25. queue *util.MlQueue
  26. mapCltledState map[string]*StateInfo //0在线,1离线
  27. }
  28. func (o *CltLedMgr) SubscribeTopics() {
  29. GetMQTTMgr().Subscribe(GetCltledTopic(protocol.TP_LED_DATA), mqtt.AtMostOnce, o.HandlerData)
  30. GetMQTTMgr().Subscribe(GetCltledTopic(protocol.TP_LED_STATE), mqtt.AtMostOnce, o.HandlerData)
  31. }
  32. func (o *CltLedMgr) HandlerData(m mqtt.Message) {
  33. for {
  34. ok, cnt := o.queue.Put(&m)
  35. if ok {
  36. break
  37. } else {
  38. logrus.Errorf("CltLedMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  39. runtime.Gosched()
  40. }
  41. }
  42. }
  43. func (o *CltLedMgr) Handler(args ...interface{}) interface{} {
  44. defer func() {
  45. if err := recover(); err != nil {
  46. gopool.Add(o.Handler, args)
  47. logrus.Errorf("CltLedMgr.Handler发生异常:%v", err)
  48. logrus.Errorf("CltLedMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack()))
  49. }
  50. }()
  51. exit := false
  52. timer := time.NewTicker(1 * time.Minute)
  53. for {
  54. select {
  55. case <-timer.C: //每隔1分钟执行一次
  56. //状态,防止无数据状态不更新
  57. o.UpdateState()
  58. default:
  59. if o.handleQueue() == 0 {
  60. if exit {
  61. return 0
  62. }
  63. time.Sleep(100 * time.Millisecond)
  64. }
  65. }
  66. }
  67. }
  68. func (o *CltLedMgr) handleQueue() uint32 {
  69. msg, ok, quantity := o.queue.Get()
  70. if !ok {
  71. return quantity
  72. } else if quantity > 1000 {
  73. logrus.Warnf("CltLedMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  74. }
  75. m, ok := msg.(*mqtt.Message)
  76. if !ok {
  77. return quantity
  78. }
  79. //tenant, _, sn, topic, _, err := ParseTopicCltLed(m.Topic())
  80. //fmt.Printf("Topic=%v \ttenant = %v sn=%v topic=%v \n", m.Topic(), tenant, sn, topic)
  81. _, _, _, topic, _, err := ParseTopicCltLed(m.Topic())
  82. if err != nil {
  83. return quantity
  84. }
  85. switch topic {
  86. case protocol.TP_LED_STATE: //在线状态
  87. o.HandlerState(m)
  88. case protocol.TP_LED_DATA: //详情信息
  89. o.HandlerLedData(m)
  90. }
  91. return quantity
  92. }
  93. func (o *CltLedMgr) UpdateState() {
  94. t := util.MlNow()
  95. for k, v := range o.mapCltledState {
  96. if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的
  97. GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t})
  98. cacheState(k, t.Format("2006-01-02 15:04:05"), 1)
  99. o.mapCltledState[k].State = 1
  100. o.mapCltledState[k].Time = t
  101. }
  102. }
  103. }
  104. func (o *CltLedMgr) HandlerState(m *mqtt.Message) {
  105. var obj protocol.Pack_LedState
  106. if err := obj.DeCode(m.PayloadString()); err != nil {
  107. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  108. return
  109. }
  110. //fmt.Printf("obj = %v \n", obj)
  111. t, err := util.MlParseTime(obj.Time)
  112. if err != nil {
  113. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  114. return
  115. }
  116. //0在线,1离线
  117. si, ok := o.mapCltledState[obj.Id]
  118. if !ok {
  119. t, s, err := getState(obj.Id)
  120. if err != nil {
  121. cacheState(obj.Id, obj.Time, obj.Data.State)
  122. o.mapCltledState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
  123. return
  124. } else {
  125. si = &StateInfo{Time: t, State: s}
  126. o.mapCltledState[obj.Id] = si
  127. }
  128. }
  129. if si.State != obj.Data.State {
  130. if obj.Data.State == 1 { //在线到离线
  131. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
  132. } else { //离线到在线
  133. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
  134. }
  135. }
  136. cacheState(obj.Id, obj.Time, obj.Data.State)
  137. o.mapCltledState[obj.Id].State = obj.Data.State
  138. o.mapCltledState[obj.Id].Time = t
  139. }
  140. func (o *CltLedMgr) HandlerLedData(m *mqtt.Message) {
  141. var obj protocol.Pack_LedCltledData
  142. if err := obj.DeCode(m.PayloadString()); err != nil {
  143. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  144. return
  145. }
  146. marshalToString, err := json.MarshalToString(obj.Data)
  147. if err != nil {
  148. logrus.Errorf("解析数据1出错%v", err.Error())
  149. return
  150. }
  151. dataMap := make(map[string]interface{})
  152. err = json.Unmarshal([]byte(marshalToString), &dataMap)
  153. if err != nil {
  154. logrus.Errorf("解析数2据出错%v", err.Error())
  155. return
  156. }
  157. t, err := util.MlParseTime(obj.Time)
  158. if err != nil {
  159. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  160. return
  161. }
  162. cacheLedData(obj.Id, t, dataMap)
  163. }