package main

import (
	"runtime"
	"runtime/debug"
	"sync"
	"time"

	"github.com/go-redis/redis/v7"
	jsoniter "github.com/json-iterator/go"
	"github.com/sirupsen/logrus"

	"lc/common/models"
	"lc/common/mqtt"
	"lc/common/protocol"
	"lc/common/util"
)

var _modbusDeviceHandlerOnce sync.Once
var _modbusDeviceHandlerSingle *ModbusDeviceHandler

// GetModbusDeviceHandler returns the process-wide ModbusDeviceHandler,
// creating it on the first call. The handler's queue is built with
// util.NewQueue(10000).
func GetModbusDeviceHandler() *ModbusDeviceHandler {
	_modbusDeviceHandlerOnce.Do(func() {
		_modbusDeviceHandlerSingle = &ModbusDeviceHandler{
			queue: util.NewQueue(10000),
		}
	})
	return _modbusDeviceHandlerSingle
}

// ModbusDeviceHandler buffers MQTT messages from Modbus devices in a queue
// (filled by HandlerData) and processes them in Handler.
type ModbusDeviceHandler struct {
	queue *util.MlQueue
}

// SubscribeTopics subscribes HandlerData to the Modbus control-ack and data
// topics of every supported device type, using mqtt.AtMostOnce.
func (o *ModbusDeviceHandler) SubscribeTopics() {
	// Environment sensor.
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ENVIRONMENT, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ENVIRONMENT, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
	// Liquid level gauge.
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LIQUID, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LIQUID, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
	// Road condition sensor.
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ROAD_COND, protocol.TP_MODBUS_CONTROL_ACK), mqtt.AtMostOnce, o.HandlerData)
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ROAD_COND, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
}

// HandlerData is the MQTT callback. It enqueues the message for Handler and
// retries, yielding the processor between attempts, until the queue accepts
// it.
func (o *ModbusDeviceHandler) HandlerData(m mqtt.Message) {
	// The retry loop spins very quickly while the queue is full; logging every
	// attempt would flood the log, so only the first failure and every
	// logEvery-th retry after it are reported.
	const logEvery = 1000

	for attempt := 0; ; attempt++ {
		ok, cnt := o.queue.Put(&m)
		if ok {
			return
		}
		if attempt%logEvery == 0 {
			logrus.Errorf("ModbusDeviceHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
		}
		runtime.Gosched()
	}
}

// Handler is the worker loop. Once per minute it refreshes the state of every
// cached device; otherwise it drains the queue and dispatches each message by
// topic. It only leaves the loop by panicking, in which case the deferred
// recover re-submits Handler to gopool with the same args and logs the stack.
func (o *ModbusDeviceHandler) Handler(args ...interface{}) interface{} {
	defer func() {
		if err := recover(); err != nil {
			gopool.Add(o.Handler, args)
			logrus.Errorf("ModbusDeviceHandler.Handler发生异常:%s", string(debug.Stack()))
		}
	}()

	ticker := time.NewTicker(1 * time.Minute)
	// Each restart after a panic creates a new ticker; stop this one so the
	// abandoned instance does not leak.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Periodic state refresh for every cached device.
			mapMbDevData.Range(func(key, value interface{}) bool {
				if dev, ok := value.(*MbDevData); ok {
					dev.UpdateState()
				}
				return true
			})
		default:
			msg, ok, quantity := o.queue.Get()
			if !ok {
				// Nothing queued: back off briefly instead of spinning.
				time.Sleep(10 * time.Millisecond)
				continue
			}
			if quantity > 1000 {
				logrus.Warnf("ModbusDeviceHandler.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
			}
			m, ok := msg.(*mqtt.Message)
			if !ok {
				continue
			}
			o.dispatch(m)
		}
	}
}

// dispatch parses the topic of m and routes the message to the matching
// handler; unsupported topics are logged and dropped.
func (o *ModbusDeviceHandler) dispatch(m *mqtt.Message) {
	tenant, devType, did, topic, err := ParseTopic(m.Topic())
	if err != nil {
		logrus.Warnf("ModbusDeviceHandler.Handler:主题[%s]解析失败:%v", m.Topic(), err)
		return
	}

	switch topic {
	case protocol.TP_MODBUS_DATA:
		o.handleUpload(m, tenant, devType, did)
	case protocol.TP_MODBUS_CONTROL_ACK:
		o.handleControlAck(m)
	default:
		logrus.Warnf("ModbusDeviceHandler.Handler:收到暂不支持的主题:%s", topic)
	}
}

