package main

import (
	"context"
	"encoding/binary"
	"errors"
	"strconv"
	"sync"
	"time"

	"github.com/go-redis/redis/v7"
	"github.com/valyala/bytebufferpool"

	"lc/common/mqtt"
	"lc/common/protocol"
	"lc/common/util"
	"lc/edge/ipole/zigbee"
)

// Protocol identifier accepted for the concentrator model, and the Redis key
// prefixes (each suffixed with the device code) used to persist lamp state.
var (
	ConcentratorProtocol = "CHDDJK-Zigbee"
	LampOotPrefix        = "lamp_oot_"
	LampSwitchPrefix     = "lamp_switch_"
	LampBroadcastPrefix  = "lamp_broadcastauto_"
	LampAlarmPrefix      = "lamp_alarm_"
)

// PoolPut returns b to the byte-buffer pool; a nil buffer is ignored.
func PoolPut(b *bytebufferpool.ByteBuffer) {
	if b != nil {
		bytebufferpool.Put(b)
	}
}

// GetPoleIDByte returns the low 16 bits of PoleID as two big-endian bytes.
func GetPoleIDByte(PoleID uint32) []byte {
	return []byte{byte(PoleID >> 8), byte(PoleID)}
}

// LampTimeRange is a temporary manual-control window for a lamp.
type LampTimeRange struct {
	Start      util.MLTime `json:"start"`
	End        util.MLTime `json:"end"`
	Brightness uint8       `json:"brightness"` // 0 = lamp off, anything greater than 0 = lamp on
}

// isInTimeRange reports whether t lies strictly between Start and End.
func (o *LampTimeRange) isInTimeRange(t time.Time) bool {
	return t.After(time.Time(o.Start)) && t.Before(time.Time(o.End))
}

// LampAlarmInfo pairs an alarm with a flag recording whether its start has
// already been reported.
type LampAlarmInfo struct {
	Alarm *protocol.LampAlarm `json:"alarm"`
	Send  bool                `json:"send"`
}

// Concentrator manages one Zigbee lamp concentrator.
type Concentrator struct {
	seq               uint8
	mutexSeq          sync.Mutex
	devInfo           *protocol.DevInfo
	model             *protocol.IotModel
	ctx               context.Context
	cancel            context.CancelFunc
	downQueue         *util.MlQueue
	readQueue         *util.MlQueue // lamps waiting to be read; data is uploaded once it drains
	mapLamps          map[uint32]string
	mapTopicHandle    map[string]func(m mqtt.Message)
	mapLamps2OOT      map[uint32][]zigbee.OnOffTime // time-control schedules per lamp
	mapTempLampsOOT   map[uint32]*LampTimeRange     // temporary manual-control windows per lamp
	broadcastAutoTime time.Time                     // broadcast-mode deadline; time control is restored once it passes
	mapLampAlarm      map[string]*LampAlarmInfo     // alarm data keyed by lamp DID
	chanDevInfo       chan *protocol.DevInfo        // device-info updates
	chanModelInfo     chan *ModelInfo               // model updates
}

// NewConcentrator builds a Concentrator for info, loads its model when the
// model matches the device TID and ConcentratorProtocol, and registers the
// topic handlers.
func NewConcentrator(info *protocol.DevInfo) Device {
	ctx, cancel := context.WithCancel(context.Background())
	dev := &Concentrator{
		mapLamps:          make(map[uint32]string),
		devInfo:           info,
		ctx:               ctx,
		cancel:            cancel,
		downQueue:         util.NewQueue(200),
		readQueue:         util.NewQueue(200),
		mapTopicHandle:    make(map[string]func(m mqtt.Message)),
		mapLamps2OOT:      make(map[uint32][]zigbee.OnOffTime),
		mapTempLampsOOT:   make(map[uint32]*LampTimeRange),
		broadcastAutoTime: time.Time{},
		mapLampAlarm:      make(map[string]*LampAlarmInfo),
		chanDevInfo:       make(chan *protocol.DevInfo),
		chanModelInfo:     make(chan *ModelInfo),
	}
	iot, err := loadModel(info.TID)
	if err == nil && iot.TID == info.TID && iot.Protocol == ConcentratorProtocol {
		dev.model = iot
	}
	dev.SetTopicHandle()
	return dev
}

// SetTopicHandle registers the handlers that Handle dispatches queued MQTT
// messages to, keyed by topic.
//
// NOTE(review): MQTTSubscribe also subscribes to TP_CHZB_SET_BROADCASTTIME,
// TP_CHZB_SET_WAITTIME and TP_CHZB_QUERY_ONOFFTIME, and handlers for them exist
// in this file, but they are not registered here, so those messages are
// dequeued and dropped. Confirm whether that is intentional.
func (o *Concentrator) SetTopicHandle() {
	code := o.devInfo.DevCode
	o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_SWITCH)] = o.HandleTpChzbSetSwitch
	o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_RECOVERY_AUTO)] = o.HandleTpChzbSetRecoveryAuto
	o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_ONOFFTIME)] = o.HandleSetOnOffTime
	o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_UPDATE_LAMP)] = o.HandleSetUpdateLamp
	o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_QUERY_TIME)] = o.HandleQueryTime
}

