package main

import (
	"context"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"lc/common/models"
	"lc/common/mqtt"
	"lc/common/protocol"
	"lc/common/util"
)

// ChZigbeeConcentrator is the Changhe Zigbee concentrator. It owns a message
// queue that is consumed by a single worker goroutine (see HandleQueue), which
// also performs the periodic state and lamp-list maintenance.
type ChZigbeeConcentrator struct {
	ctx            context.Context
	cancel         context.CancelFunc
	lock           sync.Mutex
	tenant         string                             // tenant
	gid            string                             // base data: gateway ID
	did            string                             // base data: concentrator device code
	tid            uint16                             // base data: concentrator thing-model ID
	mapLamp        map[string]*ChZigbeeLampController // per-lamp data, keyed by lamp controller ID
	state          uint8                              // live data: 0 online, 1 offline
	lastStateTime  time.Time                          // live data: time of the latest state
	lastDataTime   time.Time                          // time of the latest data report
	nextHourTime   time.Time                          // live data
	queue          *util.MlQueue                      // incoming message queue
	errCnt         uint                               // consecutive all-failed report counter
	mapTopicHandle map[string]func(m *mqtt.Message)   // full topic -> handler
}

// NewChZigbeeConcentrator builds a concentrator for the given tenant and device
// code and registers one handler per supported topic. The worker goroutine is
// not started here; call Start.
func NewChZigbeeConcentrator(tenant, did string) *ChZigbeeConcentrator {
	ctx, cancel := context.WithCancel(context.Background())
	chc := ChZigbeeConcentrator{
		ctx:            ctx,
		cancel:         cancel,
		tenant:         tenant,
		did:            did,
		mapLamp:        make(map[string]*ChZigbeeLampController),
		queue:          util.NewQueue(1000),
		mapTopicHandle: make(map[string]func(m *mqtt.Message)),
	}
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_DATA)] = chc.handleTpChZigbeeData
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_LAMP)] = chc.handleTpChZigbeeQLamp
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_WAITTIME_ACK)] = chc.handleTpChZigbeeSetWaittimeAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_SWITCH_ACK)] = chc.handleTpChZigbeeSetSwitchAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK)] = chc.handleTpChZigbeeSetRecoveryAutoAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_ONOFFTIME_ACK)] = chc.handleTpChZigbeeSetOnofftimeAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK)] = chc.handleTpChZigbeeQueryOnofftimeAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK)] = chc.handleTpChZigbeeSetUpdateLampAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_TIME_ACK)] = chc.handleTpChZigbeeQueryTimeAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_BROADCASTTIME_ACK)] = chc.handleTpChZigbeeSetBroadcastTimeAck
	chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_ALARM)] = chc.handleTpChZigbeeAlarm
	return &chc
}

// PutMessage enqueues an MQTT message for processing by the worker goroutine.
func (o *ChZigbeeConcentrator) PutMessage(m *mqtt.Message) {
	o.queue.Put(m)
}

// Start launches the worker goroutine running HandleQueue.
func (o *ChZigbeeConcentrator) Start() {
	go o.HandleQueue()
}

// Stop cancels the concentrator's context; HandleQueue then drains the queue
// and returns.
func (o *ChZigbeeConcentrator) Stop() {
	o.cancel()
}

// HandleQueue is the worker loop. It refreshes the lamp list once on entry and
// then, until the context is cancelled:
//   - every minute refreshes the concentrator and lamp controller states,
//   - every five minutes refreshes the lamp list and, once the daily sync
//     instant has passed, pushes the sunrise/sunset schedule,
//   - otherwise polls the queue and dispatches messages by topic.
//
// If a panic occurs it is logged with the stack and the loop is restarted in a
// new goroutine.
func (o *ChZigbeeConcentrator) HandleQueue() {
	defer func() {
		if err := recover(); err != nil {
			logrus.Error("ChZigbeeConcentrator.HandleQueue发生异常:", string(debug.Stack()))
			go o.HandleQueue()
		}
	}()

	o.UpdateDID()

	// Stop the tickers on every exit path; a panic-restart would otherwise
	// accumulate two live tickers per restart.
	minuteTicker := time.NewTicker(1 * time.Minute)
	defer minuteTicker.Stop()
	fiveMinuteTicker := time.NewTicker(5 * time.Minute)
	defer fiveMinuteTicker.Stop()

	// The daily sunrise/sunset sync becomes due at 14:30 (start of day + 14h30m).
	nextSunsetSync := util.New(util.MlNow()).BeginningOfDay().Add(14*time.Hour + 30*time.Minute)

	for {
		select {
		case <-o.ctx.Done():
			// A cancelled context stays ready forever, so the default branch
			// would never be selected again. Drain what is queued and leave
			// here instead of setting a flag and looping.
			logrus.Error("ChZigbeeConcentrator.HandleQueue即将退出,原因:", o.ctx.Err())
			o.drainQueue()
			return
		case <-minuteTicker.C:
			// Refresh states so they still change when no data arrives.
			o.UpdateState()
			o.UpdateLampControllerState()
		case <-fiveMinuteTicker.C:
			o.UpdateDID() // refresh lamp controller codes
			// Push the sunrise/sunset schedule; only advance the due time when
			// the sync reported success so a failure is retried in 5 minutes.
			if util.MlNow().After(nextSunsetSync) {
				if err := o.SyncSunset(); err == nil {
					nextSunsetSync = nextSunsetSync.AddDate(0, 0, 1)
				}
			}
		default:
			// Take the next queued message, or idle briefly when the queue is empty.
			if m, ok, quantity := o.queue.Get(); ok {
				o.dispatch(m)
			} else if quantity == 0 {
				time.Sleep(100 * time.Millisecond)
			}
		}
	}
}

