mbdevhandler.go 8.4 KB


  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "sync"
  6. "time"
  7. "github.com/go-redis/redis/v7"
  8. jsoniter "github.com/json-iterator/go"
  9. "github.com/sirupsen/logrus"
  10. "lc/common/models"
  11. "lc/common/mqtt"
  12. "lc/common/protocol"
  13. "lc/common/util"
  14. )
  15. var _modbusDeviceHandlerOnce sync.Once
  16. var _modbusDeviceHandlerSingle *ModbusDeviceHandler
  17. func GetModbusDeviceHandler() *ModbusDeviceHandler {
  18. _modbusDeviceHandlerOnce.Do(func() {
  19. _modbusDeviceHandlerSingle = &ModbusDeviceHandler{
  20. queue: util.NewQueue(10000),
  21. }
  22. })
  23. return _modbusDeviceHandlerSingle
  24. }
  25. type ModbusDeviceHandler struct {
  26. queue *util.MlQueue
  27. }
  28. func (o *ModbusDeviceHandler) SubscribeTopics() {
  29. //环境传感器
  30. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ENVIRONMENT, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
  31. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ENVIRONMENT, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
  32. //液位计
  33. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LIQUID, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
  34. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LIQUID, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
  35. //路况传感器
  36. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ROAD_COND, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
  37. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ROAD_COND, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
  38. }
  39. func (o *ModbusDeviceHandler) HandlerData(m mqtt.Message) {
  40. for {
  41. ok, cnt := o.queue.Put(&m)
  42. if ok {
  43. break
  44. } else {
  45. logrus.Errorf("ModbusDeviceHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  46. runtime.Gosched()
  47. }
  48. }
  49. }
  50. func (o *ModbusDeviceHandler) Handler(args ...interface{}) interface{} {
  51. defer func() {
  52. if err := recover(); err != nil {
  53. gopool.Add(o.Handler, args)
  54. logrus.Errorf("ModbusDeviceHandler.Handler发生异常:%s", string(debug.Stack()))
  55. }
  56. }()
  57. timer := time.NewTicker(1 * time.Minute)
  58. for {
  59. select {
  60. case <-timer.C: //每隔5分钟执行一次 状态更新
  61. mapMbDevData.Range(func(key, value interface{}) bool {
  62. p, ok := value.(*MbDevData)
  63. if ok {
  64. p.UpdateState()
  65. }
  66. return true
  67. })
  68. default:
  69. msg, ok, quantity := o.queue.Get()
  70. if !ok {
  71. time.Sleep(10 * time.Millisecond)
  72. continue
  73. } else if quantity > 1000 {
  74. logrus.Warnf("ModbusDeviceHandler.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  75. }
  76. m, ok := msg.(*mqtt.Message)
  77. if !ok {
  78. continue
  79. }
  80. Tenant, DevType, DID, topic, err := ParseTopic(m.Topic())
  81. if err != nil {
  82. continue
  83. }
  84. switch topic {
  85. case protocol.TP_MODBUS_DATA:
  86. var obj protocol.Pack_UploadData
  87. if err := obj.DeCode(m.PayloadString()); err == nil {
  88. var pMbDevData *MbDevData = nil
  89. mgr, ok := mapMbDevData.Load(obj.Id)
  90. if ok {
  91. pMbDevData = mgr.(*MbDevData)
  92. } else {
  93. pMbDevData = NewMbDevData(Tenant, DevType, obj.Gid, DID)
  94. mapMbDevData.Store(obj.Id, pMbDevData)
  95. }
  96. if pMbDevData != nil {
  97. pMbDevData.HandleData(&obj)
  98. }
  99. }
  100. case protocol.TP_MODBUS_CONTROL_ACK:
  101. var obj protocol.Pack_Ack
  102. if err := obj.DeCode(m.PayloadString()); err == nil {
  103. o := models.DeviceCmdRecord{
  104. ID: obj.Seq,
  105. State: uint(obj.Data.State),
  106. Resp: obj.Data.Error,
  107. }
  108. if err := o.Update(); err != nil {
  109. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
  110. }
  111. logrus.Debugf("ModbusDeviceHandler.Handler:收到网关[%s]发布的主题:%s,内容:%s", obj.Id, m.Topic(), m.PayloadString())
  112. }
  113. default:
  114. logrus.Warnf("ModbusDeviceHandler.Handler:收到暂不支持的主题:%s", topic)
  115. }
  116. }
  117. }
  118. }
  119. const (
  120. DevStatusPrefix string = "dev_stat_"
  121. DevDataPrefix string = "dev_data_"
  122. ONLINE string = "online"
  123. TLast string = "tlast"
  124. TIME string = "time"
  125. )
  126. var json = jsoniter.ConfigFastest
  127. var mapMbDevData sync.Map
  128. type MbDevData struct {
  129. Lock sync.Mutex
  130. Tenant string //基础数据,租户ID
  131. Devtype string //基础数据,设备类型
  132. GID string //基础数据,网关编码
  133. DID string //基础数据,设备ID
  134. TID uint16 //基础数据,物模型ID
  135. LastDataTime time.Time //实时数据,最新数据时间
  136. Data map[uint16]float64 //实时数据,最新数据
  137. LastStateTime time.Time //实时数据,最新状态时间
  138. State uint8 //实时数据,0在线,1离线
  139. ErrCnt uint //错误计数
  140. NextHourTime time.Time //下次保存实时数据时间
  141. }
  142. func NewMbDevData(tenant, devtype, gid, did string) *MbDevData {
  143. LastHour := protocol.ToBJTime(util.BeginningOfHour().Add(1 * time.Hour))
  144. return &MbDevData{
  145. Tenant: tenant,
  146. Devtype: devtype,
  147. DID: did,
  148. GID: gid,
  149. Data: make(map[uint16]float64),
  150. State: 0xff,
  151. NextHourTime: LastHour,
  152. }
  153. }
  154. func (o *MbDevData) handleStateChange(t time.Time) {
  155. //最新状态
  156. state := uint8(0)
  157. if o.ErrCnt >= 10 {
  158. state = 1
  159. } else if o.ErrCnt == 0 {
  160. state = 0
  161. } else {
  162. return
  163. }
  164. //状态处理
  165. if o.LastStateTime.IsZero() || o.State == 0xff {
  166. t0, s0, err := getState(o.DID)
  167. if err != nil {
  168. o.State = state
  169. o.LastStateTime = t
  170. return
  171. }
  172. o.State = s0
  173. o.LastStateTime = t0
  174. }
  175. if o.State == 0 && state == 1 { //在线->离线
  176. GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: t})
  177. } else if o.State == 1 && state == 0 { //离线->在线
  178. GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_ONLINE, Time: t})
  179. }
  180. o.State = state
  181. o.LastStateTime = t
  182. }
  183. func (o *MbDevData) checkSaveData() {
  184. if o.NextHourTime.Before(o.LastDataTime) {
  185. //判断小时是否一样,不一样则以数据时间为准
  186. if !util.New(o.NextHourTime).BeginningOfHour().Equal(util.New(o.LastDataTime).BeginningOfHour()) {
  187. o.NextHourTime = util.New(o.LastDataTime).BeginningOfHour()
  188. }
  189. if len(o.Data) > 0 {
  190. var datas []models.DeviceHourData
  191. for k, v := range o.Data {
  192. o := models.DeviceHourData{ID: o.DID, Sid: k, Val: float32(v), Time: o.NextHourTime, CreatedAt: time.Now()}
  193. datas = append(datas, o)
  194. }
  195. if err := models.MultiInsertDeviceHourData(datas); err != nil {
  196. logrus.Errorf("小时[%s]数据插入数据库失败:%s", o.NextHourTime.Format("2006-01-02 15:04:05"), err.Error())
  197. }
  198. }
  199. o.NextHourTime = o.NextHourTime.Add(time.Hour)
  200. o.Data = make(map[uint16]float64)
  201. }
  202. }
  203. func (o *MbDevData) UpdateState() {
  204. if o.LastStateTime.IsZero() {
  205. t0, s0, err := getState(o.DID)
  206. if err == nil {
  207. o.State = s0
  208. o.LastStateTime = t0
  209. } else {
  210. err := redisCltRawData.HGet(DeviceAlarmId, o.DID).Err()
  211. if err == redis.Nil {
  212. o.State = protocol.FAILED
  213. o.LastStateTime = util.MlNow()
  214. GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: o.LastStateTime})
  215. }
  216. }
  217. }
  218. if o.State == protocol.FAILED ||
  219. (!o.LastDataTime.IsZero() && util.MlNow().Sub(o.LastDataTime).Minutes() < OfflineInterval) {
  220. return
  221. }
  222. //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态
  223. if o.State == protocol.SUCCESS {
  224. o.State = protocol.FAILED
  225. o.LastStateTime = util.MlNow()
  226. GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: o.LastStateTime})
  227. cacheState(o.DID, o.LastStateTime.Format("2006-01-02 15:04:05"), o.State)
  228. //检查是否要缓存数据
  229. o.checkSaveData()
  230. }
  231. }
  232. func (o *MbDevData) HandleData(data *protocol.Pack_UploadData) {
  233. t, err := util.MlParseTime(data.Time)
  234. if err != nil {
  235. logrus.Errorf("时间[%s]解析错误:%s", data.Time, err.Error())
  236. return
  237. }
  238. o.Lock.Lock()
  239. defer o.Lock.Unlock()
  240. o.GID = data.Gid
  241. o.TID = data.Data.Tid //基础数据,物模型ID
  242. if len(data.Data.Data) > 0 {
  243. for k, v := range data.Data.Data {
  244. o.Data[k] = v
  245. }
  246. o.LastDataTime = t
  247. cacheData(o.DID, t, data.Data.Data)
  248. o.ErrCnt = 0
  249. } else {
  250. o.ErrCnt++
  251. }
  252. //需要告警,则推入告警管理器
  253. bv := BizValue{ID: o.DID, Time: t, Tid: o.TID, Data: data.Data.Data}
  254. GetBizAlarmMgr().PushData(&bv)
  255. //先处理状态变化,再存入最新状态
  256. o.handleStateChange(t)
  257. cacheState(o.DID, data.Time, o.State)
  258. o.checkSaveData()
  259. }