// MQTTSubscribe subscribes to every downstream topic of this concentrator.
// All messages go through HandleCache, which queues them for Handle.
func (o *Concentrator) MQTTSubscribe() {
	code := o.devInfo.DevCode
	// Broadcast time correction.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_BROADCASTTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
	// Set the concentrator's send/receive wait time.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_WAITTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
	// Switch lamps on/off (single lamps or broadcast).
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_SWITCH), mqtt.ExactlyOnce, o.HandleCache, ToAll)
	// Restore time-control mode.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_RECOVERY_AUTO), mqtt.ExactlyOnce, o.HandleCache, ToAll)
	// Set on/off time ranges.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
	// Read on/off time ranges.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_QUERY_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
	// Update the lamp-controller list.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_SET_UPDATE_LAMP), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
	// Read a single lamp's clock.
	GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, code, protocol.TP_CHZB_QUERY_TIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
}

// Start subscribes to MQTT, reloads persisted state from Redis (each loader is
// tried up to three times, sleeping two seconds after every failure) and then
// launches the Handle goroutine.
func (o *Concentrator) Start() {
	const (
		retries = 3
		backoff = 2 * time.Second
	)
	o.MQTTSubscribe()
	loaders := []func() error{
		o.ReloadOOTFromRedis,
		o.ReloadSwitchOOTFromRedis,
		o.ReloadBroadCastFromRedis,
		o.ReloadLampAlarmFromRedis,
	}
	for _, load := range loaders {
		for i := 0; i < retries; i++ {
			if err := load(); err == nil {
				break
			}
			time.Sleep(backoff)
		}
	}
	go o.Handle()
}

// Stop cancels the concentrator's context, asking Handle to exit.
func (o *Concentrator) Stop() {
	o.cancel()
}

// UpdateInfo hands a copy of devinfo to the Handle goroutine. The update is
// dropped if the concentrator has been stopped, so callers do not block
// forever on a worker that has exited.
func (o *Concentrator) UpdateInfo(devinfo protocol.DevInfo) {
	select {
	case o.chanDevInfo <- &devinfo:
	case <-o.ctx.Done():
	}
}

// GetDevInfo returns the current device info.
func (o *Concentrator) GetDevInfo() *protocol.DevInfo {
	return o.devInfo
}

// UpdateModel forwards a model change to the Handle goroutine; tid 0 is
// ignored. The update is dropped if the concentrator has been stopped.
func (o *Concentrator) UpdateModel(tid uint16, flag int) {
	if tid == 0 {
		return
	}
	mi := &ModelInfo{TID: tid, Flag: flag}
	select {
	case o.chanModelInfo <- mi:
	case <-o.ctx.Done():
	}
}

// UpdateModel2 reloads the model for mi when it targets this device's TID, has
// a non-zero Flag, and the loaded model uses ConcentratorProtocol.
func (o *Concentrator) UpdateModel2(mi *ModelInfo) {
	if o.devInfo.TID != mi.TID || mi.Flag == 0 {
		return
	}
	iot, err := loadModel(mi.TID)
	if err != nil {
		return
	}
	if iot.Protocol == ConcentratorProtocol { // a valid model for this device
		o.model = iot
	}
}

// ReloadOOTFromRedis restores the per-lamp schedules from Redis. Entries whose
// key or value cannot be parsed are skipped; redis.Nil is not an error.
func (o *Concentrator) ReloadOOTFromRedis() error {
	result, err := redisEdgeData.HGetAll(LampOotPrefix + o.devInfo.DevCode).Result()
	if err != nil {
		if errors.Is(err, redis.Nil) {
			return nil
		}
		return err
	}
	for k, v := range result {
		lampID, err := strconv.Atoi(k)
		if err != nil {
			continue
		}
		var oot []zigbee.OnOffTime
		if err := json.UnmarshalFromString(v, &oot); err != nil {
			continue
		}
		o.mapLamps2OOT[uint32(lampID)] = oot
	}
	return nil
}

// ReloadSwitchOOTFromRedis restores the temporary manual-control windows from
// Redis. Entries whose key or value cannot be parsed are skipped; redis.Nil is
// not an error.
func (o *Concentrator) ReloadSwitchOOTFromRedis() error {
	result, err := redisEdgeData.HGetAll(LampSwitchPrefix + o.devInfo.DevCode).Result()
	if err != nil {
		if errors.Is(err, redis.Nil) {
			return nil
		}
		return err
	}
	for k, v := range result {
		lampID, err := strconv.Atoi(k)
		if err != nil {
			continue
		}
		// A fresh value per entry: each map slot must point at its own range.
		ltr := new(LampTimeRange)
		if err := json.UnmarshalFromString(v, ltr); err != nil {
			continue
		}
		o.mapTempLampsOOT[uint32(lampID)] = ltr
	}
	return nil
}

// ReloadBroadCastFromRedis restores the broadcast-mode deadline from Redis. A
// missing key or an unparsable value leaves the deadline untouched.
func (o *Concentrator) ReloadBroadCastFromRedis() error {
	strTime, err := redisEdgeData.Get(LampBroadcastPrefix + o.devInfo.DevCode).Result()
	if err != nil {
		if errors.Is(err, redis.Nil) {
			return nil
		}
		return err
	}
	if t, err := util.MlParseTime(strTime); err == nil {
		o.broadcastAutoTime = t
	}
	return nil
}