// handleUpload decodes an uploaded data packet and feeds it to the cached
// MbDevData for that packet, creating the cache entry on first sight.
func (o *ModbusDeviceHandler) handleUpload(m *mqtt.Message, tenant, devType, did string) {
	var pack protocol.Pack_UploadData
	if err := pack.DeCode(m.PayloadString()); err != nil {
		logrus.Warnf("ModbusDeviceHandler.Handler:主题[%s]数据解析失败:%v", m.Topic(), err)
		return
	}

	// NOTE(review): the cache is keyed by pack.Id while the entry stores the
	// DID parsed from the topic — confirm both identify the same device.
	var dev *MbDevData
	if cached, found := mapMbDevData.Load(pack.Id); found {
		dev, _ = cached.(*MbDevData)
	}
	if dev == nil {
		dev = NewMbDevData(tenant, devType, pack.Gid, did)
		mapMbDevData.Store(pack.Id, dev)
	}
	dev.HandleData(&pack)
}

// handleControlAck decodes a control acknowledgement and writes its state and
// error text to the command record whose ID is the ack's sequence number.
func (o *ModbusDeviceHandler) handleControlAck(m *mqtt.Message) {
	var ack protocol.Pack_Ack
	if err := ack.DeCode(m.PayloadString()); err != nil {
		logrus.Warnf("ModbusDeviceHandler.Handler:主题[%s]数据解析失败:%v", m.Topic(), err)
		return
	}

	record := models.DeviceCmdRecord{
		ID:    ack.Seq,
		State: uint(ack.Data.State),
		Resp:  ack.Data.Error,
	}
	if err := record.Update(); err != nil {
		logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", ack.Id, ack.Seq, m.Topic(), err.Error())
	}
	logrus.Debugf("ModbusDeviceHandler.Handler:收到网关[%s]发布的主题:%s,内容:%s", ack.Id, m.Topic(), m.PayloadString())
}

const (
	DevStatusPrefix string = "dev_stat_"
	DevDataPrefix   string = "dev_data_"
	ONLINE          string = "online"
	TLast           string = "tlast"
	TIME            string = "time"
)

var json = jsoniter.ConfigFastest

// mapMbDevData caches one *MbDevData per uploaded packet ID.
var mapMbDevData sync.Map

// MbDevData holds the identity, latest readings and online state of one
// Modbus device.
type MbDevData struct {
	// Lock is held by HandleData while it updates the fields below.
	Lock sync.Mutex
	// Tenant is the tenant ID (base data).
	Tenant string
	// Devtype is the device type (base data).
	Devtype string
	// GID is the gateway code (base data).
	GID string
	// DID is the device ID (base data).
	DID string
	// TID is the thing-model ID (base data).
	TID uint16
	// LastDataTime is the timestamp of the most recent data.
	LastDataTime time.Time
	// Data holds the most recent readings accumulated since the last hourly
	// save.
	Data map[uint16]float64
	// LastStateTime is the timestamp of the most recent state.
	LastStateTime time.Time
	// State is 0 for online and 1 for offline; new entries start at 0xff.
	State uint8
	// ErrCnt counts consecutive uploads that carried no data.
	ErrCnt uint
	// NextHourTime is the next time the accumulated data is due to be saved.
	NextHourTime time.Time
}

// NewMbDevData creates the in-memory record for a device. State starts at
// 0xff and the first hourly save is scheduled for the beginning of the next
// hour, converted with protocol.ToBJTime.
func NewMbDevData(tenant, devtype, gid, did string) *MbDevData {
	nextHour := protocol.ToBJTime(util.BeginningOfHour().Add(1 * time.Hour))
	return &MbDevData{
		Tenant:       tenant,
		Devtype:      devtype,
		DID:          did,
		GID:          gid,
		Data:         make(map[uint16]float64),
		State:        0xff,
		NextHourTime: nextHour,
	}
}

// handleStateChange derives the device state from ErrCnt and pushes an
// online/offline event when it differs from the previous state. t is used as
// the event time and as the new LastStateTime.
func (o *MbDevData) handleStateChange(t time.Time) {
	// Latest state: 10 or more consecutive empty uploads mean offline, none
	// mean online; anything in between leaves the state untouched.
	var state uint8
	switch {
	case o.ErrCnt >= 10:
		state = 1
	case o.ErrCnt == 0:
		state = 0
	default:
		return
	}

	// The previous state is not known yet: try to load it via getState so a
	// transition can still be detected; if that fails, adopt the new state
	// without raising an event.
	if o.LastStateTime.IsZero() || o.State == 0xff {
		t0, s0, err := getState(o.DID)
		if err != nil {
			o.State = state
			o.LastStateTime = t
			return
		}
		o.State = s0
		o.LastStateTime = t0
	}

	if o.State == 0 && state == 1 {
		// Online -> offline.
		GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: t})
	} else if o.State == 1 && state == 0 {
		// Offline -> online.
		GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_ONLINE, Time: t})
	}
	o.State = state
	o.LastStateTime = t
}

