||
- package main
- import (
- "context"
- "runtime/debug"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- // 长河Zigbee 集控器
- type ChZigbeeConcentrator struct {
- ctx context.Context
- cancel context.CancelFunc
- lock sync.Mutex
- tenant string //租户
- gid string //基础数据,网关ID
- did string //基础数据,集控器设备编码
- tid uint16 //基础数据,集控器物模型ID
- mapLamp map[string]*ChZigbeeLampController //单灯数据,编号唯一
- state uint8 //实时数据,0在线,1离线
- lastStateTime time.Time //实时数据,最新状态时间
- lastDataTime time.Time //最新数据时间
- nextHourTime time.Time //实时数据
- queue *util.MlQueue //数据队列
- errCnt uint //错误计数
- mapTopicHandle map[string]func(m *mqtt.Message)
- }
- func NewChZigbeeConcentrator(tenant, did string) *ChZigbeeConcentrator {
- ctx, cancel := context.WithCancel(context.Background())
- chc := ChZigbeeConcentrator{
- ctx: ctx,
- cancel: cancel,
- tenant: tenant,
- did: did,
- mapLamp: make(map[string]*ChZigbeeLampController),
- queue: util.NewQueue(1000),
- mapTopicHandle: make(map[string]func(m *mqtt.Message)),
- }
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_DATA)] = chc.handleTpChZigbeeData
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_LAMP)] = chc.handleTpChZigbeeQLamp
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_WAITTIME_ACK)] = chc.handleTpChZigbeeSetWaittimeAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_SWITCH_ACK)] = chc.handleTpChZigbeeSetSwitchAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK)] = chc.handleTpChZigbeeSetRecoveryAutoAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_ONOFFTIME_ACK)] = chc.handleTpChZigbeeSetOnofftimeAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK)] = chc.handleTpChZigbeeQueryOnofftimeAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK)] = chc.handleTpChZigbeeSetUpdateLampAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_TIME_ACK)] = chc.handleTpChZigbeeQueryTimeAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_BROADCASTTIME_ACK)] = chc.handleTpChZigbeeSetBroadcastTimeAck
- chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_ALARM)] = chc.handleTpChZigbeeAlarm
- return &chc
- }
- func (o *ChZigbeeConcentrator) PutMessage(m *mqtt.Message) {
- o.queue.Put(m)
- }
- func (o *ChZigbeeConcentrator) Start() {
- go o.HandleQueue()
- }
- func (o *ChZigbeeConcentrator) Stop() {
- o.cancel()
- }
- func (o *ChZigbeeConcentrator) HandleQueue() {
- defer func() {
- if err := recover(); err != nil {
- logrus.Error("ChZigbeeConcentrator.HandleQueue发生异常:", string(debug.Stack()))
- go o.HandleQueue()
- }
- }()
- o.UpdateDID()
- var exit = false
- timer := time.NewTicker(1 * time.Minute)
- timer5 := time.NewTicker(5 * time.Minute)
- //每天15点半同步日出日落时间
- var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(14*time.Hour + 30*time.Minute)
- for {
- select {
- case <-o.ctx.Done():
- logrus.Error("ChZigbeeConcentrator.HandleQueue即将退出,原因:", o.ctx.Err())
- exit = true
- case <-timer.C: //每隔1分钟执行一次
- o.UpdateState() //更新集控器状态,防止无数据状态不更新
- o.UpdateLampControllerState() //更新灯控状态,防止无数据状态不更新
- case <-timer5.C: //每隔5分钟执行一次
- o.UpdateDID() //更新灯控设备编码
- //同步日出日落时间
- if util.MlNow().After(SyncSunset) {
- if err := o.SyncSunset(); err == nil {
- SyncSunset = SyncSunset.AddDate(0, 0, 1)
- }
- }
- default:
- //从队列钟获取指令执行
- if m, ok, quantity := o.queue.Get(); ok {
- if mm, ok := m.(*mqtt.Message); ok {
- if f, ok := o.mapTopicHandle[mm.Topic()]; ok {
- f(mm)
- } else {
- logrus.Error("ChZigbeeConcentrator.HandleQueue:不支持的主题:", mm.Topic())
- }
- }
- } else if quantity == 0 {
- if exit {
- return
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
- }
- }
- func (o *ChZigbeeConcentrator) SyncSunset() error {
- arr, err := models.GetZigbeeLampstrategyByconcentrator(o.did)
- if err != nil {
- logrus.Errorf("集控器[%s]从数据库读取日出日落时间错误:%s", o.did, err.Error())
- return err
- }
- if len(arr) == 0 {
- return nil
- }
- //Strategy相同的一起发送,不同的分批发送
- mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间
- mapNumber := make(map[string][]uint32) //策略灯控编号,注意非编码
- for _, v := range arr {
- mapNumber[v.ID] = append(mapNumber[v.ID], uint32(v.Number))
- if _, ok := mapTime[v.Strategy]; ok {
- //已计算日出日落时间的,不再重复计算
- continue
- }
- var oot protocol.CHZB_OnOffTime
- var ls []models.LampStrategy
- if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
- oot.Brightness = uint8(ls[0].Brightness)
- }
- //计算时间
- if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil {
- onhour, _ := strconv.Atoi(strings.Split(set, ":")[0])
- onminite, _ := strconv.Atoi(strings.Split(set, ":")[1])
- offhour, _ := strconv.Atoi(strings.Split(rise, ":")[0])
- offminite, _ := strconv.Atoi(strings.Split(rise, ":")[1])
- oot.OnHour = uint8(onhour)
- oot.OnMinite = uint8(onminite)
- oot.OffHour = uint8(offhour)
- oot.OffMinite = uint8(offminite)
- }
- mapTime[v.Strategy] = &oot
- }
- for k, v := range mapNumber {
- if oot, ok := mapTime[k]; ok {
- var obj protocol.Pack_SetOnOffTime
- seq := GetNextSeq()
- if str, err := obj.EnCode(o.did, o.gid, seq, v, []protocol.CHZB_OnOffTime{*oot}); err == nil {
- topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME)
- err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce)
- if err != nil {
- logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]发布日出日落消息错误:%s", o.did, v, err.Error())
- }
- var msg string
- if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
- msg = string(msg0)
- } else {
- msg = str
- }
- odb := models.DeviceCmdRecord{
- ID: seq,
- GID: o.gid,
- DID: o.did,
- Topic: topic,
- Message: msg,
- State: 0,
- }
- if err := models.G_db.Create(&odb).Error; err != nil {
- logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库错误:%s", o.did, v, err.Error())
- } else {
- logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库成功", o.did, v)
- }
- }
- }
- }
- return nil
- }
- func (o *ChZigbeeConcentrator) UpdateDID() {
- if arr, err := models.GetLampControllerByConcentrator(o.did); err == nil {
- maplamps := make(map[uint32]string, len(arr))
- for _, v := range arr {
- if ld, ok := o.mapLamp[v.ID]; ok {
- ld.SetDID(v.ID, uint32(v.Number))
- } else {
- var ld ChZigbeeLampController
- ld.SetDID(v.ID, uint32(v.Number))
- o.mapLamp[v.ID] = &ld
- }
- if v.Number != 0xFEFE { //0xFEFE为广播地址,禁止使用
- maplamps[uint32(v.Number)] = v.ID
- }
- o.gid = v.GID
- }
- if len(maplamps) > 0 {
- var ret protocol.Pack_CHZB_LampIDs
- if str, err := ret.EnCode(o.did, o.gid, GetNextSeq(), maplamps); err == nil {
- topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_UPDATE_LAMP)
- if err := GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce); err != nil {
- logrus.Errorf("UpdateDID:发布消息错误:%s", err.Error())
- }
- }
- }
- }
- //更新长和zigbee灯控物模型
- if o.tid > 0 {
- if err := models.UpdateTID(o.did, int(o.tid)); err != nil {
- logrus.Errorf("集控器[%s]更新灯控物模型失败:%s", o.did, err.Error())
- }
- }
- }
- func (o *ChZigbeeConcentrator) handleStateChange(t time.Time) {
- //最新状态
- state := uint8(0)
- if o.errCnt >= 10 {
- state = 1
- } else if o.errCnt == 0 {
- state = 0
- } else {
- return
- }
- //状态处理
- if o.lastStateTime.IsZero() || o.state == 0xff {
- t0, s0, err := getState(o.did)
- if err != nil {
- o.state = state
- o.lastStateTime = t
- return
- }
- o.state = s0
- o.lastStateTime = t0
- }
- if o.state == 0 && state == 1 { //在线->离线
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
- } else if o.state == 1 && state == 0 { //离线->在线
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
- }
- o.state = state
- o.lastStateTime = t
- }
- func (o *ChZigbeeConcentrator) UpdateState() {
- if o.lastStateTime.IsZero() {
- t0, s0, err := getState(o.did)
- if err == nil {
- o.state = s0
- o.lastStateTime = t0
- }
- }
- if o.state == protocol.FAILED ||
- (!o.lastDataTime.IsZero() && util.MlNow().Sub(o.lastDataTime).Minutes() < OfflineInterval) {
- return
- }
- //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态
- if o.state == protocol.SUCCESS {
- o.state = protocol.FAILED
- o.lastStateTime = util.MlNow()
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: o.lastStateTime})
- cacheState(o.did, o.lastStateTime.Format("2006-01-02 15:04:05"), o.state)
- }
- }
- func (o *ChZigbeeConcentrator) UpdateLampControllerState() {
- for _, v := range o.mapLamp {
- v.UpdateState()
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeData(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_UploadData
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- t, err := util.MlParseTime(obj.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
- return
- }
- o.tid = obj.Data.TID //更新物模型TID
- o.gid = obj.Gid
- //处理集控器定时上报的单灯数据
- var errCnt_ uint = 0
- for k, v := range obj.Data.Data {
- //未找到的等待下次更新灯控信息
- if ld, ok := o.mapLamp[k]; ok {
- ld.HandleData(o.tenant, obj.Gid, o.did, o.tid, t, v)
- }
- if v.State == protocol.FAILED {
- errCnt_++
- }
- }
- if errCnt_ == uint(len(obj.Data.Data)) {
- o.errCnt++
- } else {
- o.errCnt = 0
- }
- //先处理状态变化,再存入最新状态
- o.handleStateChange(t)
- cacheState(o.did, obj.Time, o.state)
- o.lastDataTime = t
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeAlarm(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_LampAlarm
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- if ld, ok := o.mapLamp[obj.Data.DID]; ok {
- ld.HandleAlarm(obj.Data)
- } else {
- logrus.Errorf("未找到灯控:%s", obj.Data.DID)
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeQLamp(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_EmptyObject
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- arr, err := models.GetLampControllerByConcentrator(obj.Id)
- if err != nil || len(arr) == 0 {
- return
- }
- lamps := make(map[uint32]string, len(arr))
- for _, v := range arr {
- if v.Number != 0xFEFE { //0xFEFE为广播地址,禁止使用
- lamps[uint32(v.Number)] = v.ID
- }
- //顺带更新
- if ld, ok := o.mapLamp[v.ID]; ok {
- ld.SetDID(v.ID, uint32(v.Number))
- } else {
- var ld ChZigbeeLampController
- ld.SetDID(v.ID, uint32(v.Number))
- o.mapLamp[v.ID] = &ld
- }
- }
- var ret protocol.Pack_CHZB_LampIDs
- if str, err := ret.EnCode(o.did, o.gid, GetNextSeq(), lamps); err == nil {
- topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_UPDATE_LAMP)
- if err := GetMQTTMgr().Publish(topic, str, mqtt.AtMostOnce); err != nil {
- logrus.Errorf("handle_TP_CHZB_Q_LAMP:发布消息错误:%s", err.Error())
- }
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeSetWaittimeAck(m *mqtt.Message) {
- var obj protocol.Pack_Ack
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: uint(obj.Data.State),
- Resp: obj.Data.Error,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeSetSwitchAck(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_SeqLampAck
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- resq, _ := json.MarshalIndent(obj.Data.MapLamp, "", " ")
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: string(resq),
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeSetRecoveryAutoAck(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_SeqLampAck
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- resq, _ := json.MarshalIndent(obj.Data.MapLamp, "", " ")
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: string(resq),
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeSetOnofftimeAck(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_SeqLampAck
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- resq, _ := json.MarshalIndent(obj.Data.MapLamp, "", " ")
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: string(resq),
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeQueryOnofftimeAck(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_QueryOnOffTimeAck
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: obj.Data.Error,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeSetUpdateLampAck(m *mqtt.Message) {
- var obj protocol.Pack_Ack
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: obj.Data.Error,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeQueryTimeAck(m *mqtt.Message) {
- var obj protocol.Pack_CHZB_QueryTimeAck
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: obj.Data.Error,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- if obj.Data.LampTime != nil {
- logrus.Errorf("集控器[编码为:%s]的灯控[编号为:%d]当前时间为:%02d:%02d:%02d",
- o.did, obj.Data.LampID, obj.Data.LampTime.Hour, obj.Data.LampTime.Minite, obj.Data.LampTime.Second)
- }
- }
- func (o *ChZigbeeConcentrator) handleTpChZigbeeSetBroadcastTimeAck(m *mqtt.Message) {
- var obj protocol.Pack_Ack
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- oo := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: obj.Data.Error,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
|