// ReloadLampAlarmFromRedis restores the cached alarms from Redis. Entries that
// fail to decode, or that decode without an Alarm, are skipped because the
// alarm code dereferences Alarm unconditionally.
func (o *Concentrator) ReloadLampAlarmFromRedis() error {
	mapAlarm, err := redisEdgeData.HGetAll(LampAlarmPrefix + o.devInfo.DevCode).Result()
	if err != nil {
		if errors.Is(err, redis.Nil) {
			return nil
		}
		return err
	}
	for k, v := range mapAlarm {
		lai := new(LampAlarmInfo)
		if err := json.UnmarshalFromString(v, lai); err != nil || lai.Alarm == nil {
			continue
		}
		o.mapLampAlarm[k] = lai
	}
	return nil
}

// Handle is the concentrator's worker loop. It applies device/model updates,
// executes queued MQTT commands, and when idle polls the lamps, uploads their
// data and alarms, and restores time control for expired manual windows. After
// Stop it drains the command queue, restores time control for all lamps and
// returns. If the loop panics it is restarted in a new goroutine.
func (o *Concentrator) Handle() {
	defer func() {
		// Restart only after a panic; a normal return means Stop was requested
		// and the worker must stay down.
		if r := recover(); r != nil {
			go o.Handle()
		}
	}()

	o.queryPush()
	_ = o.BroadcastTime() // best effort; repeated hourly below

	exit := false
	mapData := make(map[string]*protocol.CHZB_LampData)
	lastSync := util.MlNow()
	nextFillTime := time.Time{}
	for {
		// Non-blocking poll of the control channels. The queue work lives
		// outside the select: a cancelled context is always ready, so a
		// `default` branch holding that work would never run after Stop.
		select {
		case <-o.ctx.Done():
			exit = true
		case devInfo := <-o.chanDevInfo:
			o.devInfo = devInfo
			continue
		case mi := <-o.chanModelInfo:
			o.UpdateModel2(mi)
			continue
		default:
		}

		// Execute the next queued command, if any.
		if m, ok, _ := o.downQueue.Get(); ok {
			if mm, ok := m.(mqtt.Message); ok {
				if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
					fn(mm)
				}
			}
			continue
		}

		if exit {
			// Before exiting, put every lamp back into time-control mode.
			o.CheckRecoveryAuto(true)
			return
		}

		// Hourly: re-request the lamp list from the server and broadcast the time.
		if time.Since(lastSync).Minutes() > 60 {
			o.queryPush()
			_ = o.BroadcastTime()
			lastSync = time.Now()
		}

		o.CheckRecoveryAuto(false)
		quantity, send := o.NextQueueRead(mapData)
		if quantity == 0 || send {
			if nextFillTime.IsZero() || nextFillTime.Before(util.MlNow()) {
				o.fillReadQueue()
				nextFillTime = util.MlNow().Add(time.Duration(o.devInfo.SendCloud) * time.Millisecond)
			}
			if send {
				mapData = make(map[string]*protocol.CHZB_LampData)
				o.UploadLampAlarm()
			}
			time.Sleep(300 * time.Millisecond)
		}
	}
}

// CheckRecoveryAuto restores time-control mode where manual control has
// expired. With force=true, or when a broadcast deadline is set and has
// passed, all lamps are restored via broadcast. Otherwise each lamp whose
// manual window has ended is restored individually.
func (o *Concentrator) CheckRecoveryAuto(force bool) {
	broadcastExpired := !o.broadcastAutoTime.IsZero() && o.broadcastAutoTime.Before(util.MlNow())
	if broadcastExpired || force {
		for k := range o.mapTempLampsOOT {
			delete(o.mapTempLampsOOT, k)
		}
		if err := o.BroadcastAuto(); err == nil {
			o.broadcastAutoTime = time.Time{}
			// Drop every temporary switch record kept in Redis.
			redisEdgeData.Del(LampSwitchPrefix + o.devInfo.DevCode)
		}
		return
	}

	// No broadcast expiry: restore the individual lamps whose window has ended.
	var expired []string
	for k, v := range o.mapTempLampsOOT {
		if !time.Time(v.End).Before(util.MlNow()) {
			continue
		}
		if err := o.SetPoleAuto(k); err == nil {
			expired = append(expired, strconv.Itoa(int(k)))
			delete(o.mapTempLampsOOT, k)
		}
	}
	if len(expired) > 0 {
		_ = redisEdgeData.HDel(LampSwitchPrefix+o.devInfo.DevCode, expired...)
	}
}

// queryPush publishes an empty TP_CHZB_QUERY_LAMP request to the cloud.
func (o *Concentrator) queryPush() {
	var obj protocol.Pack_CHZB_EmptyObject
	str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64())
	if err != nil {
		return
	}
	GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_LAMP), str, mqtt.AtMostOnce, ToCloud)
}

// LampNumberDID pairs a lamp number with its device ID.
type LampNumberDID struct {
	LampID uint32
	DID    string
}

// fillReadQueue queues every known lamp for reading.
func (o *Concentrator) fillReadQueue() {
	for k, v := range o.mapLamps {
		o.readQueue.Put(LampNumberDID{LampID: k, DID: v})
	}
}

