| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- package main
- import (
- "fmt"
- "github.com/jinzhu/gorm"
- "runtime"
- "runtime/debug"
- "strconv"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
// Cable status codes. getSmsStr compares reported terminal values against them.
const (
	// CableStatusNormal is the cable status "normal".
	CableStatusNormal = iota
	// CableStatusBeStolen is the cable status "stolen".
	CableStatusBeStolen
	// CableStatusOpened is the cable status "opened".
	CableStatusOpened
	// CableStatusBeStolenAndOpened is the cable status "stolen and opened".
	CableStatusBeStolenAndOpened
)

// Display labels for the cable status codes, returned by getSmsStr.
const (
	// CableStatusNormalStr is the label for CableStatusNormal.
	CableStatusNormalStr = "正常"
	// CableStatusBeStolenStr is the label for CableStatusBeStolen.
	CableStatusBeStolenStr = "被盗"
	// CableStatusOpenedStr is the label for CableStatusOpened.
	CableStatusOpenedStr = "被打开"
	// CableStatusBeStolenAndOpenedStr is the label for CableStatusBeStolenAndOpened.
	CableStatusBeStolenAndOpenedStr = "被盗、被打开"
)

// cableGuardianDataPrefix is the format of the cache key used for the last
// seen value of one terminal. Handler fills it with the gateway id, the
// device id and the terminal index, in that order.
const cableGuardianDataPrefix = "cable_guardian_data_%s_%s_%d"
- // 电缆防盗 mqtt消息处理
- var _cableGuardianHandlerOnce sync.Once
- var _cableGuardianHandlerSingle *cableGuardianHandler
- func GetCableGuardianHandler() *cableGuardianHandler {
- _cableGuardianHandlerOnce.Do(func() {
- _cableGuardianHandlerSingle = &cableGuardianHandler{
- queue: util.NewQueue(10000),
- }
- })
- return _cableGuardianHandlerSingle
- }
- type cableGuardianHandler struct {
- queue *util.MlQueue
- }
- func (o *cableGuardianHandler) SubscribeTopics() {
- //电缆防盗
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CableGuardian, protocol.TP_MODBUS_DATA), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *cableGuardianHandler) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("cableGuardianHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *cableGuardianHandler) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("cableGuardianHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
- }
- }()
- for {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- _, _, DID, topic, err := ParseTopic(m.Topic())
- if err != nil {
- continue
- }
- switch topic {
- case protocol.TP_MODBUS_DATA:
- var ret protocol.Pack_UploadData
- if err := ret.DeCode(m.PayloadString()); err == nil {
- if ret.Data.State == protocol.FAILED {
- logrus.Warningf("电缆防盗数据不正确 %+v", ret)
- continue
- }
- t, _ := util.MlParseTime(ret.Time)
- for id, value := range ret.Data.Data {
- key := fmt.Sprintf(cableGuardianDataPrefix, ret.Gid, DID, id)
- tId := strconv.Itoa(int(id))
- old := getCableData(key)
- if value != old {
- cableGuardianStatus := &models.CableGuardianStatus{
- GID: ret.Gid,
- DID: DID,
- TerminalID: tId,
- Status: int(value),
- }
- err := cableGuardianStatus.Get()
- if err != nil {
- if !gorm.IsRecordNotFoundError(err) {
- logrus.Warnf("CableGuardianStatus get fail = %v", err)
- continue
- }
- gateway := models.Gateway{ID: ret.Gid}
- err = gateway.Get()
- if err != nil {
- logrus.Warnf("CableGuardianStatus get gateway fail = %v", err)
- continue
- }
- cableGuardianStatus.GatewayName = gateway.Name
- }
- if cableGuardianStatus.ID > 0 {
- cableGuardianStatus.UpdateAt = t
- cableGuardianStatus.CreatedAt = t
- cableGuardianStatus.Status = int(value)
- err = cableGuardianStatus.Update()
- } else {
- cableGuardianStatus.CreatedAt = t
- cableGuardianStatus.UpdateAt = t
- err = cableGuardianStatus.Save()
- }
- cacheCableData(key, value)
- if old != -1 {
- err = sendSms([]string{cableGuardianStatus.GatewayName, tId, getSmsStr(value)})
- }
- }
- }
- }
- default:
- logrus.Warnf("cableGuardianHandler.Handler:收到暂不支持的主题:%s", topic)
- }
- }
- }
- func getSmsStr(status float64) string {
- switch status {
- case CableStatusNormal:
- return CableStatusNormalStr
- case CableStatusBeStolen:
- return CableStatusBeStolenStr
- case CableStatusOpened:
- return CableStatusOpenedStr
- case CableStatusBeStolenAndOpened:
- return CableStatusBeStolenAndOpenedStr
- }
- return ""
- }
- // 缓存最新数据到redis
- func cacheCableData(key string, data float64) {
- if err := redisCltRawData.Set(key, data, 0).Err(); err != nil {
- logrus.Errorf("cacheCableData err = ", err.Error())
- }
- }
- // 获取缓存的redis数据
- func getCableData(key string) float64 {
- var value float64
- if err := redisCltRawData.Get(key).Scan(&value); err != nil {
- logrus.Warningf("getCableData err = %s", err.Error())
- return -1
- }
- return value
- }
|