||
- package main
- import (
- "context"
- "runtime"
- "runtime/debug"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _YMLampControllerMgrOnce sync.Once
- var _YMLampControllerMgrSingle *YMLampControllerMgr
- func GetYMLampControllerMgr() *YMLampControllerMgr {
- _YMLampControllerMgrOnce.Do(func() {
- ctx, cancel := context.WithCancel(context.Background())
- _YMLampControllerMgrSingle = &YMLampControllerMgr{
- queue: util.NewQueue(100),
- mapYMLampController: make(map[string]*YMLampController),
- ctx: ctx,
- cancel: cancel,
- }
- })
- return _YMLampControllerMgrSingle
- }
- type YMLampControllerMgr struct {
- queue *util.MlQueue
- mapYMLampController map[string]*YMLampController
- ctx context.Context
- cancel context.CancelFunc
- }
- func (o *YMLampControllerMgr) SubscribeTopics() {
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_DATA), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_ALARM), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_SET_SWITCH_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_SET_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *YMLampControllerMgr) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("YMLampControllerMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *YMLampControllerMgr) Stop() {
- o.cancel()
- }
- func (o *YMLampControllerMgr) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("YMLampControllerMgr.Handler发生异常:%v", err)
- logrus.Errorf("YMLampControllerMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack()))
- }
- }()
- exit := false
- timer := time.NewTicker(1 * time.Minute)
- //每天15点半同步日出日落时间
- var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(10*time.Hour + 30*time.Minute)
- for {
- select {
- case <-o.ctx.Done():
- logrus.Error("YMLampControllerMgr.HandleQueue即将退出,原因:", o.ctx.Err())
- exit = true
- case <-timer.C: //每隔1分钟执行一次
- //更新灯控状态,防止无数据状态不更新
- o.UpdateLampControllerState()
- //同步日出日落时间
- if util.MlNow().After(SyncSunset) {
- if err := o.SyncSunset(); err == nil {
- SyncSunset = SyncSunset.AddDate(0, 0, 1)
- }
- }
- default:
- if o.handleQueue() == 0 {
- if exit {
- return 0
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
- }
- }
- func (o *YMLampControllerMgr) handleQueue() uint32 {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- return quantity
- } else if quantity > 1000 {
- logrus.Warnf("YMLampControllerMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- return quantity
- }
- Tenant, _, DID, topic, err := ParseTopic(m.Topic())
- if err != nil {
- return quantity
- }
- pymlc, ok := o.mapYMLampController[DID]
- if !ok {
- pymlc = &YMLampController{}
- pymlc.Set(Tenant, DID)
- o.mapYMLampController[DID] = pymlc
- }
- switch topic {
- case protocol.TP_YM_DATA:
- o.handleDATA(pymlc, m)
- case protocol.TP_YM_ALARM:
- o.handleALARM(pymlc, m)
- case protocol.TP_YM_SET_SWITCH_ACK, protocol.TP_YM_SET_ONOFFTIME_ACK:
- o.handleACK(m)
- }
- return quantity
- }
- func (o *YMLampControllerMgr) handleDATA(lp *YMLampController, m *mqtt.Message) {
- var obj protocol.Pack_CHZB_UploadData
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- t, err := util.MlParseTime(obj.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
- return
- }
- for _, v := range obj.Data.Data {
- lp.HandleData(obj.Gid, obj.Data.TID, t, v)
- }
- }
- func (o *YMLampControllerMgr) handleALARM(lp *YMLampController, m *mqtt.Message) {
- var obj protocol.Pack_CHZB_LampAlarm
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- lp.HandleAlarm(obj.Data)
- }
- func (o *YMLampControllerMgr) handleACK(m *mqtt.Message) {
- var obj protocol.Pack_Ack
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- oo := models.DeviceCmdRecord{ID: obj.Seq, State: 1, Resp: obj.Data.Error}
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到设备[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- // SyncSunset 统一更新裕明485灯控的日出日落时间
- func (o *YMLampControllerMgr) SyncSunset() error {
- arr, err := models.GetYm485Lampstrategy(nil)
- if err != nil {
- logrus.Errorf("从数据库读取设置为日出日落时间的485灯控发生错误:%s", err.Error())
- return err
- }
- if len(arr) == 0 {
- return nil
- }
- //分别计算日出日落
- mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间
- for _, v := range arr {
- if _, ok := mapTime[v.Strategy]; ok {
- //已计算日出日落时间的,不再重复计算
- continue
- }
- var oot protocol.CHZB_OnOffTime
- var ls []models.LampStrategy
- if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
- oot.Brightness = uint8(ls[0].Brightness)
- }
- //计算时间
- if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil {
- onHour, _ := strconv.Atoi(strings.Split(set, ":")[0])
- onMinute, _ := strconv.Atoi(strings.Split(set, ":")[1])
- offHour, _ := strconv.Atoi(strings.Split(rise, ":")[0])
- offMinute, _ := strconv.Atoi(strings.Split(rise, ":")[1])
- oot.OnHour = uint8(onHour)
- oot.OnMinite = uint8(onMinute)
- oot.OffHour = uint8(offHour)
- oot.OffMinite = uint8(offMinute)
- }
- mapTime[v.Strategy] = &oot
- }
- //发布mqtt消息
- for _, v := range arr {
- if oot, ok := mapTime[v.Strategy]; ok {
- var obj protocol.Pack_SetOnOffTime
- seq := GetNextSeq()
- if str, err := obj.EnCode(v.ID, v.GID, seq, nil, []protocol.CHZB_OnOffTime{*oot}); err == nil {
- topic := GetTopic(v.Tenant, protocol.DT_LAMPCONTROLLER, v.ID, protocol.TP_YM_SET_ONOFFTIME)
- err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce)
- if err != nil {
- logrus.Errorf("SyncSunset:对灯控[%s]发布日出日落消息错误:%s", v.ID, err.Error())
- }
- var msg string
- if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
- msg = string(msg0)
- } else {
- msg = str
- }
- odb := models.DeviceCmdRecord{
- ID: seq,
- GID: v.GID,
- DID: v.ID,
- Topic: topic,
- Message: msg,
- State: 0,
- }
- if err := models.G_db.Create(&odb).Error; err != nil {
- logrus.Errorf("对灯控[%s]发布日出日落时间时指令入库错误:%s", v.ID, err.Error())
- } else {
- logrus.Errorf("对灯控[%s]发布日出日落时间时指令入库成功", v.ID)
- }
- }
- }
- }
- return nil
- }
- func (o *YMLampControllerMgr) UpdateLampControllerState() {
- for _, v := range o.mapYMLampController {
- v.UpdateState()
- }
- }
|