// NextQueueRead reads the next queued lamp (brightness and electrical
// parameters) into mapData. Once the read queue reports zero remaining items
// and mapData is non-empty, it publishes mapData to the cloud. It returns the
// quantity reported by the queue and whether that upload branch was taken.
func (o *Concentrator) NextQueueRead(mapData map[string]*protocol.CHZB_LampData) (uint32, bool) {
	val, ok, quantity := o.readQueue.Get()
	if ok {
		lnd := val.(LampNumberDID)
		var readErr error
		var data protocol.CHZB_LampData
		data.Data = make(map[uint16]float64)
		if b1, b2, err := o.GetBrightness(lnd.LampID); err == nil {
			data.Data[1] = float64(b1)
			data.Data[2] = float64(b2)
			// Check whether the lamp state matches what is expected.
			o.CheckLampAlarm(lnd, b1, b2)
		} else {
			readErr = err
		}
		if e, err := o.ReadElectricalPara(lnd.LampID); err == nil {
			data.Data[3] = e.Voltage[0]
			data.Data[4] = e.Voltage[1]
			data.Data[5] = e.Voltage[2]
			data.Data[6] = e.Current[0]
			data.Data[7] = e.Current[1]
			data.Data[8] = float64(e.Degree[0])
			data.Data[9] = float64(e.Degree[1])
		} else {
			readErr = err
		}
		data.SetStateErrorData(readErr)
		mapData[lnd.DID] = &data
	}
	if quantity != 0 || len(mapData) == 0 {
		return quantity, false
	}
	var obj protocol.Pack_CHZB_UploadData
	if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64(), o.devInfo.TID, mapData); err == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_DATA), str, mqtt.AtMostOnce, ToCloud)
	}
	return quantity, true
}

// UploadLampAlarm reports alarm starts and alarm ends. A started alarm is
// marked as sent and cached in Redis; an ended alarm is reported and then
// removed from memory and Redis.
func (o *Concentrator) UploadLampAlarm() {
	alarmTopic := GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_ALARM)
	var toDelete []string
	for k, v := range o.mapLampAlarm {
		switch {
		case !v.Send && v.Alarm.EndTime == "":
			// Alarm start.
			var obj protocol.Pack_CHZB_LampAlarm
			if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64(), v.Alarm); err == nil {
				GetMQTTMgr().Publish(alarmTopic, str, mqtt.AtMostOnce, ToCloud)
			}
			v.Send = true
			// Cache the alarm in Redis.
			strAlarm, _ := json.MarshalToString(v)
			redisEdgeData.HSet(LampAlarmPrefix+o.devInfo.DevCode, map[string]interface{}{k: strAlarm})
		case v.Alarm.EndTime != "":
			// Alarm end.
			var obj protocol.Pack_CHZB_LampAlarm
			if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64(), v.Alarm); err == nil {
				GetMQTTMgr().Publish(alarmTopic, str, mqtt.AtMostOnce, ToCloud)
			}
			toDelete = append(toDelete, k)
			delete(o.mapLampAlarm, k)
		}
	}
	if len(toDelete) > 0 {
		redisEdgeData.HDel(LampAlarmPrefix+o.devInfo.DevCode, toDelete...)
	}
}

// CheckLampAlarm compares the brightness b1 read from a lamp with what the
// schedule and any manual-control window expect, then opens a new alarm or
// marks an existing one as ended. b2 is accepted but not evaluated.
func (o *Concentrator) CheckLampAlarm(lnd LampNumberDID, b1, b2 uint8) {
	// Time attributed to the reading: now minus the configured wait time.
	now := util.MlNow().Add(-time.Duration(o.devInfo.WaitTime) * time.Millisecond)
	except := uint16(protocol.LE_UNKNOWN)

	// Schedule check.
	// NOTE(review): this uses util.MlNow() while the manual-control check below
	// uses the adjusted `now`; confirm which is intended.
	if oots, ok := o.mapLamps2OOT[lnd.LampID]; ok {
		for _, oot := range oots {
			if !oot.InTimeRange(util.MlNow()) {
				continue
			}
			// Inside a scheduled "on" range.
			switch {
			case b1 == 0:
				except = protocol.LE_OFF // unexpectedly off
			case oot.Brightness == b1:
				except = protocol.LE_OK // on as scheduled
			default:
				except = protocol.LE_ON_BRIGHTNESS // on, wrong brightness
			}
			break
		}
	}

	// Manual-control check; it overrides the schedule result.
	if switchoot, ok := o.mapTempLampsOOT[lnd.LampID]; ok && switchoot.isInTimeRange(now) {
		switch {
		case switchoot.Brightness == b1:
			except = protocol.LE_OK // on/off as commanded
		case b1 == 0:
			except = protocol.LE_OFF // unexpectedly off
		case switchoot.Brightness == 0:
			except = protocol.LE_ON // unexpectedly on
		default:
			except = protocol.LE_ON_BRIGHTNESS // wrong brightness
		}
	}

	// Neither applied: a lit lamp is unexpected, a dark one is fine.
	if except == protocol.LE_UNKNOWN {
		if b1 > 0 {
			except = protocol.LE_ON
		} else {
			except = protocol.LE_OK
		}
	}

	if a, ok := o.mapLampAlarm[lnd.DID]; ok {
		if except == protocol.LE_OK {
			// Alarm ended.
			a.Alarm.EndTime = now.Format("2006-01-02 15:04:05")
			a.Alarm.Brightness = b1
		}
		return
	}
	if except != protocol.LE_OK {
		// Alarm started.
		a := protocol.LampAlarm{DID: lnd.DID, AlarmType: except, AlarmBrightness: b1, StartTime: now.Format("2006-01-02 15:04:05")}
		o.mapLampAlarm[lnd.DID] = &LampAlarmInfo{Alarm: &a, Send: false}
	}
}

// nextSeq increments the frame sequence number under mutexSeq and returns it;
// being a uint8 it wraps around.
func (o *Concentrator) nextSeq() uint8 {
	o.mutexSeq.Lock()
	defer o.mutexSeq.Unlock()
	o.seq++
	return o.seq
}

