package main import ( "fmt" "github.com/jinzhu/gorm" "runtime" "runtime/debug" "strconv" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) const ( CableStatusNormal = iota //电缆状态 正常 CableStatusBeStolen //电缆状态 被盗 CableStatusOpened //电缆状态 被打开 CableStatusBeStolenAndOpened //电缆状态 被盗、被打开 ) const ( CableStatusNormalStr = "正常" //电缆状态 正常 CableStatusBeStolenStr = "被盗" //电缆状态 被盗 CableStatusOpenedStr = "被打开" //电缆状态 被打开 CableStatusBeStolenAndOpenedStr = "被盗、被打开" //电缆状态 被盗、被打开 ) const cableGuardianDataPrefix = "cable_guardian_data_%s_%s_%d" // 电缆防盗 mqtt消息处理 var _cableGuardianHandlerOnce sync.Once var _cableGuardianHandlerSingle *cableGuardianHandler func GetCableGuardianHandler() *cableGuardianHandler { _cableGuardianHandlerOnce.Do(func() { _cableGuardianHandlerSingle = &cableGuardianHandler{ queue: util.NewQueue(10000), } }) return _cableGuardianHandlerSingle } type cableGuardianHandler struct { queue *util.MlQueue } func (o *cableGuardianHandler) SubscribeTopics() { //电缆防盗 GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CableGuardian, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData) } func (o *cableGuardianHandler) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("cableGuardianHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *cableGuardianHandler) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("cableGuardianHandler.Handler:%v发生异常:%s", args, string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } _, _, DID, topic, err := ParseTopic(m.Topic()) if err != nil { continue } switch topic { case protocol.TP_MODBUS_DATA: var ret protocol.Pack_UploadData if err := ret.DeCode(m.PayloadString()); err == nil { if ret.Data.State == protocol.FAILED { logrus.Warningf("电缆防盗数据不正确 %+v", ret) continue } t, _ := util.MlParseTime(ret.Time) for id, value := range ret.Data.Data { key := fmt.Sprintf(cableGuardianDataPrefix, ret.Gid, DID, id) tId := strconv.Itoa(int(id)) old := getCableData(key) if value != old { cableGuardianStatus := &models.CableGuardianStatus{ GID: ret.Gid, DID: DID, TerminalID: tId, Status: int(value), } err := cableGuardianStatus.Get() if err != nil { if !gorm.IsRecordNotFoundError(err) { logrus.Warnf("CableGuardianStatus get fail = %v", err) continue } gateway := models.Gateway{ID: ret.Gid} err = gateway.Get() if err != nil { logrus.Warnf("CableGuardianStatus get gateway fail = %v", err) continue } cableGuardianStatus.GatewayName = gateway.Name } if cableGuardianStatus.ID > 0 { cableGuardianStatus.UpdateAt = t cableGuardianStatus.CreatedAt = t cableGuardianStatus.Status = int(value) err = cableGuardianStatus.Update() } else { cableGuardianStatus.CreatedAt = t cableGuardianStatus.UpdateAt = t err = cableGuardianStatus.Save() } cacheCableData(key, value) if old != -1 { err = sendSms([]string{cableGuardianStatus.GatewayName, tId, getSmsStr(value)}) } } } } default: logrus.Warnf("cableGuardianHandler.Handler:收到暂不支持的主题:%s", topic) } } } func getSmsStr(status float64) string { switch status { case CableStatusNormal: return CableStatusNormalStr case CableStatusBeStolen: return CableStatusBeStolenStr case CableStatusOpened: return CableStatusOpenedStr case CableStatusBeStolenAndOpened: return CableStatusBeStolenAndOpenedStr } return "" } // 缓存最新数据到redis func cacheCableData(key string, data float64) { if err := redisCltRawData.Set(key, data, 0).Err(); err != nil { logrus.Errorf("cacheCableData err = ", err.Error()) } } // 获取缓存的redis数据 func getCableData(key string) float64 { var value float64 if err := redisCltRawData.Get(key).Scan(&value); err != nil { logrus.Warningf("getCableData err = %s", err.Error()) return -1 } return value }