// drainQueue dispatches queued messages until the queue reports it is empty.
func (o *ChZigbeeConcentrator) drainQueue() {
	for {
		m, ok, quantity := o.queue.Get()
		if ok {
			o.dispatch(m)
			continue
		}
		if quantity == 0 {
			return
		}
	}
}

// dispatch routes one queue item to the handler registered for its topic.
// Items that are not *mqtt.Message are ignored; unknown topics are logged.
func (o *ChZigbeeConcentrator) dispatch(item interface{}) {
	mm, ok := item.(*mqtt.Message)
	if !ok {
		return
	}
	if f, found := o.mapTopicHandle[mm.Topic()]; found {
		f(mm)
		return
	}
	logrus.Error("ChZigbeeConcentrator.HandleQueue:不支持的主题:", mm.Topic())
}

// chzbParseHourMinute parses an "HH:MM" (optionally "HH:MM:SS") clock string.
// ok is false when the string has no ':' separator, a part is not a number, or
// the values are outside 0-23 / 0-59.
func chzbParseHourMinute(s string) (hour, minute uint8, ok bool) {
	parts := strings.Split(s, ":")
	if len(parts) < 2 {
		return 0, 0, false
	}
	h, err := strconv.Atoi(strings.TrimSpace(parts[0]))
	if err != nil || h < 0 || h > 23 {
		return 0, 0, false
	}
	m, err := strconv.Atoi(strings.TrimSpace(parts[1]))
	if err != nil || m < 0 || m > 59 {
		return 0, 0, false
	}
	return uint8(h), uint8(m), true
}