// SendRecvData sends aduRequest on the device's serial port and returns the
// response, trying up to retry times (at least once) until a call succeeds.
// It returns ErrClosedConnection when no serial port is available.
func (o *Concentrator) SendRecvData(aduRequest []byte, retry int) (aduResponse []byte, err error) {
	serial := GetSerialMgr().GetSerialPort(o.devInfo.Code)
	if serial == nil {
		return nil, ErrClosedConnection
	}
	if retry <= 0 {
		retry = 1
	}
	for ; retry > 0; retry-- {
		aduResponse, err = serial.SendRecvData(aduRequest, FlagChZigbee, o.devInfo.WaitTime)
		if err == nil {
			break
		}
	}
	return aduResponse, err
}

// SendData sends aduRequest on the device's serial port without reading a
// response, trying up to retry times (at least once) until a call succeeds.
// It returns ErrClosedConnection when no serial port is available.
func (o *Concentrator) SendData(aduRequest []byte, retry int) (err error) {
	serial := GetSerialMgr().GetSerialPort(o.devInfo.Code)
	if serial == nil {
		return ErrClosedConnection
	}
	if retry <= 0 {
		retry = 1
	}
	for ; retry > 0; retry-- {
		if err = serial.SendData(aduRequest, FlagChZigbee, o.devInfo.WaitTime); err == nil {
			break
		}
	}
	return err
}

// BroadcastTime broadcasts the current protocol.BJNow() hour, minute and
// second to address 0x0000FEFE as a time-correction command.
func (o *Concentrator) BroadcastTime() error {
	t := protocol.BJNow()
	var pack zigbee.PackUpgradeFuncCommand
	pack.SetData(0x0000FEFE, zigbee.CmdSetBroadcastCorrectiontime, o.nextSeq(), []byte{0xFE, 0xFE, uint8(t.Hour()), uint8(t.Minute()), uint8(t.Second())})
	buff, err := pack.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		err = o.SendData(buff.B, 2)
	}
	return err
}

// BroadcastOnOrOff broadcasts a lamp-on command, or lamp-off when on is 0,
// carrying the same brightness for lamp 1 and lamp 2.
func (o *Concentrator) BroadcastOnOrOff(on, brightness uint8) error {
	var cmd = zigbee.CmdSetBroadcastOn
	if on == 0 {
		cmd = zigbee.CmdSetBroadcastOff
	}
	var pack zigbee.PackUpgradeFuncCommand
	pack.SetData(0x0000FEFE, cmd, o.nextSeq(), []byte{0xFE, 0xFE, 0xFF, brightness, brightness}) // lamps 1 and 2
	buff, err := pack.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		err = o.SendData(buff.B, 2)
	}
	return err
}

// BroadcastAuto broadcasts the command that restores time-control mode.
func (o *Concentrator) BroadcastAuto() error {
	var pufc zigbee.PackUpgradeFuncCommand
	// An earlier variant of this payload ended in 0xFF instead of 0x03.
	pufc.SetData(0x0000fefe, zigbee.CmdSetBroadcastAuto, o.nextSeq(), []byte{0xFE, 0xFE, 0x03}) // lamps 1 and 2
	buff, err := pufc.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		err = o.SendData(buff.B, 3)
	}
	return err
}

// SetOnOffTime writes on/off time ranges to a lamp. The payload always holds
// four ranges: the first four entries of data, padded with zero-valued ranges.
func (o *Concentrator) SetOnOffTime(PoleID uint32, Cmd uint8, data []zigbee.OnOffTime) error {
	payload := bytebufferpool.Get()
	defer PoolPut(payload)
	payload.Write(GetPoleIDByte(PoleID))
	for i := 0; i < 4; i++ {
		if i < len(data) {
			payload.Write(data[i].EnCode())
		} else {
			payload.Write((&zigbee.OnOffTime{}).EnCode())
		}
	}
	var pgfc zigbee.PackGeneralFuncCommand
	pgfc.SetData(PoleID, Cmd, o.nextSeq(), payload.B)
	buff, err := pgfc.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		_, err = o.SendRecvData(buff.B, 3)
	}
	return err
}

// GetOnOffTime reads a lamp's on/off time ranges. Ranges whose on and off
// hours and minutes are all zero are omitted from the result.
func (o *Concentrator) GetOnOffTime(PoleID uint32, Cmd uint8) ([]zigbee.OnOffTime, error) {
	var pgfc zigbee.PackGeneralFuncCommand
	pgfc.SetData(PoleID, Cmd, o.nextSeq(), GetPoleIDByte(PoleID))
	buff, err := pgfc.EnCode()
	if err != nil {
		return nil, err
	}
	defer PoolPut(buff)
	recvdata, err := o.SendRecvData(buff.B, 1)
	if err != nil {
		return nil, err
	}
	var pgfcresp zigbee.PackGeneralFuncCommand
	if err = pgfcresp.DeCode(recvdata); err != nil {
		return nil, err
	}
	if len(pgfcresp.Data) < 22 {
		return nil, errors.New("读取开关灯时间返回的内容错误")
	}
	// Four 5-byte ranges follow the first two bytes of the response data.
	oot := make([]zigbee.OnOffTime, 4)
	for i := range oot {
		oot[i].DeCode(pgfcresp.Data[2+5*i : 7+5*i])
	}
	ret := make([]zigbee.OnOffTime, 0, 4)
	for _, v := range oot {
		// Skip ranges whose hours and minutes are all zero.
		if v.OnHour == v.OffHour && v.OnMinite == v.OffMinite && v.OnHour == 0 && v.OnMinite == 0 {
			continue
		}
		ret = append(ret, v)
	}
	return ret, nil
}

