package main import ( "context" "runtime/debug" "strconv" "strings" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) //TODO 每次调光记录入库确认,灯的实际功率显示 // HlZigbeeConcentrator TODO 开灯自动要关灯 // 集控器类定义 type HlZigbeeConcentrator struct { ctx context.Context cancel context.CancelFunc lock sync.Mutex tenant string //租户 gid string //基础数据,网关ID did string //基础数据,集控器设备编码 tid uint16 //基础数据,集控器物模型ID mapLamp map[string]*HlZigbeeLampController //单灯数据,编号唯一 state uint8 //实时数据,0在线,1离线 lastStateTime time.Time //实时数据,最新状态时间 lastDataTime time.Time //最新数据时间 nextHourTime time.Time //实时数据 queue *util.MlQueue //数据队列 errCnt uint //错误计数 mapTopicHandle map[string]func(m *mqtt.Message) } // NewHlZigbeeConcentrator 新建集控器,并指定topic对应事件 func NewHlZigbeeConcentrator(tenant, did string) *HlZigbeeConcentrator { ctx, cancel := context.WithCancel(context.Background()) hlc := HlZigbeeConcentrator{ ctx: ctx, cancel: cancel, tenant: tenant, did: did, mapLamp: make(map[string]*HlZigbeeLampController), queue: util.NewQueue(1000), mapTopicHandle: make(map[string]func(m *mqtt.Message)), } //改成只有一个 hlc.mapTopicHandle[GetHLTopicUp()] = hlc.handleTopicUp return &hlc } func (o *HlZigbeeConcentrator) PutMessage(m *mqtt.Message) { o.queue.Put(m) } func (o *HlZigbeeConcentrator) Start() { go o.HandleQueue() } func (o *HlZigbeeConcentrator) Stop() { o.cancel() } // HandleQueue 启用队列循环,不同的主题调用不同的fn func (o *HlZigbeeConcentrator) HandleQueue() { defer func() { if err := recover(); err != nil { logrus.Error("HlZigbeeConcentrator.HandleQueue发生异常:", string(debug.Stack())) go o.HandleQueue() } }() o.UpdateDID() //启动时更新本地灯控数组一次 o.GetOnlineMsg() var obj protocol.HLWLZB_Frequency_Down if str, err := obj.EnCode(o.did, 10); err == nil { //启动时下发到设备的"上发频率" topic := GetHLTopicDown(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME) err = GetHlMqttMgr().Publish(topic, str, mqtt.AtLeastOnce) if err != nil { logrus.Error("HlZigbeeConcentrator.HandleQueue:主题发送失败:", topic, str) } } var exit = false timer := time.NewTicker(30 * time.Minute) timer5 := time.NewTicker(5 * time.Minute) //每天15点半同步日出日落时间 var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(14*time.Hour + 30*time.Minute) for { select { //if case <-o.ctx.Done(): logrus.Error("HlZigbeeConcentrator.HandleQueue即将退出,原因:", o.ctx.Err()) exit = true case <-timer.C: //每隔30分钟执行一次 o.UpdateState() //更新集控器状态,防止无数据状态不更新 o.UpdateLampControllerState() //更新灯控状态,防止无数据状态不更新 case <-timer5.C: //每隔5分钟执行一次 o.UpdateDID() //更新灯控设备编码 更新本地灯控数组 //同步日出日落时间 if util.MlNow().After(SyncSunset) { if err := o.SyncSunset(); err == nil { SyncSunset = SyncSunset.AddDate(0, 0, 1) } } default: //从队列中获取指令执行fn if m, ok, quantity := o.queue.Get(); ok { if mm, ok := m.(*mqtt.Message); ok { if f, ok := o.mapTopicHandle[mm.Topic()]; ok { f(mm) } else { logrus.Error("HlZigbeeConcentrator.HandleQueue:不支持的主题:", mm.Topic()) } } } else if quantity == 0 { if exit { return } time.Sleep(100 * time.Millisecond) } } } } // SyncSunset 日出日落 平台->终端 // TODO func (o *HlZigbeeConcentrator) SyncSunset() error { arr, err := models.GetZigbeeLampstrategyByconcentrator(o.did) if err != nil { logrus.Errorf("集控器[%s]从数据库读取日出日落时间错误:%s", o.did, err.Error()) return err } if len(arr) == 0 { return nil } //Strategy相同的一起发送,不同的分批发送 mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间 mapNumber := make(map[string][]uint32) //策略灯控编号,注意非编码 for _, v := range arr { mapNumber[v.ID] = append(mapNumber[v.ID], uint32(v.Number)) if _, ok := mapTime[v.Strategy]; ok { //已计算日出日落时间的,不再重复计算 continue } var oot protocol.CHZB_OnOffTime var ls []models.LampStrategy if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 { oot.Brightness = uint8(ls[0].Brightness) } //计算时间 if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil { onhour, _ := strconv.Atoi(strings.Split(set, ":")[0]) onminite, _ := strconv.Atoi(strings.Split(set, ":")[1]) offhour, _ := strconv.Atoi(strings.Split(rise, ":")[0]) offminite, _ := strconv.Atoi(strings.Split(rise, ":")[1]) oot.OnHour = uint8(onhour) oot.OnMinite = uint8(onminite) oot.OffHour = uint8(offhour) oot.OffMinite = uint8(offminite) } mapTime[v.Strategy] = &oot } for k, v := range mapNumber { if oot, ok := mapTime[k]; ok { var obj protocol.Pack_SetOnOffTime seq := GetNextSeq() if str, err := obj.EnCode(o.did, o.gid, seq, v, []protocol.CHZB_OnOffTime{*oot}); err == nil { topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME) //设置开关灯时间段,平台->终端 err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce) if err != nil { logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]发布日出日落消息错误:%s", o.did, v, err.Error()) } var msg string if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil { msg = string(msg0) } else { msg = str } odb := models.DeviceCmdRecord{ ID: seq, GID: o.gid, DID: o.did, Topic: topic, Message: msg, State: 0, } if err := models.G_db.Create(&odb).Error; err != nil { logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库错误:%s", o.did, v, err.Error()) } else { logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库成功", o.did, v) } } } } return nil } // UpdateDID 更新灯控的集控,更新集控的网关id func (o *HlZigbeeConcentrator) UpdateDID() { if arr, err := models.GetLampHlControllerByConcentrator(o.did); err == nil { //对集控did下所有灯控 for _, v := range arr { //更新灯控设备编码 if ld, ok := o.mapLamp[v.ID]; ok { ld.SetDID(v.Concentrator, v.ID) //集id,灯id if o.state == protocol.FAILED { //离线后下属灯都设置离线 ld.SetOnOffLine(o.state) } } else { var ld HlZigbeeLampController //第一次则加入 ld.SetDID(v.Concentrator, v.ID) o.mapLamp[v.ID] = &ld //更新本地单灯数组,需要用到 } // 边缘(设备) // 设备后台 (网关后台、与java对接) o.gid = v.GID //设置集控器网关id } } } // 事件--改变状态 func (o *HlZigbeeConcentrator) handleStateChange(t time.Time) { //最新状态 state := uint8(0) if o.errCnt >= 10 { state = 1 } else if o.errCnt == 0 { state = 0 } else { return } //状态处理 if o.lastStateTime.IsZero() || o.state == 0xff { t0, s0, err := getState(o.did) //取redis if err != nil { o.state = state o.lastStateTime = t return } o.state = s0 o.lastStateTime = t0 } if o.state == 0 && state == 1 { //在线->离线 o.GetWillMsg() GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t}) } else if o.state == 1 && state == 0 { //离线->在线 o.GetOnlineMsg() GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t}) } o.state = state o.lastStateTime = t } // UpdateState 每隔一分钟,集控置为离线 func (o *HlZigbeeConcentrator) UpdateState() { if o.lastStateTime.IsZero() { t0, s0, err := getState(o.did) if err == nil { o.state = s0 o.lastStateTime = t0 } } if o.state == protocol.FAILED || //若之前是离线状态的,则不修改状态 (!o.lastDataTime.IsZero() && util.MlNow().Sub(o.lastDataTime).Minutes() < OfflineInterval) { return } //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态 if o.state == protocol.SUCCESS { o.state = protocol.FAILED o.lastStateTime = util.MlNow() GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: o.lastStateTime}) cacheState(o.did, o.lastStateTime.Format("2006-01-02 15:04:05"), o.state) //改变redis o.GetWillMsg() } } // UpdateLampControllerState 每隔一分钟,灯控置为离线 func (o *HlZigbeeConcentrator) UpdateLampControllerState() { for _, v := range o.mapLamp { v.UpdateState() } } // {"masterSn":"301056","msgType":"offline","snGroup":[{"serialNumber":"301056"}], // "ts":"1653445142","uuid":"40a47d85-2ebb-41ab-a6d9-6aad4b934d02"} 只有集控离线事件 func (o *HlZigbeeConcentrator) onofflineProcess(s string) { var obj protocol.HLWLZB_OnOfflineStatus_Up if err := obj.DeCode(s); err != nil { logrus.Errorf("HLWLZB_OnOfflineStatus_Up====%s解析错误:%s", s, err.Error()) return } i64, err1 := strconv.ParseInt(obj.Ts, 10, 62) t, err2 := util.MlParseTime(util.Unix2Time(i64)) if err1 != nil || err2 != nil { logrus.Errorf("HLWLZB_OnOfflineStatus_Up====%s时间解析错误:%v %v", obj.Ts, err1, err2) return } if obj.MasterSn == o.did { if obj.MsgType == protocol.MT_ONLINE { //上线 GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t}) o.state = 0 o.lastStateTime = t o.errCnt = 0 o.GetOnlineMsg() } else if obj.MsgType == protocol.MT_OFFLINE { //离线 GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t}) o.state = 1 o.lastStateTime = t o.errCnt = 10 o.GetWillMsg() } } } // 存灯数据 // {"electricity":"0.0","energy":"0.64","lampStatus":"off","lux":"0.0","power":"0.0", // "powerFactor":"0.0","serialNumber":"301057","voltage":"226.0"} func (o *HlZigbeeConcentrator) reportProcess(s string) { var obj protocol.HLWLZB_QueryLampStatus_Back if err := obj.DeCode(s); err != nil { logrus.Errorf("HLWLZB_QueryLampStatus_Back====%s解析错误:%s", s, err.Error()) return } i64, err := strconv.ParseInt(obj.Ts, 10, 62) t, err := util.MlParseTime(util.Unix2Time(i64)) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Ts, err.Error()) return } //o.tid = obj.Data.TID //更新物模型TID //o.gid = obj.Gid 要通过数据库查询 //处理集控器定时上报的单灯数据 var errCnt_ uint = 0 for _, v := range obj.SnGroup { //dev_data_XXX 更新数据 9个参数入库 //未找到的等待下次更新灯控信息 if ld, ok := o.mapLamp[v.SerialNumber]; ok { //对应单灯 var cl protocol.CHZB_LampData var err2, err3, err4, err5, err6, err7 error cl.Data = make(map[uint16]float64) if v.LampStatus == "on" { cl.Data[1] = float64(1) } else { cl.Data[1] = float64(0) } cl.Data[0] = float64(0) cl.Data[3] = float64(0) cl.Data[2], err2 = strconv.ParseFloat(v.Electricity, 64) cl.Data[4], err4 = strconv.ParseFloat(v.Lux, 64) cl.Data[5], err5 = strconv.ParseFloat(v.Power, 64) cl.Data[6], err6 = strconv.ParseFloat(v.PowerFactor, 64) cl.Data[7], err7 = strconv.ParseFloat(v.Voltage, 64) cl.Data[8], err3 = strconv.ParseFloat(v.Energy, 64) if err2 != nil || err3 != nil || err5 != nil || err6 != nil || err7 != nil || err4 != nil { str, err := json.MarshalToString(v) logrus.Errorf("report_process数据[%s]解析错误:%v", str, err) continue } ld.HandleData(o.tenant, o.gid, v.SerialNumber /*灯*/, o.tid, t, &cl) } if v.State == protocol.FAILED { errCnt_++ } } if errCnt_ == uint(len(obj.SnGroup)) { o.errCnt++ } else { o.errCnt = 0 } //先处理状态变化,再存入最新状态 o.handleStateChange(t) cacheState(o.did, util.Unix2Time(i64), o.state) //更新dev_stat_XXX o.lastDataTime = t } func (o *HlZigbeeConcentrator) turnOnOffDimmerWholeProcess(m *mqtt.Message) { var obj protocol.HLWLZB_Switch_Whole_Ack var str string var state uint if err := obj.DeCode(m.PayloadString()); err != nil { return } ts, _ := strconv.ParseUint(obj.Ts, 10, 64) if obj.ErrorCode == 0 { str = "Success" state = 1 } else { str = "" state = 0 } oo := models.DeviceCmdRecord{ //前插后更新 ID: ts, State: state, Resp: str, } if err := oo.Update(); err != nil { logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]", obj.MasterSn, obj.Ts, m.Topic(), err.Error()) } } func (o *HlZigbeeConcentrator) turnOnOffDimmerMultiProcess(m *mqtt.Message) { var obj protocol.HLWLZB_Switch_Multi_Ack var str string var state uint if err := obj.DeCode(m.PayloadString()); err != nil { return } ts, _ := strconv.ParseUint(obj.Ts, 10, 64) if obj.ErrorCode == 0 { str = "Success" state = 1 } else { str = "" state = 0 } oo := models.DeviceCmdRecord{ //前插后更新 ID: ts, State: state, Resp: str, } if err := oo.Update(); err != nil { logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]", obj.MasterSn, obj.Ts, m.Topic(), err.Error()) } } func (o *HlZigbeeConcentrator) strategyProcess(m *mqtt.Message) { var obj protocol.Pack_HLSetOnOffTime_Ack var str string var state uint if err := obj.DeCode(m.PayloadString()); err != nil { return } ts, _ := strconv.ParseUint(obj.Ts, 10, 64) if obj.ErrorCode == 0 { str = "Success" state = 1 } else { str = "" state = 0 } oo := models.DeviceCmdRecord{ //前插后更新 ID: ts, State: state, Resp: str, } if err := oo.Update(); err != nil { logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]", obj.MasterSn, obj.Ts, m.Topic(), err.Error()) } } func (o *HlZigbeeConcentrator) clearStrategyProcess(m *mqtt.Message) { var obj protocol.Pack_HLClearStrategy_Ack var str string var state uint if err := obj.DeCode(m.PayloadString()); err != nil { return } ts, _ := strconv.ParseUint(obj.Ts, 10, 64) if obj.ErrorCode == 0 { str = "Success" state = 1 } else { str = "" state = 0 } oo := models.DeviceCmdRecord{ //前插后更新 ID: ts, State: state, Resp: str, } if err := oo.Update(); err != nil { logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]", obj.MasterSn, obj.Ts, m.Topic(), err.Error()) } } // fn topic处理入口 func (o *HlZigbeeConcentrator) handleTopicUp(m *mqtt.Message) { var hp protocol.HLWLZB_Pack if err := hp.DeCode(m.PayloadString()); err != nil { logrus.Errorf("HlZigbeeConcentrator.handle_Topic_Up解析错误:%s,%s", m.PayloadString(), err.Error()) } switch hp.MsgType { case protocol.MT_REPORT: //上报数据 o.reportProcess(m.PayloadString()) case protocol.MT_OFFLINE: fallthrough case protocol.MT_ONLINE: //上线,下线 o.onofflineProcess(m.PayloadString()) case protocol.MT_TURNON: fallthrough case protocol.MT_TURNOFF: fallthrough case protocol.MT_DIMMER: //开灯,关灯 dimmer ack if hp.ActionType == protocol.AT_WHOLE { o.turnOnOffDimmerWholeProcess(m) } else if hp.ActionType == protocol.AT_MULTI { o.turnOnOffDimmerMultiProcess(m) } case protocol.MT_STRATEGY: //策略 ack o.strategyProcess(m) case protocol.MT_CLEARSTRATEGY: //清除策略 ack o.clearStrategyProcess(m) case protocol.MT_ALARM: //告警 //TODO default: } } func (o *HlZigbeeConcentrator) GetOnlineMsg() { //发布上线消息 var obj protocol.Pack_IDObject str, err := obj.EnCode(o.gid, GetNextSeq(), 0) if err != nil { logrus.Errorf("HlZigbeeConcentrator.GetOnlineMsg:发布消息错误1:%s", err.Error()) } topic := GetTopic(o.tenant, protocol.DT_GATEWAY, o.gid, protocol.TP_GW_ONLINE) if err := GetMQTTMgr().Publish(topic, str, mqtt.AtMostOnce); err != nil { logrus.Errorf("HlZigbeeConcentrator.GetOnlineMsg:发布消息错误:%s", err.Error()) } } func (o *HlZigbeeConcentrator) GetWillMsg() { payload, _ := (&protocol.Pack_IDObject{}).EnCode(o.gid, GetNextSeq(), 0) //遗嘱消息 topic := GetTopic(o.tenant, protocol.DT_GATEWAY, o.gid, protocol.TP_GW_WILL) if err := GetMQTTMgr().Publish(topic, payload, mqtt.AtMostOnce); err != nil { logrus.Errorf("HlZigbeeConcentrator.GetWillMsg:发布消息错误:%s", err.Error()) } }