// checkSaveData inserts the accumulated readings as hourly rows once the
// latest data time has passed NextHourTime, then clears the readings and
// moves NextHourTime forward by one hour.
func (o *MbDevData) checkSaveData() {
	if !o.NextHourTime.Before(o.LastDataTime) {
		return
	}

	// If the scheduled hour and the data's hour differ, the data time wins.
	dataHour := util.New(o.LastDataTime).BeginningOfHour()
	if !util.New(o.NextHourTime).BeginningOfHour().Equal(dataHour) {
		o.NextHourTime = dataHour
	}

	if len(o.Data) > 0 {
		createdAt := time.Now()
		rows := make([]models.DeviceHourData, 0, len(o.Data))
		for sid, val := range o.Data {
			rows = append(rows, models.DeviceHourData{
				ID:        o.DID,
				Sid:       sid,
				Val:       float32(val),
				Time:      o.NextHourTime,
				CreatedAt: createdAt,
			})
		}
		// A failed insert is only logged; the readings are still cleared below.
		if err := models.MultiInsertDeviceHourData(rows); err != nil {
			logrus.Errorf("小时[%s]数据插入数据库失败:%s", o.NextHourTime.Format("2006-01-02 15:04:05"), err.Error())
		}
	}

	o.NextHourTime = o.NextHourTime.Add(time.Hour)
	o.Data = make(map[uint16]float64)
}

// UpdateState is the periodic check that marks a device offline when no data
// has arrived for OfflineInterval minutes.
//
// NOTE(review): this method changes the same fields as HandleData but does
// not take o.Lock. The callers visible in this file run on a single
// goroutine; confirm that holds for callers elsewhere.
func (o *MbDevData) UpdateState() {
	if o.LastStateTime.IsZero() {
		// No state known yet: load it via getState. If that fails and the
		// device has no DeviceAlarmId hash field in redis either, mark it
		// offline and raise the event.
		t0, s0, err := getState(o.DID)
		if err == nil {
			o.State = s0
			o.LastStateTime = t0
		} else if err := redisCltRawData.HGet(DeviceAlarmId, o.DID).Err(); err == redis.Nil {
			o.State = protocol.FAILED
			o.LastStateTime = util.MlNow()
			GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: o.LastStateTime})
		}
	}

	// Already offline, or data arrived recently enough: nothing to do.
	if o.State == protocol.FAILED ||
		(!o.LastDataTime.IsZero() && util.MlNow().Sub(o.LastDataTime).Minutes() < OfflineInterval) {
		return
	}

	// A device that was online is switched to offline; one that was already
	// offline keeps its state.
	if o.State == protocol.SUCCESS {
		o.State = protocol.FAILED
		o.LastStateTime = util.MlNow()
		GetEventMgr().PushEvent(&EventObject{ID: o.DID, EventType: models.ET_OFFLINE, Time: o.LastStateTime})
		cacheState(o.DID, o.LastStateTime.Format("2006-01-02 15:04:05"), o.State)
		// Check whether the accumulated data should be saved.
		o.checkSaveData()
	}
}

// HandleData applies one uploaded packet to the device: it merges the
// readings, maintains the empty-upload counter, forwards the sample to the
// alarm manager, updates and caches the state, and saves hourly data when
// due. Packets whose time cannot be parsed are logged and ignored.
func (o *MbDevData) HandleData(data *protocol.Pack_UploadData) {
	t, err := util.MlParseTime(data.Time)
	if err != nil {
		logrus.Errorf("时间[%s]解析错误:%s", data.Time, err.Error())
		return
	}

	o.Lock.Lock()
	defer o.Lock.Unlock()

	o.GID = data.Gid
	o.TID = data.Data.Tid // base data: thing-model ID

	if len(data.Data.Data) > 0 {
		for k, v := range data.Data.Data {
			o.Data[k] = v
		}
		o.LastDataTime = t
		cacheData(o.DID, t, data.Data.Data)
		o.ErrCnt = 0
	} else {
		// An upload without readings counts towards the offline threshold.
		o.ErrCnt++
	}

	// Push the sample to the alarm manager in case an alarm is needed.
	bv := BizValue{ID: o.DID, Time: t, Tid: o.TID, Data: data.Data.Data}
	GetBizAlarmMgr().PushData(&bv)

	// Handle the state change first, then store the latest state.
	o.handleStateChange(t)
	cacheState(o.DID, data.Time, o.State)
	o.checkSaveData()
}