// ReadPoleTime reads a lamp's clock and returns bytes 2, 3 and 4 of the
// response data (hour, minute, second as used by HandleQueryTime).
func (o *Concentrator) ReadPoleTime(PoleID uint32) (uint8, uint8, uint8, error) {
	// The request payload carries the 2-byte pole number taken from the 4-byte ID.
	var pgfc zigbee.PackGeneralFuncCommand
	pgfc.SetData(PoleID, zigbee.CmdReadTime, o.nextSeq(), GetPoleIDByte(PoleID))
	buff, err := pgfc.EnCode()
	if err != nil {
		return 0, 0, 0, err
	}
	defer PoolPut(buff)
	recvdata, err := o.SendRecvData(buff.B, 1)
	if err != nil {
		return 0, 0, 0, err
	}
	var pgfcresp zigbee.PackGeneralFuncCommand
	if err = pgfcresp.DeCode(recvdata); err != nil {
		return 0, 0, 0, err
	}
	if len(pgfcresp.Data) >= 5 {
		return pgfcresp.Data[2], pgfcresp.Data[3], pgfcresp.Data[4], nil
	}
	return 0, 0, 0, errors.New("读取单灯时间返回的内容错误")
}

// ElecPara holds the voltage, current and energy values read from one lamp.
type ElecPara struct {
	Voltage [3]float64
	Current [2]float64
	Degree  [2]uint16
}

// ReadElectricalPara reads a lamp's electrical parameters and decodes them
// from the response data: three voltages (raw byte times 2), two currents
// (one byte times 0.1 plus one byte times 0.001) and two big-endian 16-bit
// energy counters.
func (o *Concentrator) ReadElectricalPara(PoleID uint32) (*ElecPara, error) {
	var pgrc zigbee.PackGeneralFuncCommand
	pgrc.SetData(PoleID, zigbee.CmdReadDldy, o.nextSeq(), GetPoleIDByte(PoleID))
	buff, err := pgrc.EnCode()
	if err != nil {
		return nil, err
	}
	defer PoolPut(buff)
	recvdata, err := o.SendRecvData(buff.B, 1)
	if err != nil {
		return nil, err
	}
	var pgfcresp zigbee.PackGeneralFuncCommand
	if err = pgfcresp.DeCode(recvdata); err != nil {
		return nil, err
	}
	if pgfcresp.Cmd != zigbee.CmdReadDldy || len(pgfcresp.Data) < 13 {
		return nil, errors.New("读取电流电压电度返回错误")
	}
	d := pgfcresp.Data
	var ep ElecPara
	// Convert before multiplying: doubling the raw byte as a uint8 would wrap
	// around for raw values of 128 and above.
	ep.Voltage[0] = float64(d[2]) * 2
	ep.Voltage[1] = float64(d[3]) * 2
	ep.Voltage[2] = float64(d[4]) * 2
	ep.Current[0] = float64(d[5])*0.1 + float64(d[7])*0.001
	ep.Current[1] = float64(d[6])*0.1 + float64(d[8])*0.001
	ep.Degree[0] = binary.BigEndian.Uint16(d[9:11])
	ep.Degree[1] = binary.BigEndian.Uint16(d[11:13])
	return &ep, nil
}

// SetBrightness sets the brightness of lamp 1 and lamp 2 on one pole.
func (o *Concentrator) SetBrightness(PoleID uint32, brightness1 uint8, brightness2 uint8) error {
	data := make([]byte, 0, 7)
	data = append(data, GetPoleIDByte(PoleID)...)
	data = append(data, brightness1, brightness2, 0xFF, 0xFF) // lamps 1 and 2; the slots for lamps 3 and 4 are reserved
	var pgfc zigbee.PackGeneralFuncCommand
	pgfc.SetData(PoleID, zigbee.CmdSetBrightness, o.nextSeq(), data)
	buff, err := pgfc.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		_, err = o.SendRecvData(buff.B, 1)
	}
	return err
}

// GetBrightness queries the brightness of lamp 1 and lamp 2 on one pole.
func (o *Concentrator) GetBrightness(PoleID uint32) (uint8, uint8, error) {
	var pgfc zigbee.PackGeneralFuncCommand
	pgfc.SetData(PoleID, zigbee.CmdReadBrightness, o.nextSeq(), GetPoleIDByte(PoleID))
	buff, err := pgfc.EnCode()
	if err != nil {
		return 0, 0, err
	}
	defer PoolPut(buff)
	recvdata, err := o.SendRecvData(buff.B, 1)
	if err != nil {
		return 0, 0, err
	}
	var pgfcresp zigbee.PackGeneralFuncCommand
	if err = pgfcresp.DeCode(recvdata); err != nil {
		return 0, 0, err
	}
	if pgfcresp.Cmd == zigbee.CmdReadBrightness && len(pgfcresp.Data) >= 4 {
		return pgfcresp.Data[2], pgfcresp.Data[3], nil
	}
	return 0, 0, errors.New("查询亮度返回错误")
}