// SyncSunset reads the lamp strategies of this concentrator from the database,
// computes on (sunset) / off (sunrise) times per strategy, and publishes one
// set-on/off-time command per strategy covering all lamp numbers that use it.
// Each published command is also stored as a DeviceCmdRecord.
//
// It returns an error only when the strategies cannot be read; publish and
// record failures are logged.
func (o *ChZigbeeConcentrator) SyncSunset() error {
	arr, err := models.GetZigbeeLampstrategyByconcentrator(o.did)
	if err != nil {
		logrus.Errorf("集控器[%s]从数据库读取日出日落时间错误:%s", o.did, err.Error())
		return err
	}
	if len(arr) == 0 {
		return nil
	}

	// Lamps sharing a strategy are sent together; different strategies are
	// sent as separate commands.
	mapTime := make(map[string]*protocol.CHZB_OnOffTime) // strategy -> on/off time (nil: could not be computed)
	mapNumber := make(map[string][]uint32)               // strategy -> lamp numbers (numbers, not device codes)
	for _, v := range arr {
		// Group by strategy so the key matches mapTime; grouping by lamp ID
		// made the lookup below miss.
		mapNumber[v.Strategy] = append(mapNumber[v.Strategy], uint32(v.Number))
		if _, ok := mapTime[v.Strategy]; ok {
			// Already computed (or already failed) for this strategy.
			continue
		}

		var oot protocol.CHZB_OnOffTime
		var ls []models.LampStrategy
		if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
			oot.Brightness = uint8(ls[0].Brightness)
		}

		rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude)
		if err != nil {
			// Do not push a 00:00/00:00 schedule when the times are unknown.
			logrus.Errorf("集控器[%s]策略[%s]计算日出日落时间错误:%v", o.did, v.Strategy, err)
			mapTime[v.Strategy] = nil
			continue
		}
		onHour, onMinute, onOK := chzbParseHourMinute(set)
		offHour, offMinute, offOK := chzbParseHourMinute(rise)
		if !onOK || !offOK {
			logrus.Errorf("集控器[%s]策略[%s]日出日落时间格式错误:日出[%s],日落[%s]", o.did, v.Strategy, rise, set)
			mapTime[v.Strategy] = nil
			continue
		}
		oot.OnHour = onHour
		oot.OnMinite = onMinute
		oot.OffHour = offHour
		oot.OffMinite = offMinute
		mapTime[v.Strategy] = &oot
	}

	for strategy, numbers := range mapNumber {
		oot := mapTime[strategy]
		if oot == nil {
			continue
		}
		var obj protocol.Pack_SetOnOffTime
		seq := GetNextSeq()
		str, err := obj.EnCode(o.did, o.gid, seq, numbers, []protocol.CHZB_OnOffTime{*oot})
		if err != nil {
			logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]编码日出日落消息错误:%v", o.did, numbers, err)
			continue
		}
		topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME)
		if err := GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce); err != nil {
			logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]发布日出日落消息错误:%s", o.did, numbers, err.Error())
		}

		// Store the indented form of the command when it can be produced,
		// otherwise the encoded payload itself.
		msg := str
		if pretty, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
			msg = string(pretty)
		}
		odb := models.DeviceCmdRecord{
			ID:      seq,
			GID:     o.gid,
			DID:     o.did,
			Topic:   topic,
			Message: msg,
			State:   0,
		}
		if err := models.G_db.Create(&odb).Error; err != nil {
			logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库错误:%s", o.did, numbers, err.Error())
		} else {
			logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库成功", o.did, numbers)
		}
	}
	return nil
}

// upsertLamp updates the lamp controller stored under id with the given number,
// creating the entry when it does not exist yet.
func (o *ChZigbeeConcentrator) upsertLamp(id string, number uint32) {
	if ld, ok := o.mapLamp[id]; ok {
		ld.SetDID(id, number)
		return
	}
	var ld ChZigbeeLampController
	ld.SetDID(id, number)
	o.mapLamp[id] = &ld
}

// UpdateDID reloads the lamp controllers of this concentrator from the
// database, refreshes the in-memory lamp map and gateway ID, and publishes the
// number -> ID mapping to the concentrator. When a thing-model ID is known it
// is also written back through models.UpdateTID.
func (o *ChZigbeeConcentrator) UpdateDID() {
	arr, err := models.GetLampControllerByConcentrator(o.did)
	if err != nil {
		logrus.Errorf("集控器[%s]读取灯控列表错误:%s", o.did, err.Error())
	} else {
		maplamps := make(map[uint32]string, len(arr))
		for _, v := range arr {
			o.upsertLamp(v.ID, uint32(v.Number))
			if v.Number != 0xFEFE { // 0xFEFE is the broadcast address and must not be used
				maplamps[uint32(v.Number)] = v.ID
			}
			o.gid = v.GID
		}
		if len(maplamps) > 0 {
			var ret protocol.Pack_CHZB_LampIDs
			if str, err := ret.EnCode(o.did, o.gid, GetNextSeq(), maplamps); err == nil {
				topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_UPDATE_LAMP)
				if err := GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce); err != nil {
					logrus.Errorf("UpdateDID:发布消息错误:%s", err.Error())
				}
			}
		}
	}

	// Update the thing-model of the Changhe Zigbee lamp controllers.
	if o.tid > 0 {
		if err := models.UpdateTID(o.did, int(o.tid)); err != nil {
			logrus.Errorf("集控器[%s]更新灯控物模型失败:%s", o.did, err.Error())
		}
	}
}

// handleStateChange derives the new online/offline state from errCnt and emits
// an online/offline event on a transition. errCnt >= 10 means offline (1),
// errCnt == 0 means online (0); any value in between leaves the state untouched.
func (o *ChZigbeeConcentrator) handleStateChange(t time.Time) {
	var state uint8
	switch {
	case o.errCnt >= 10:
		state = 1
	case o.errCnt == 0:
		state = 0
	default:
		return
	}

	// No known previous state yet: load it through getState. If that fails
	// just adopt the new state without raising an event.
	if o.lastStateTime.IsZero() || o.state == 0xff {
		t0, s0, err := getState(o.did)
		if err != nil {
			o.state = state
			o.lastStateTime = t
			return
		}
		o.state = s0
		o.lastStateTime = t0
	}

	switch {
	case o.state == 0 && state == 1: // online -> offline
		GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
	case o.state == 1 && state == 0: // offline -> online
		GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
	}
	o.state = state
	o.lastStateTime = t
}

