package main import ( "context" "runtime" "runtime/debug" "strconv" "strings" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _YMLampControllerMgrOnce sync.Once var _YMLampControllerMgrSingle *YMLampControllerMgr func GetYMLampControllerMgr() *YMLampControllerMgr { _YMLampControllerMgrOnce.Do(func() { ctx, cancel := context.WithCancel(context.Background()) _YMLampControllerMgrSingle = &YMLampControllerMgr{ queue: util.NewQueue(100), mapYMLampController: make(map[string]*YMLampController), ctx: ctx, cancel: cancel, } }) return _YMLampControllerMgrSingle } type YMLampControllerMgr struct { queue *util.MlQueue mapYMLampController map[string]*YMLampController ctx context.Context cancel context.CancelFunc } func (o *YMLampControllerMgr) SubscribeTopics() { GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_DATA), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_ALARM), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_SET_SWITCH_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_SET_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData) } func (o *YMLampControllerMgr) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("YMLampControllerMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *YMLampControllerMgr) Stop() { o.cancel() } func (o *YMLampControllerMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("YMLampControllerMgr.Handler发生异常:%v", err) logrus.Errorf("YMLampControllerMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack())) } }() exit := false timer := time.NewTicker(1 * time.Minute) //每天15点半同步日出日落时间 var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(10*time.Hour + 30*time.Minute) for { select { case <-o.ctx.Done(): logrus.Error("YMLampControllerMgr.HandleQueue即将退出,原因:", o.ctx.Err()) exit = true case <-timer.C: //每隔1分钟执行一次 //更新灯控状态,防止无数据状态不更新 o.UpdateLampControllerState() //同步日出日落时间 if util.MlNow().After(SyncSunset) { if err := o.SyncSunset(); err == nil { SyncSunset = SyncSunset.AddDate(0, 0, 1) } } default: if o.handleQueue() == 0 { if exit { return 0 } time.Sleep(100 * time.Millisecond) } } } } func (o *YMLampControllerMgr) handleQueue() uint32 { msg, ok, quantity := o.queue.Get() if !ok { return quantity } else if quantity > 1000 { logrus.Warnf("YMLampControllerMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { return quantity } Tenant, _, DID, topic, err := ParseTopic(m.Topic()) if err != nil { return quantity } pymlc, ok := o.mapYMLampController[DID] if !ok { pymlc = &YMLampController{} pymlc.Set(Tenant, DID) o.mapYMLampController[DID] = pymlc } switch topic { case protocol.TP_YM_DATA: o.handleDATA(pymlc, m) case protocol.TP_YM_ALARM: o.handleALARM(pymlc, m) case protocol.TP_YM_SET_SWITCH_ACK, protocol.TP_YM_SET_ONOFFTIME_ACK: o.handleACK(m) } return quantity } func (o *YMLampControllerMgr) handleDATA(lp *YMLampController, m *mqtt.Message) { var obj protocol.Pack_CHZB_UploadData if err := obj.DeCode(m.PayloadString()); err != nil { return } t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } for _, v := range obj.Data.Data { lp.HandleData(obj.Gid, obj.Data.TID, t, v) } } func (o *YMLampControllerMgr) handleALARM(lp *YMLampController, m *mqtt.Message) { var obj protocol.Pack_CHZB_LampAlarm if err := obj.DeCode(m.PayloadString()); err != nil { return } lp.HandleAlarm(obj.Data) } func (o *YMLampControllerMgr) handleACK(m *mqtt.Message) { var obj protocol.Pack_Ack if err := obj.DeCode(m.PayloadString()); err != nil { return } oo := models.DeviceCmdRecord{ID: obj.Seq, State: 1, Resp: obj.Data.Error} if err := oo.Update(); err != nil { logrus.Errorf("收到设备[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error()) } } // SyncSunset 统一更新裕明485灯控的日出日落时间 func (o *YMLampControllerMgr) SyncSunset() error { arr, err := models.GetYm485Lampstrategy(nil) if err != nil { logrus.Errorf("从数据库读取设置为日出日落时间的485灯控发生错误:%s", err.Error()) return err } if len(arr) == 0 { return nil } //分别计算日出日落 mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间 for _, v := range arr { if _, ok := mapTime[v.Strategy]; ok { //已计算日出日落时间的,不再重复计算 continue } var oot protocol.CHZB_OnOffTime var ls []models.LampStrategy if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 { oot.Brightness = uint8(ls[0].Brightness) } //计算时间 if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil { onHour, _ := strconv.Atoi(strings.Split(set, ":")[0]) onMinute, _ := strconv.Atoi(strings.Split(set, ":")[1]) offHour, _ := strconv.Atoi(strings.Split(rise, ":")[0]) offMinute, _ := strconv.Atoi(strings.Split(rise, ":")[1]) oot.OnHour = uint8(onHour) oot.OnMinite = uint8(onMinute) oot.OffHour = uint8(offHour) oot.OffMinite = uint8(offMinute) } mapTime[v.Strategy] = &oot } //发布mqtt消息 for _, v := range arr { if oot, ok := mapTime[v.Strategy]; ok { var obj protocol.Pack_SetOnOffTime seq := GetNextSeq() if str, err := obj.EnCode(v.ID, v.GID, seq, nil, []protocol.CHZB_OnOffTime{*oot}); err == nil { topic := GetTopic(v.Tenant, protocol.DT_LAMPCONTROLLER, v.ID, protocol.TP_YM_SET_ONOFFTIME) err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce) if err != nil { logrus.Errorf("SyncSunset:对灯控[%s]发布日出日落消息错误:%s", v.ID, err.Error()) } var msg string if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil { msg = string(msg0) } else { msg = str } odb := models.DeviceCmdRecord{ ID: seq, GID: v.GID, DID: v.ID, Topic: topic, Message: msg, State: 0, } if err := models.G_db.Create(&odb).Error; err != nil { logrus.Errorf("对灯控[%s]发布日出日落时间时指令入库错误:%s", v.ID, err.Error()) } else { logrus.Errorf("对灯控[%s]发布日出日落时间时指令入库成功", v.ID) } } } } return nil } func (o *YMLampControllerMgr) UpdateLampControllerState() { for _, v := range o.mapYMLampController { v.UpdateState() } }