// SetTime sends the current protocol.BJNow() hour, minute and second to one
// lamp as a time-correction command.
func (o *Concentrator) SetTime(PoleID uint32) error {
	t := protocol.BJNow()
	data := make([]byte, 0, 5)
	data = append(data, GetPoleIDByte(PoleID)...)
	data = append(data, uint8(t.Hour()), uint8(t.Minute()), uint8(t.Second()))
	var pack zigbee.PackGeneralFuncCommand
	pack.SetData(PoleID, zigbee.CmdSetCorrectiontime, o.nextSeq(), data)
	buff, err := pack.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		_, err = o.SendRecvData(buff.B, 3)
	}
	return err
}

// SetPoleAuto restores time-control mode on a single lamp.
func (o *Concentrator) SetPoleAuto(PoleID uint32) error {
	var pack zigbee.PackGeneralFuncCommand
	pack.SetData(PoleID, zigbee.CmdSetAuto, o.nextSeq(), GetPoleIDByte(PoleID))
	buff, err := pack.EnCode()
	defer PoolPut(buff)
	if buff != nil {
		_, err = o.SendRecvData(buff.B, 3)
	}
	return err
}

// HandleCache queues an incoming MQTT message for the Handle goroutine.
func (o *Concentrator) HandleCache(m mqtt.Message) {
	o.downQueue.Put(m)
}

// HandleTpChzbSetBroadcasttime broadcasts a time correction and acknowledges
// the request with the outcome.
func (o *Concentrator) HandleTpChzbSetBroadcasttime(m mqtt.Message) {
	var obj protocol.Pack_CHZB_EmptyObject
	var ret protocol.Pack_Ack
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	err := o.BroadcastTime()
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, err); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_BROADCASTTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
	}
}

// HandleTpChzbSetWaittime updates the serial wait time when the requested
// value is within [1000, 15000], and acknowledges the request with the outcome.
func (o *Concentrator) HandleTpChzbSetWaittime(m mqtt.Message) {
	var obj protocol.Pack_CHZB_Waittime
	var ret protocol.Pack_Ack
	var err error
	if err = obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Data.Waittime < 1000 || obj.Data.Waittime > 15000 {
		err = errors.New("设置的等待时间不在[1000,15000]范围")
	} else {
		o.devInfo.WaitTime = obj.Data.Waittime
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, err); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_WAITTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
	}
}

// HandleTpChzbSetSwitch switches lamps on or off manually. An empty lamp list
// means broadcast to all lamps; otherwise each listed lamp is set
// individually. The manual-control window is recorded in memory and in Redis
// so CheckLampAlarm can tell commanded states from abnormal ones, and a
// per-lamp result is acknowledged.
func (o *Concentrator) HandleTpChzbSetSwitch(m mqtt.Message) {
	var obj protocol.Pack_CHZB_Switch
	var ret protocol.Pack_CHZB_SeqLampAck
	var err error
	mapIpole := make(map[uint32]*protocol.StateError)
	if err = obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Id != o.devInfo.DevCode {
		return
	}
	brightness := obj.Data.Brightness
	if obj.Data.Switch == 0 {
		// Switching off always means brightness 0.
		brightness = 0
	}
	mapRedisTempLampsOOT := make(map[string]interface{})
	// Temporary switch record, used to tell commanded on/off from abnormal on/off.
	// NOTE(review): the original comment mentioned a 2-minute grace period for
	// commands delayed in the queue, but no such margin is added to End.
	ltr := LampTimeRange{
		Start:      util.MLTime(util.MlNow()),
		End:        util.MLTime(util.MlNow().Add(time.Duration(obj.Data.Recovery) * time.Second)),
		Brightness: brightness,
	}
	str, _ := json.MarshalToString(ltr)
	if len(obj.Data.LampIDs) == 0 {
		// Broadcast. NOTE(review): the result of the broadcast is not reported
		// in the ack and the window is recorded even if it fails.
		err = o.BroadcastOnOrOff(obj.Data.Switch, brightness)
		o.broadcastAutoTime = util.MlNow().Add(time.Duration(obj.Data.Recovery) * time.Second)
		for k := range o.mapLamps {
			mapRedisTempLampsOOT[strconv.Itoa(int(k))] = str // Redis
			o.mapTempLampsOOT[k] = &ltr                      // memory
		}
	} else {
		// Listed lamps only.
		for _, pid := range obj.Data.LampIDs {
			// Skip invalid IDs: zero and the broadcast address.
			if pid == 0 || pid == 0x0000FEFE {
				continue
			}
			err = o.SetBrightness(pid, brightness, brightness)
			mapIpole[pid] = protocol.NewStateError(err)
			mapRedisTempLampsOOT[strconv.Itoa(int(pid))] = str // Redis
			o.mapTempLampsOOT[pid] = &ltr                      // memory
		}
	}
	if len(mapRedisTempLampsOOT) > 0 {
		redisEdgeData.HSet(LampSwitchPrefix+o.devInfo.DevCode, mapRedisTempLampsOOT)
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, mapIpole); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_SWITCH_ACK), str, mqtt.AtMostOnce, ToAll)
	}
}