// UpdateState marks the concentrator offline when it is currently
// protocol.SUCCESS and no data has been seen within OfflineInterval minutes
// (or no data has been seen at all). It then raises an offline event and
// stores the new state through cacheState.
func (o *ChZigbeeConcentrator) UpdateState() {
	if o.lastStateTime.IsZero() {
		if t0, s0, err := getState(o.did); err == nil {
			o.state = s0
			o.lastStateTime = t0
		}
	}

	if o.state == protocol.FAILED {
		return
	}
	if !o.lastDataTime.IsZero() && util.MlNow().Sub(o.lastDataTime).Minutes() < OfflineInterval {
		return
	}

	// Only an online concentrator is switched to offline; an already offline
	// one is left as it is.
	if o.state == protocol.SUCCESS {
		o.state = protocol.FAILED
		o.lastStateTime = util.MlNow()
		GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: o.lastStateTime})
		cacheState(o.did, o.lastStateTime.Format("2006-01-02 15:04:05"), o.state)
	}
}

// UpdateLampControllerState calls UpdateState on every known lamp controller.
func (o *ChZigbeeConcentrator) UpdateLampControllerState() {
	for _, lamp := range o.mapLamp {
		lamp.UpdateState()
	}
}

// handleTpChZigbeeData processes a periodic data upload: it forwards each lamp
// entry to its lamp controller, maintains errCnt (incremented when every entry
// reports protocol.FAILED, reset otherwise), re-evaluates the concentrator
// state and stores it through cacheState.
func (o *ChZigbeeConcentrator) handleTpChZigbeeData(m *mqtt.Message) {
	var obj protocol.Pack_CHZB_UploadData
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	t, err := util.MlParseTime(obj.Time)
	if err != nil {
		logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
		return
	}
	o.tid = obj.Data.TID // update the thing-model TID
	o.gid = obj.Gid

	// Handle the per-lamp data reported by the concentrator.
	var failed uint
	for k, v := range obj.Data.Data {
		// Lamps not known yet are picked up by the next lamp list refresh.
		if ld, ok := o.mapLamp[k]; ok {
			ld.HandleData(o.tenant, obj.Gid, o.did, o.tid, t, v)
		}
		if v.State == protocol.FAILED {
			failed++
		}
	}
	if failed == uint(len(obj.Data.Data)) {
		o.errCnt++
	} else {
		o.errCnt = 0
	}

	// Evaluate the state change first, then store the latest state.
	o.handleStateChange(t)
	cacheState(o.did, obj.Time, o.state)
	o.lastDataTime = t
}

// handleTpChZigbeeAlarm forwards a lamp alarm to the matching lamp controller,
// or logs when the lamp is unknown.
func (o *ChZigbeeConcentrator) handleTpChZigbeeAlarm(m *mqtt.Message) {
	var obj protocol.Pack_CHZB_LampAlarm
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	ld, ok := o.mapLamp[obj.Data.DID]
	if !ok {
		logrus.Errorf("未找到灯控:%s", obj.Data.DID)
		return
	}
	ld.HandleAlarm(obj.Data)
}

// handleTpChZigbeeQLamp answers a lamp list query: it loads the lamp
// controllers for the ID carried in the request, refreshes the in-memory lamp
// map along the way, and publishes the number -> ID mapping.
func (o *ChZigbeeConcentrator) handleTpChZigbeeQLamp(m *mqtt.Message) {
	var obj protocol.Pack_CHZB_EmptyObject
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	arr, err := models.GetLampControllerByConcentrator(obj.Id)
	if err != nil || len(arr) == 0 {
		return
	}

	lamps := make(map[uint32]string, len(arr))
	for _, v := range arr {
		if v.Number != 0xFEFE { // 0xFEFE is the broadcast address and must not be used
			lamps[uint32(v.Number)] = v.ID
		}
		o.upsertLamp(v.ID, uint32(v.Number)) // refresh the local entry as well
	}

	var ret protocol.Pack_CHZB_LampIDs
	str, err := ret.EnCode(o.did, o.gid, GetNextSeq(), lamps)
	if err != nil {
		return
	}
	topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_UPDATE_LAMP)
	if err := GetMQTTMgr().Publish(topic, str, mqtt.AtMostOnce); err != nil {
		logrus.Errorf("handle_TP_CHZB_Q_LAMP:发布消息错误:%s", err.Error())
	}
}

