||
- package main
- import (
- "context"
- "runtime/debug"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- //TODO 每次调光记录入库确认,灯的实际功率显示
- // HlZigbeeConcentrator TODO 开灯自动要关灯
- // 集控器类定义
- type HlZigbeeConcentrator struct {
- ctx context.Context
- cancel context.CancelFunc
- lock sync.Mutex
- tenant string //租户
- gid string //基础数据,网关ID
- did string //基础数据,集控器设备编码
- tid uint16 //基础数据,集控器物模型ID
- mapLamp map[string]*HlZigbeeLampController //单灯数据,编号唯一
- state uint8 //实时数据,0在线,1离线
- lastStateTime time.Time //实时数据,最新状态时间
- lastDataTime time.Time //最新数据时间
- nextHourTime time.Time //实时数据
- queue *util.MlQueue //数据队列
- errCnt uint //错误计数
- mapTopicHandle map[string]func(m *mqtt.Message)
- }
- // NewHlZigbeeConcentrator 新建集控器,并指定topic对应事件
- func NewHlZigbeeConcentrator(tenant, did string) *HlZigbeeConcentrator {
- ctx, cancel := context.WithCancel(context.Background())
- hlc := HlZigbeeConcentrator{
- ctx: ctx,
- cancel: cancel,
- tenant: tenant,
- did: did,
- mapLamp: make(map[string]*HlZigbeeLampController),
- queue: util.NewQueue(1000),
- mapTopicHandle: make(map[string]func(m *mqtt.Message)),
- }
- //改成只有一个
- hlc.mapTopicHandle[GetHLTopicUp()] = hlc.handleTopicUp
- return &hlc
- }
- func (o *HlZigbeeConcentrator) PutMessage(m *mqtt.Message) {
- o.queue.Put(m)
- }
- func (o *HlZigbeeConcentrator) Start() {
- go o.HandleQueue()
- }
- func (o *HlZigbeeConcentrator) Stop() {
- o.cancel()
- }
- // HandleQueue 启用队列循环,不同的主题调用不同的fn
- func (o *HlZigbeeConcentrator) HandleQueue() {
- defer func() {
- if err := recover(); err != nil {
- logrus.Error("HlZigbeeConcentrator.HandleQueue发生异常:", string(debug.Stack()))
- go o.HandleQueue()
- }
- }()
- o.UpdateDID() //启动时更新本地灯控数组一次
- o.GetOnlineMsg()
- var obj protocol.HLWLZB_Frequency_Down
- if str, err := obj.EnCode(o.did, 10); err == nil { //启动时下发到设备的"上发频率"
- topic := GetHLTopicDown(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME)
- err = GetHlMqttMgr().Publish(topic, str, mqtt.AtLeastOnce)
- if err != nil {
- logrus.Error("HlZigbeeConcentrator.HandleQueue:主题发送失败:", topic, str)
- }
- }
- var exit = false
- timer := time.NewTicker(30 * time.Minute)
- timer5 := time.NewTicker(5 * time.Minute)
- //每天15点半同步日出日落时间
- var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(14*time.Hour + 30*time.Minute)
- for {
- select { //if
- case <-o.ctx.Done():
- logrus.Error("HlZigbeeConcentrator.HandleQueue即将退出,原因:", o.ctx.Err())
- exit = true
- case <-timer.C: //每隔30分钟执行一次
- o.UpdateState() //更新集控器状态,防止无数据状态不更新
- o.UpdateLampControllerState() //更新灯控状态,防止无数据状态不更新
- case <-timer5.C: //每隔5分钟执行一次
- o.UpdateDID() //更新灯控设备编码 更新本地灯控数组
- //同步日出日落时间
- if util.MlNow().After(SyncSunset) {
- if err := o.SyncSunset(); err == nil {
- SyncSunset = SyncSunset.AddDate(0, 0, 1)
- }
- }
- default:
- //从队列中获取指令执行fn
- if m, ok, quantity := o.queue.Get(); ok {
- if mm, ok := m.(*mqtt.Message); ok {
- if f, ok := o.mapTopicHandle[mm.Topic()]; ok {
- f(mm)
- } else {
- logrus.Error("HlZigbeeConcentrator.HandleQueue:不支持的主题:", mm.Topic())
- }
- }
- } else if quantity == 0 {
- if exit {
- return
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
- }
- }
- // SyncSunset 日出日落 平台->终端
- // TODO
- func (o *HlZigbeeConcentrator) SyncSunset() error {
- arr, err := models.GetZigbeeLampstrategyByconcentrator(o.did)
- if err != nil {
- logrus.Errorf("集控器[%s]从数据库读取日出日落时间错误:%s", o.did, err.Error())
- return err
- }
- if len(arr) == 0 {
- return nil
- }
- //Strategy相同的一起发送,不同的分批发送
- mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间
- mapNumber := make(map[string][]uint32) //策略灯控编号,注意非编码
- for _, v := range arr {
- mapNumber[v.ID] = append(mapNumber[v.ID], uint32(v.Number))
- if _, ok := mapTime[v.Strategy]; ok {
- //已计算日出日落时间的,不再重复计算
- continue
- }
- var oot protocol.CHZB_OnOffTime
- var ls []models.LampStrategy
- if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
- oot.Brightness = uint8(ls[0].Brightness)
- }
- //计算时间
- if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil {
- onhour, _ := strconv.Atoi(strings.Split(set, ":")[0])
- onminite, _ := strconv.Atoi(strings.Split(set, ":")[1])
- offhour, _ := strconv.Atoi(strings.Split(rise, ":")[0])
- offminite, _ := strconv.Atoi(strings.Split(rise, ":")[1])
- oot.OnHour = uint8(onhour)
- oot.OnMinite = uint8(onminite)
- oot.OffHour = uint8(offhour)
- oot.OffMinite = uint8(offminite)
- }
- mapTime[v.Strategy] = &oot
- }
- for k, v := range mapNumber {
- if oot, ok := mapTime[k]; ok {
- var obj protocol.Pack_SetOnOffTime
- seq := GetNextSeq()
- if str, err := obj.EnCode(o.did, o.gid, seq, v, []protocol.CHZB_OnOffTime{*oot}); err == nil {
- topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME) //设置开关灯时间段,平台->终端
- err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce)
- if err != nil {
- logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]发布日出日落消息错误:%s", o.did, v, err.Error())
- }
- var msg string
- if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
- msg = string(msg0)
- } else {
- msg = str
- }
- odb := models.DeviceCmdRecord{
- ID: seq,
- GID: o.gid,
- DID: o.did,
- Topic: topic,
- Message: msg,
- State: 0,
- }
- if err := models.G_db.Create(&odb).Error; err != nil {
- logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库错误:%s", o.did, v, err.Error())
- } else {
- logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库成功", o.did, v)
- }
- }
- }
- }
- return nil
- }
- // UpdateDID 更新灯控的集控,更新集控的网关id
- func (o *HlZigbeeConcentrator) UpdateDID() {
- if arr, err := models.GetLampHlControllerByConcentrator(o.did); err == nil { //对集控did下所有灯控
- for _, v := range arr { //更新灯控设备编码
- if ld, ok := o.mapLamp[v.ID]; ok {
- ld.SetDID(v.Concentrator, v.ID) //集id,灯id
- if o.state == protocol.FAILED { //离线后下属灯都设置离线
- ld.SetOnOffLine(o.state)
- }
- } else {
- var ld HlZigbeeLampController //第一次则加入
- ld.SetDID(v.Concentrator, v.ID)
- o.mapLamp[v.ID] = &ld //更新本地单灯数组,需要用到
- }
- // 边缘(设备) // 设备后台 (网关后台、与java对接)
- o.gid = v.GID //设置集控器网关id
- }
- }
- }
- // 事件--改变状态
- func (o *HlZigbeeConcentrator) handleStateChange(t time.Time) {
- //最新状态
- state := uint8(0)
- if o.errCnt >= 10 {
- state = 1
- } else if o.errCnt == 0 {
- state = 0
- } else {
- return
- }
- //状态处理
- if o.lastStateTime.IsZero() || o.state == 0xff {
- t0, s0, err := getState(o.did) //取redis
- if err != nil {
- o.state = state
- o.lastStateTime = t
- return
- }
- o.state = s0
- o.lastStateTime = t0
- }
- if o.state == 0 && state == 1 { //在线->离线
- o.GetWillMsg()
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
- } else if o.state == 1 && state == 0 { //离线->在线
- o.GetOnlineMsg()
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
- }
- o.state = state
- o.lastStateTime = t
- }
- // UpdateState 每隔一分钟,集控置为离线
- func (o *HlZigbeeConcentrator) UpdateState() {
- if o.lastStateTime.IsZero() {
- t0, s0, err := getState(o.did)
- if err == nil {
- o.state = s0
- o.lastStateTime = t0
- }
- }
- if o.state == protocol.FAILED || //若之前是离线状态的,则不修改状态
- (!o.lastDataTime.IsZero() && util.MlNow().Sub(o.lastDataTime).Minutes() < OfflineInterval) {
- return
- }
- //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态
- if o.state == protocol.SUCCESS {
- o.state = protocol.FAILED
- o.lastStateTime = util.MlNow()
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: o.lastStateTime})
- cacheState(o.did, o.lastStateTime.Format("2006-01-02 15:04:05"), o.state) //改变redis
- o.GetWillMsg()
- }
- }
- // UpdateLampControllerState 每隔一分钟,灯控置为离线
- func (o *HlZigbeeConcentrator) UpdateLampControllerState() {
- for _, v := range o.mapLamp {
- v.UpdateState()
- }
- }
- // {"masterSn":"301056","msgType":"offline","snGroup":[{"serialNumber":"301056"}],
- // "ts":"1653445142","uuid":"40a47d85-2ebb-41ab-a6d9-6aad4b934d02"} 只有集控离线事件
- func (o *HlZigbeeConcentrator) onofflineProcess(s string) {
- var obj protocol.HLWLZB_OnOfflineStatus_Up
- if err := obj.DeCode(s); err != nil {
- logrus.Errorf("HLWLZB_OnOfflineStatus_Up====%s解析错误:%s", s, err.Error())
- return
- }
- i64, err1 := strconv.ParseInt(obj.Ts, 10, 62)
- t, err2 := util.MlParseTime(util.Unix2Time(i64))
- if err1 != nil || err2 != nil {
- logrus.Errorf("HLWLZB_OnOfflineStatus_Up====%s时间解析错误:%v %v", obj.Ts, err1, err2)
- return
- }
- if obj.MasterSn == o.did {
- if obj.MsgType == protocol.MT_ONLINE { //上线
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
- o.state = 0
- o.lastStateTime = t
- o.errCnt = 0
- o.GetOnlineMsg()
- } else if obj.MsgType == protocol.MT_OFFLINE { //离线
- GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
- o.state = 1
- o.lastStateTime = t
- o.errCnt = 10
- o.GetWillMsg()
- }
- }
- }
- // 存灯数据
- // {"electricity":"0.0","energy":"0.64","lampStatus":"off","lux":"0.0","power":"0.0",
- // "powerFactor":"0.0","serialNumber":"301057","voltage":"226.0"}
- func (o *HlZigbeeConcentrator) reportProcess(s string) {
- var obj protocol.HLWLZB_QueryLampStatus_Back
- if err := obj.DeCode(s); err != nil {
- logrus.Errorf("HLWLZB_QueryLampStatus_Back====%s解析错误:%s", s, err.Error())
- return
- }
- i64, err := strconv.ParseInt(obj.Ts, 10, 62)
- t, err := util.MlParseTime(util.Unix2Time(i64))
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Ts, err.Error())
- return
- }
- //o.tid = obj.Data.TID //更新物模型TID
- //o.gid = obj.Gid 要通过数据库查询
- //处理集控器定时上报的单灯数据
- var errCnt_ uint = 0
- for _, v := range obj.SnGroup { //dev_data_XXX 更新数据 9个参数入库
- //未找到的等待下次更新灯控信息
- if ld, ok := o.mapLamp[v.SerialNumber]; ok { //对应单灯
- var cl protocol.CHZB_LampData
- var err2, err3, err4, err5, err6, err7 error
- cl.Data = make(map[uint16]float64)
- if v.LampStatus == "on" {
- cl.Data[1] = float64(1)
- } else {
- cl.Data[1] = float64(0)
- }
- cl.Data[0] = float64(0)
- cl.Data[3] = float64(0)
- cl.Data[2], err2 = strconv.ParseFloat(v.Electricity, 64)
- cl.Data[4], err4 = strconv.ParseFloat(v.Lux, 64)
- cl.Data[5], err5 = strconv.ParseFloat(v.Power, 64)
- cl.Data[6], err6 = strconv.ParseFloat(v.PowerFactor, 64)
- cl.Data[7], err7 = strconv.ParseFloat(v.Voltage, 64)
- cl.Data[8], err3 = strconv.ParseFloat(v.Energy, 64)
- if err2 != nil || err3 != nil || err5 != nil || err6 != nil || err7 != nil || err4 != nil {
- str, err := json.MarshalToString(v)
- logrus.Errorf("report_process数据[%s]解析错误:%v", str, err)
- continue
- }
- ld.HandleData(o.tenant, o.gid, v.SerialNumber /*灯*/, o.tid, t, &cl)
- }
- if v.State == protocol.FAILED {
- errCnt_++
- }
- }
- if errCnt_ == uint(len(obj.SnGroup)) {
- o.errCnt++
- } else {
- o.errCnt = 0
- }
- //先处理状态变化,再存入最新状态
- o.handleStateChange(t)
- cacheState(o.did, util.Unix2Time(i64), o.state) //更新dev_stat_XXX
- o.lastDataTime = t
- }
- func (o *HlZigbeeConcentrator) turnOnOffDimmerWholeProcess(m *mqtt.Message) {
- var obj protocol.HLWLZB_Switch_Whole_Ack
- var str string
- var state uint
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
- if obj.ErrorCode == 0 {
- str = "Success"
- state = 1
- } else {
- str = ""
- state = 0
- }
- oo := models.DeviceCmdRecord{ //前插后更新
- ID: ts,
- State: state,
- Resp: str,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
- obj.MasterSn, obj.Ts, m.Topic(), err.Error())
- }
- }
- func (o *HlZigbeeConcentrator) turnOnOffDimmerMultiProcess(m *mqtt.Message) {
- var obj protocol.HLWLZB_Switch_Multi_Ack
- var str string
- var state uint
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
- if obj.ErrorCode == 0 {
- str = "Success"
- state = 1
- } else {
- str = ""
- state = 0
- }
- oo := models.DeviceCmdRecord{ //前插后更新
- ID: ts,
- State: state,
- Resp: str,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
- obj.MasterSn, obj.Ts, m.Topic(), err.Error())
- }
- }
- func (o *HlZigbeeConcentrator) strategyProcess(m *mqtt.Message) {
- var obj protocol.Pack_HLSetOnOffTime_Ack
- var str string
- var state uint
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
- if obj.ErrorCode == 0 {
- str = "Success"
- state = 1
- } else {
- str = ""
- state = 0
- }
- oo := models.DeviceCmdRecord{ //前插后更新
- ID: ts,
- State: state,
- Resp: str,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
- obj.MasterSn, obj.Ts, m.Topic(), err.Error())
- }
- }
- func (o *HlZigbeeConcentrator) clearStrategyProcess(m *mqtt.Message) {
- var obj protocol.Pack_HLClearStrategy_Ack
- var str string
- var state uint
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
- if obj.ErrorCode == 0 {
- str = "Success"
- state = 1
- } else {
- str = ""
- state = 0
- }
- oo := models.DeviceCmdRecord{ //前插后更新
- ID: ts,
- State: state,
- Resp: str,
- }
- if err := oo.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
- obj.MasterSn, obj.Ts, m.Topic(), err.Error())
- }
- }
- // fn topic处理入口
- func (o *HlZigbeeConcentrator) handleTopicUp(m *mqtt.Message) {
- var hp protocol.HLWLZB_Pack
- if err := hp.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("HlZigbeeConcentrator.handle_Topic_Up解析错误:%s,%s", m.PayloadString(), err.Error())
- }
- switch hp.MsgType {
- case protocol.MT_REPORT: //上报数据
- o.reportProcess(m.PayloadString())
- case protocol.MT_OFFLINE:
- fallthrough
- case protocol.MT_ONLINE: //上线,下线
- o.onofflineProcess(m.PayloadString())
- case protocol.MT_TURNON:
- fallthrough
- case protocol.MT_TURNOFF:
- fallthrough
- case protocol.MT_DIMMER: //开灯,关灯 dimmer ack
- if hp.ActionType == protocol.AT_WHOLE {
- o.turnOnOffDimmerWholeProcess(m)
- } else if hp.ActionType == protocol.AT_MULTI {
- o.turnOnOffDimmerMultiProcess(m)
- }
- case protocol.MT_STRATEGY: //策略 ack
- o.strategyProcess(m)
- case protocol.MT_CLEARSTRATEGY: //清除策略 ack
- o.clearStrategyProcess(m)
- case protocol.MT_ALARM: //告警
- //TODO
- default:
- }
- }
- func (o *HlZigbeeConcentrator) GetOnlineMsg() {
- //发布上线消息
- var obj protocol.Pack_IDObject
- str, err := obj.EnCode(o.gid, GetNextSeq(), 0)
- if err != nil {
- logrus.Errorf("HlZigbeeConcentrator.GetOnlineMsg:发布消息错误1:%s", err.Error())
- }
- topic := GetTopic(o.tenant, protocol.DT_GATEWAY, o.gid, protocol.TP_GW_ONLINE)
- if err := GetMQTTMgr().Publish(topic, str, mqtt.AtMostOnce); err != nil {
- logrus.Errorf("HlZigbeeConcentrator.GetOnlineMsg:发布消息错误:%s", err.Error())
- }
- }
- func (o *HlZigbeeConcentrator) GetWillMsg() {
- payload, _ := (&protocol.Pack_IDObject{}).EnCode(o.gid, GetNextSeq(), 0) //遗嘱消息
- topic := GetTopic(o.tenant, protocol.DT_GATEWAY, o.gid, protocol.TP_GW_WILL)
- if err := GetMQTTMgr().Publish(topic, payload, mqtt.AtMostOnce); err != nil {
- logrus.Errorf("HlZigbeeConcentrator.GetWillMsg:发布消息错误:%s", err.Error())
- }
- }
|