// HandleTpChzbSetRecoveryAuto restores time-control mode: for all lamps when
// the lamp list is empty, otherwise for each listed lamp. Successfully
// restored lamps lose their manual-control record in memory and in Redis.
func (o *Concentrator) HandleTpChzbSetRecoveryAuto(m mqtt.Message) {
	var obj protocol.Pack_CHZB_Switch
	var ret protocol.Pack_CHZB_SeqLampAck
	mapIpole := make(map[uint32]*protocol.StateError)
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Id != o.devInfo.DevCode {
		return
	}
	if len(obj.Data.LampIDs) == 0 {
		// Broadcast.
		o.CheckRecoveryAuto(true)
	} else {
		restored := make([]string, 0, len(obj.Data.LampIDs))
		for _, v := range obj.Data.LampIDs {
			err := o.SetPoleAuto(v)
			if err == nil {
				delete(o.mapTempLampsOOT, v)
				restored = append(restored, strconv.Itoa(int(v)))
			}
			mapIpole[v] = protocol.NewStateError(err)
		}
		// HDEL needs at least one field, so skip it when nothing was restored.
		if len(restored) > 0 {
			redisEdgeData.HDel(LampSwitchPrefix+o.devInfo.DevCode, restored...)
		}
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, mapIpole); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK), str, mqtt.AtMostOnce, ToAll)
	}
}

// HandleSetOnOffTime writes a schedule to each listed lamp, caches it in
// memory and in Redis (so it can be reloaded after a restart), and
// acknowledges with the per-lamp result.
func (o *Concentrator) HandleSetOnOffTime(m mqtt.Message) {
	var obj protocol.Pack_SetOnOffTime
	var ret protocol.Pack_CHZB_SeqLampAck
	mapIpole := make(map[uint32]*protocol.StateError)
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Id != o.devInfo.DevCode {
		return
	}
	if len(obj.Data.LampIDs) == 0 || len(obj.Data.OnOffTime) == 0 {
		return
	}
	mapRedisOOT := make(map[string]interface{})
	data := make([]zigbee.OnOffTime, 0, len(obj.Data.OnOffTime))
	for _, t := range obj.Data.OnOffTime {
		data = append(data, zigbee.OnOffTime{
			OnHour:     t.OnHour,
			OnMinite:   t.OnMinite,
			OffHour:    t.OffHour,
			OffMinite:  t.OffMinite,
			Brightness: t.Brightness,
		})
	}
	dataStr, _ := json.MarshalToString(data)
	for _, v := range obj.Data.LampIDs {
		// Skip zero and the broadcast address.
		if v == 0 || v == 0x0000FEFE {
			continue
		}
		// Capture the per-lamp result so the ack reports real failures.
		err := o.SetOnOffTime(v, zigbee.CmdSetOnofftime, data)
		mapIpole[v] = protocol.NewStateError(err)
		mapRedisOOT[strconv.Itoa(int(v))] = dataStr // Redis
		o.mapLamps2OOT[v] = data                    // memory
	}
	// Persist to Redis so the schedules can be reloaded after a restart.
	if len(mapRedisOOT) > 0 {
		redisEdgeData.HSet(LampOotPrefix+o.devInfo.DevCode, mapRedisOOT)
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, mapIpole); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
	}
}

// HandleQueryOnOffTime reads a lamp's schedule and acknowledges with the
// result. When Poleid is 0 nothing is read and an empty result is returned.
func (o *Concentrator) HandleQueryOnOffTime(m mqtt.Message) {
	var obj protocol.Pack_CHZB_QueryOnOffTime
	var oot []zigbee.OnOffTime
	var err error
	if err = obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Id != o.devInfo.DevCode {
		return
	}
	if obj.Data.Poleid > 0 {
		oot, err = o.GetOnOffTime(obj.Data.Poleid, zigbee.CmdReadOnofftime)
	}
	var ret protocol.Pack_CHZB_QueryOnOffTimeAck
	var offTimes []protocol.CHZB_OnOffTime
	for _, v := range oot {
		offTimes = append(offTimes, protocol.CHZB_OnOffTime{
			OnHour:     v.OnHour,
			OnMinite:   v.OnMinite,
			OffHour:    v.OffHour,
			OffMinite:  v.OffMinite,
			Brightness: v.Brightness,
		})
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, obj.Data.Poleid, err, offTimes); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
	}
}

// HandleSetUpdateLamp replaces the known lamp list with the valid entries of
// the request (non-zero lamp number, non-empty DID). An empty request leaves
// the list unchanged. The request is always acknowledged.
func (o *Concentrator) HandleSetUpdateLamp(m mqtt.Message) {
	var obj protocol.Pack_CHZB_LampIDs
	var ret protocol.Pack_Ack
	var err error
	if err = obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Id != o.devInfo.DevCode {
		return
	}
	if len(obj.Data.MapLamps) > 0 {
		lamps := make(map[uint32]string, len(obj.Data.MapLamps))
		for k, v := range obj.Data.MapLamps {
			// Drop invalid entries.
			if k > 0 && v != "" {
				lamps[k] = v
			}
		}
		o.mapLamps = lamps
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, err); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK), str, mqtt.AtMostOnce, ToCloud)
	}
}

// HandleQueryTime reads a lamp's clock and acknowledges with the time, or
// with the read error and a nil time.
func (o *Concentrator) HandleQueryTime(m mqtt.Message) {
	var obj protocol.Pack_CHZB_QueryTime
	var ret protocol.Pack_CHZB_QueryTimeAck
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	var pt *protocol.CHZB_LampTime
	hh, mm, ss, err := o.ReadPoleTime(obj.Data.LampID)
	if err == nil {
		pt = &protocol.CHZB_LampTime{Hour: hh, Minite: mm, Second: ss}
	}
	if str, encErr := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, obj.Data.LampID, err, pt); encErr == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_TIME_ACK), str, mqtt.AtMostOnce, ToCloud)
	}
}