// handlePackAck decodes a protocol.Pack_Ack and updates the command record
// identified by its sequence number. The record state is 1, or the state
// carried in the acknowledgement when useReportedState is true.
func (o *ChZigbeeConcentrator) handlePackAck(m *mqtt.Message, useReportedState bool) {
	var obj protocol.Pack_Ack
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	oo := models.DeviceCmdRecord{
		ID:    obj.Seq,
		State: 1,
		Resp:  obj.Data.Error,
	}
	if useReportedState {
		oo.State = uint(obj.Data.State)
	}
	if err := oo.Update(); err != nil {
		logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
	}
}

// handleSeqLampAck decodes a protocol.Pack_CHZB_SeqLampAck and stores its
// per-lamp result map, as indented JSON, in the command record identified by
// the sequence number (state 1).
func (o *ChZigbeeConcentrator) handleSeqLampAck(m *mqtt.Message) {
	var obj protocol.Pack_CHZB_SeqLampAck
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	resq, err := json.MarshalIndent(obj.Data.MapLamp, "", " ")
	if err != nil {
		// The record is still updated, with an empty response.
		logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但序列化灯控应答失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
	}
	oo := models.DeviceCmdRecord{
		ID:    obj.Seq,
		State: 1,
		Resp:  string(resq),
	}
	if err := oo.Update(); err != nil {
		logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
	}
}

// handleTpChZigbeeSetWaittimeAck records the acknowledgement of a set-wait-time
// command, using the state reported in the acknowledgement.
func (o *ChZigbeeConcentrator) handleTpChZigbeeSetWaittimeAck(m *mqtt.Message) {
	o.handlePackAck(m, true)
}

// handleTpChZigbeeSetSwitchAck records the per-lamp acknowledgement of a
// set-switch command.
func (o *ChZigbeeConcentrator) handleTpChZigbeeSetSwitchAck(m *mqtt.Message) {
	o.handleSeqLampAck(m)
}

// handleTpChZigbeeSetRecoveryAutoAck records the per-lamp acknowledgement of a
// recover-auto command.
func (o *ChZigbeeConcentrator) handleTpChZigbeeSetRecoveryAutoAck(m *mqtt.Message) {
	o.handleSeqLampAck(m)
}

// handleTpChZigbeeSetOnofftimeAck records the per-lamp acknowledgement of a
// set-on/off-time command.
func (o *ChZigbeeConcentrator) handleTpChZigbeeSetOnofftimeAck(m *mqtt.Message) {
	o.handleSeqLampAck(m)
}

// handleTpChZigbeeQueryOnofftimeAck records the acknowledgement of a
// query-on/off-time command (state 1, response = reported error text).
func (o *ChZigbeeConcentrator) handleTpChZigbeeQueryOnofftimeAck(m *mqtt.Message) {
	var obj protocol.Pack_CHZB_QueryOnOffTimeAck
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	oo := models.DeviceCmdRecord{
		ID:    obj.Seq,
		State: 1,
		Resp:  obj.Data.Error,
	}
	if err := oo.Update(); err != nil {
		logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
	}
}

// handleTpChZigbeeSetUpdateLampAck records the acknowledgement of a lamp list
// update command (state 1).
func (o *ChZigbeeConcentrator) handleTpChZigbeeSetUpdateLampAck(m *mqtt.Message) {
	o.handlePackAck(m, false)
}

// handleTpChZigbeeQueryTimeAck records the acknowledgement of a query-time
// command and, when the acknowledgement carries a lamp time, logs it.
func (o *ChZigbeeConcentrator) handleTpChZigbeeQueryTimeAck(m *mqtt.Message) {
	var obj protocol.Pack_CHZB_QueryTimeAck
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	oo := models.DeviceCmdRecord{
		ID:    obj.Seq,
		State: 1,
		Resp:  obj.Data.Error,
	}
	if err := oo.Update(); err != nil {
		logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
	}
	if obj.Data.LampTime != nil {
		logrus.Errorf("集控器[编码为:%s]的灯控[编号为:%d]当前时间为:%02d:%02d:%02d", o.did, obj.Data.LampID, obj.Data.LampTime.Hour, obj.Data.LampTime.Minite, obj.Data.LampTime.Second)
	}
}

// handleTpChZigbeeSetBroadcastTimeAck records the acknowledgement of a
// broadcast-time command (state 1).
func (o *ChZigbeeConcentrator) handleTpChZigbeeSetBroadcastTimeAck(m *mqtt.Message) {
	o.handlePackAck(m, false)
}