| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- package main
- import (
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "github.com/go-redis/redis/v7"
- jsoniter "github.com/json-iterator/go"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _modbusDeviceHandlerOnce sync.Once
- var _modbusDeviceHandlerSingle *ModbusDeviceHandler
- func GetModbusDeviceHandler() *ModbusDeviceHandler {
- _modbusDeviceHandlerOnce.Do(func() {
- _modbusDeviceHandlerSingle = &ModbusDeviceHandler{
- queue: util.NewQueue(10000),
- }
- })
- return _modbusDeviceHandlerSingle
- }
- type ModbusDeviceHandler struct {
- queue *util.MlQueue
- }
- func (o *ModbusDeviceHandler) SubscribeTopics() {
- //环境传感器
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ENVIRONMENT, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ENVIRONMENT, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
- //液位计
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LIQUID, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LIQUID, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
- //路况传感器
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ROAD_COND, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ROAD_COND, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *ModbusDeviceHandler) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("ModbusDeviceHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *ModbusDeviceHandler) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("ModbusDeviceHandler.Handler发生异常:%s", string(debug.Stack()))
- }
- }()
- timer := time.NewTicker(1 * time.Minute)
- for {
- select {
- case <-timer.C: //每隔5分钟执行一次 状态更新
- mapMbDevData.Range(func(key, value interface{}) bool {
- p, ok := value.(*MbDevData)
- if ok {
- p.UpdateState()
- }
- return true
- })
- default:
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logrus.Warnf("ModbusDeviceHandler.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- Tenant, DevType, DID, topic, err := ParseTopic(m.Topic())
- if err != nil {
- continue
- }
- switch topic {
- case protocol.TP_MODBUS_DATA:
- var obj protocol.Pack_UploadData
- if err := obj.DeCode(m.PayloadString()); err == nil {
- var pMbDevData *MbDevData = nil
- mgr, ok := mapMbDevData.Load(obj.Id)
- if ok {
- pMbDevData = mgr.(*MbDevData)
- } else {
- pMbDevData = NewMbDevData(Tenant, DevType, obj.Gid, DID)
- mapMbDevData.Store(obj.Id, pMbDevData)
- }
- if pMbDevData != nil {
- pMbDevData.HandleData(&obj)
- }
- }
- case protocol.TP_MODBUS_CONTROL_ACK:
- var obj protocol.Pack_Ack
- if err := obj.DeCode(m.PayloadString()); err == nil {
- o := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: uint(obj.Data.State),
- Resp: obj.Data.Error,
- }
- if err := o.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- logrus.Debugf("ModbusDeviceHandler.Handler:收到网关[%s]发布的主题:%s,内容:%s", obj.Id, m.Topic(), m.PayloadString())
- }
- default:
- logrus.Warnf("ModbusDeviceHandler.Handler:收到暂不支持的主题:%s", topic)
- }
- }
- }
- }
// String constants shared by the device state/data code.
// NOTE(review): none of these are referenced in the visible part of the file;
// the *Prefix values look like cache key prefixes and the rest like field
// names — confirm against cacheState/cacheData/getState.
const (
	DevStatusPrefix, DevDataPrefix string = "dev_stat_", "dev_data_"
	ONLINE, TLast, TIME            string = "online", "tlast", "time"
)
- var json = jsoniter.ConfigFastest
- var mapMbDevData sync.Map
// MbDevData is the in-memory runtime record of one Modbus device: identity,
// latest readings, online/offline state and hourly persistence bookkeeping.
// HandleData mutates it while holding Lock.
type MbDevData struct {
	Lock          sync.Mutex
	Tenant        string             // base data: tenant ID
	Devtype       string             // base data: device type
	GID           string             // base data: gateway code
	DID           string             // base data: device ID
	TID           uint16             // base data: thing-model ID
	LastDataTime  time.Time          // realtime: timestamp of the latest data
	Data          map[uint16]float64 // realtime: latest values
	LastStateTime time.Time          // realtime: timestamp of the latest state
	State         uint8              // realtime: 0 online, 1 offline (NewMbDevData starts it at 0xff)
	ErrCnt        uint               // count of consecutive uploads that carried no data
	NextHourTime  time.Time          // next time the accumulated data is due to be persisted
}
- func NewMbDevData(tenant, devtype, gid, did string) *MbDevData {
- LastHour := protocol.ToBJTime(util.BeginningOfHour().Add(1 * time.Hour))
- return &MbDevData{
- Tenant: tenant,
- Devtype: devtype,
- DID: did,
- GID: gid,
- Data: make(map[uint16]float64),
- State: 0xff,
- NextHourTime: LastHour,
- }
- }
- func (o *MbDevData) handleStateChange(t time.Time) {
- //最新状态
- state := uint8(0)
- if o.ErrCnt >= 10 {
- state = 1
- } else if o.ErrCnt == 0 {
- state = 0
- } else {
- return
- }
- //状态处理
- if o.LastStateTime.IsZero() || o.State == 0xff {
- t0, s0, err := getState(o.DID)
- if err != nil {
- o.State = state
- o.LastStateTime = t
- return
- }
- o.State = s0
- o.LastStateTime = t0
- }
- if o.State == 0 && state == 1 { //在线->离线
- GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: t})
- } else if o.State == 1 && state == 0 { //离线->在线
- GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_ONLINE, Time: t})
- }
- o.State = state
- o.LastStateTime = t
- }
- func (o *MbDevData) checkSaveData() {
- if o.NextHourTime.Before(o.LastDataTime) {
- //判断小时是否一样,不一样则以数据时间为准
- if !util.New(o.NextHourTime).BeginningOfHour().Equal(util.New(o.LastDataTime).BeginningOfHour()) {
- o.NextHourTime = util.New(o.LastDataTime).BeginningOfHour()
- }
- if len(o.Data) > 0 {
- var datas []models.DeviceHourData
- for k, v := range o.Data {
- o := models.DeviceHourData{ID: o.DID, Sid: k, Val: float32(v), Time: o.NextHourTime, CreatedAt: time.Now()}
- datas = append(datas, o)
- }
- if err := models.MultiInsertDeviceHourData(datas); err != nil {
- logrus.Errorf("小时[%s]数据插入数据库失败:%s", o.NextHourTime.Format("2006-01-02 15:04:05"), err.Error())
- }
- }
- o.NextHourTime = o.NextHourTime.Add(time.Hour)
- o.Data = make(map[uint16]float64)
- }
- }
- func (o *MbDevData) UpdateState() {
- if o.LastStateTime.IsZero() {
- t0, s0, err := getState(o.DID)
- if err == nil {
- o.State = s0
- o.LastStateTime = t0
- } else {
- err := redisCltRawData.HGet(DeviceAlarmId, o.DID).Err()
- if err == redis.Nil {
- o.State = protocol.FAILED
- o.LastStateTime = util.MlNow()
- GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: o.LastStateTime})
- }
- }
- }
- if o.State == protocol.FAILED ||
- (!o.LastDataTime.IsZero() && util.MlNow().Sub(o.LastDataTime).Minutes() < OfflineInterval) {
- return
- }
- //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态
- if o.State == protocol.SUCCESS {
- o.State = protocol.FAILED
- o.LastStateTime = util.MlNow()
- GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: o.LastStateTime})
- cacheState(o.DID, o.LastStateTime.Format("2006-01-02 15:04:05"), o.State)
- //检查是否要缓存数据
- o.checkSaveData()
- }
- }
- func (o *MbDevData) HandleData(data *protocol.Pack_UploadData) {
- t, err := util.MlParseTime(data.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", data.Time, err.Error())
- return
- }
- o.Lock.Lock()
- defer o.Lock.Unlock()
- o.GID = data.Gid
- o.TID = data.Data.Tid //基础数据,物模型ID
- if len(data.Data.Data) > 0 {
- for k, v := range data.Data.Data {
- o.Data[k] = v
- }
- o.LastDataTime = t
- cacheData(o.DID, t, data.Data.Data)
- o.ErrCnt = 0
- } else {
- o.ErrCnt++
- }
- //需要告警,则推入告警管理器
- bv := BizValue{ID: o.DID, Time: t, Tid: o.TID, Data: data.Data.Data}
- GetBizAlarmMgr().PushData(&bv)
- //先处理状态变化,再存入最新状态
- o.handleStateChange(t)
- cacheState(o.DID, data.Time, o.State)
- o.checkSaveData()
- }
|