||
- package main
- import (
- "context"
- "encoding/binary"
- "encoding/hex"
- "errors"
- "fmt"
- "runtime/debug"
- "strconv"
- "sync"
- "time"
- "github.com/go-redis/redis/v7"
- "github.com/sirupsen/logrus"
- "github.com/valyala/bytebufferpool"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- "lc/edge/ipole/zigbee"
- )
// Protocol name and Redis key prefixes used by the zigbee concentrator.
// Each prefix is concatenated with the concentrator's device code to form a key.
var (
	// ConcentratorProtocol is the protocol name an IoT model must declare to be
	// accepted by a Concentrator.
	ConcentratorProtocol = "CHDDJK-Zigbee"
	// LampOotPrefix prefixes the Redis hash holding per-lamp on/off time policies.
	LampOotPrefix = "lamp_oot_"
	// LampSwitchPrefix prefixes the Redis hash holding temporary manual-control windows.
	LampSwitchPrefix = "lamp_switch_"
	// LampBroadcastPrefix prefixes the Redis key holding the broadcast-mode deadline.
	LampBroadcastPrefix = "lamp_broadcastauto_"
	// LampAlarmPrefix prefixes the Redis hash holding cached lamp alarms.
	LampAlarmPrefix = "lamp_alarm_"
)
- func PoolPut(b *bytebufferpool.ByteBuffer) {
- if b != nil {
- bytebufferpool.Put(b)
- }
- }
// GetPoleIDByte returns the low 16 bits of PoleID as two bytes, most
// significant byte first.
func GetPoleIDByte(PoleID uint32) []byte {
	var raw [4]byte
	binary.BigEndian.PutUint32(raw[:], PoleID)
	// Only the last two bytes of the big-endian encoding are kept.
	return raw[2:]
}
- type LampTimeRange struct {
- Start util.MLTime `json:"start"`
- End util.MLTime `json:"end"`
- Brightness uint8 `json:"brightness"` //0熄灯,大于0都是开灯
- }
- func (o *LampTimeRange) isInTimeRange(t time.Time) bool {
- if t.After(time.Time(o.Start)) && t.Before(time.Time(o.End)) {
- return true
- }
- return false
- }
- type LampAlarmInfo struct {
- Alarm *protocol.LampAlarm `json:"alarm"`
- Send bool `json:"send"`
- }
- // Concentrator 集中器管理
- type Concentrator struct {
- seq uint8
- mutexSeq sync.Mutex
- devinfo *protocol.DevInfo
- model *protocol.IotModel
- ctx context.Context
- cancel context.CancelFunc
- downQueue *util.MlQueue
- readQueue *util.MlQueue //读取数据的队列,读完则发送
- mapLamps map[uint32]string
- mapTopicHandle map[string]func(m mqtt.Message)
- mapLamps2OOT map[uint32][]zigbee.OnOffTime //时控策略
- mapTempLampsOOT map[uint32]*LampTimeRange //临时手动控时间段,手动控制开关灯
- broadcastAutoTime time.Time //广播模式截止时间,过期自动恢复
- mapLampAlarm map[string]*LampAlarmInfo //告警数据
- chanDevInfo chan *protocol.DevInfo //设备管理更新
- chanModelInfo chan *ModelInfo //设备管理更新
- }
- func NewConcentrator(info *protocol.DevInfo) Device {
- ctx, cancel := context.WithCancel(context.Background())
- dev := &Concentrator{
- mapLamps: make(map[uint32]string),
- devinfo: info,
- ctx: ctx,
- cancel: cancel,
- downQueue: util.NewQueue(200),
- readQueue: util.NewQueue(200),
- mapTopicHandle: make(map[string]func(m mqtt.Message)),
- mapLamps2OOT: make(map[uint32][]zigbee.OnOffTime), //时控策略
- mapTempLampsOOT: make(map[uint32]*LampTimeRange), //临时时间段,手动控制开关灯
- broadcastAutoTime: time.Time{},
- mapLampAlarm: make(map[string]*LampAlarmInfo),
- chanDevInfo: make(chan *protocol.DevInfo),
- chanModelInfo: make(chan *ModelInfo),
- }
- iot, err := loadModel(info.TID)
- if err == nil && iot.TID == info.TID && iot.Protocol == ConcentratorProtocol {
- dev.model = iot
- }
- dev.SetTopicHandle()
- return dev
- }
- func (o *Concentrator) SetTopicHandle() {
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_BROADCASTTIME)] = o.HandleTpChzbSetBroadcasttime
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_WAITTIME)] = o.HandleTpChzbSetWaittime
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_SWITCH)] = o.HandleTpChzbSetSwitch
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO)] = o.HandleTpChzbSetRecoveryAuto
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME)] = o.HandleTpChzbSetOnofftime
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_ONOFFTIME)] = o.HandleTpChzbQueryOnofftime
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP)] = o.HandleTpChzbSetUpdateLamp
- o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_TIME)] = o.HandleTpChzbQueryTime
- }
- func (o *Concentrator) MQTTSubscribe() {
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_BROADCASTTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //广播校时
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_WAITTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //设置zigbee集中器收发等待时间
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_SWITCH), mqtt.ExactlyOnce, o.HandleCache, ToAll) //开关灯,广播开关灯
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO), mqtt.ExactlyOnce, o.HandleCache, ToAll) //广播开关灯
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //设置开关灯时间段
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //读取开关灯时间段
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //更新灯控末端
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_TIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //读取单灯末端时间
- }
- func (o *Concentrator) Start() {
- o.MQTTSubscribe()
- retry := 3
- sleep := time.Duration(2)
- for i := 0; i < retry; i++ {
- if err := o.ReloadOOTFromRedis(); err == nil {
- break
- }
- time.Sleep(sleep * time.Second)
- }
- for i := 0; i < retry; i++ {
- if err := o.ReloadSwitchOOTFromRedis(); err == nil {
- break
- }
- time.Sleep(sleep * time.Second)
- }
- for i := 0; i < retry; i++ {
- if err := o.ReloadBroadCastFromRedis(); err == nil {
- break
- }
- time.Sleep(sleep * time.Second)
- }
- for i := 0; i < retry; i++ {
- if err := o.ReloadLampAlarmFromRedis(); err == nil {
- break
- }
- time.Sleep(sleep * time.Second)
- }
- go o.Handle()
- }
- func (o *Concentrator) Stop() {
- o.cancel()
- }
- func (o *Concentrator) UpdateInfo(devinfo protocol.DevInfo) {
- o.chanDevInfo <- &devinfo
- }
- func (o *Concentrator) GetDevInfo() *protocol.DevInfo {
- return o.devinfo
- }
- func (o *Concentrator) UpdateModel(tid uint16, flag int) {
- if tid > 0 {
- mi := ModelInfo{
- TID: tid,
- Flag: flag,
- }
- o.chanModelInfo <- &mi
- }
- }
- func (o *Concentrator) UpdateModel2(mi *ModelInfo) {
- if o.devinfo.TID != mi.TID {
- return
- }
- if mi.Flag == 0 {
- logrus.Errorf("Concentrator.UpdateModel2:设备[%s]的物模型[tid=%d]模型文件被删除,下次启动即将生效。", o.devinfo.DevCode, mi.TID)
- return
- }
- logrus.Debugf("Concentrator.UpdateModel2:更新设备[%s]的物模型[%d]", o.devinfo.DevCode, mi.TID)
- iot, err := loadModel(mi.TID)
- if err != nil {
- logrus.Errorf("Concentrator.UpdateModel2:加载模型[%d]文件错误:%s", mi.TID, err.Error())
- return
- }
- if iot.Protocol == ConcentratorProtocol { //合法的物模型
- o.model = iot
- logrus.Infof("Concentrator.UpdateModel2:更新设备[%s]的物模型[%d]成功", o.devinfo.DevCode, mi.TID)
- } else {
- logrus.Error("Concentrator.UpdateModel2:物模型错误,TID和文件名tid不一致或协议非ModbusRTU协议")
- }
- }
- func (o *Concentrator) ReloadOOTFromRedis() error {
- mapdata, err := redisEdgeData.HGetAll(LampOotPrefix + o.devinfo.DevCode).Result()
- if err != nil {
- if err == redis.Nil {
- return nil
- }
- logrus.Errorf("Concentrator.ReloadOOTFromRedis设备[%s]从redis加载时间策略失败:%s", o.devinfo.DevCode, err.Error())
- return err
- }
- for k, v := range mapdata {
- var oot []zigbee.OnOffTime
- lampid, errkey := strconv.Atoi(k)
- errval := json.UnmarshalFromString(v, &oot)
- if errkey == nil && errval == nil {
- o.mapLamps2OOT[uint32(lampid)] = oot
- }
- }
- return nil
- }
- func (o *Concentrator) ReloadSwitchOOTFromRedis() error {
- mapdata, err := redisEdgeData.HGetAll(LampSwitchPrefix + o.devinfo.DevCode).Result()
- if err != nil {
- if err == redis.Nil {
- return nil
- }
- logrus.Errorf("Concentrator.ReloadSwitchOOTFromRedis设备[%s]从redis加载时间策略失败:%s", o.devinfo.DevCode, err.Error())
- return err
- }
- for k, v := range mapdata {
- var ltr LampTimeRange
- lampid, errkey := strconv.Atoi(k)
- errval := json.UnmarshalFromString(v, <r)
- if errkey == nil && errval == nil {
- o.mapTempLampsOOT[uint32(lampid)] = <r
- }
- }
- return nil
- }
- func (o *Concentrator) ReloadBroadCastFromRedis() error {
- strTime, err := redisEdgeData.Get(LampBroadcastPrefix + o.devinfo.DevCode).Result()
- if err != nil {
- if err == redis.Nil {
- return nil
- }
- logrus.Errorf("Concentrator.ReloadBroadCastFromRedis设备[%s]从redis加载广播恢复截止时间失败:%s", o.devinfo.DevCode, err.Error())
- return err
- }
- if t, err := util.MlParseTime(strTime); err == nil {
- o.broadcastAutoTime = t
- }
- return nil
- }
- func (o *Concentrator) ReloadLampAlarmFromRedis() error {
- mapAlarm, err := redisEdgeData.HGetAll(LampAlarmPrefix + o.devinfo.DevCode).Result()
- if err != nil {
- if err == redis.Nil {
- return nil
- }
- logrus.Errorf("Concentrator.ReloadLampAlarmFromRedis设备[%s]从redis加载广播恢复截止时间失败:%s", o.devinfo.DevCode, err.Error())
- return err
- }
- for k, v := range mapAlarm {
- var lai LampAlarmInfo
- if err := json.UnmarshalFromString(v, &lai); err == nil {
- o.mapLampAlarm[k] = &lai
- } else {
- logrus.Errorf("从redis获取的告警信息还原失败,原内容:%s,失败原因:%s", v, err.Error())
- }
- }
- return nil
- }
- func (o *Concentrator) Handle() {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("Concentrator.Handle发生异常:%v", err)
- logrus.Errorf("Concentrator.Handle发生异常,堆栈信息:%s", string(debug.Stack()))
- go o.Handle()
- }
- }()
- o.queryPoleids()
- o.BroadcastTime()
- exit := false
- mapData := make(map[string]*protocol.CHZB_LampData)
- LastTime := util.MlNow()
- nextFillTime := time.Time{}
- for {
- select {
- case <-o.ctx.Done():
- logrus.Errorf("设备[%s]的HandlePole退出,原因:%v", o.devinfo.DevCode, o.ctx.Err())
- exit = true
- case devinfo_ := <-o.chanDevInfo:
- o.devinfo = devinfo_
- case mi := <-o.chanModelInfo:
- o.UpdateModel2(mi)
- default:
- //从队列钟获取指令执行
- if m, ok, _ := o.downQueue.Get(); ok {
- if mm, ok := m.(mqtt.Message); ok {
- if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
- fn(mm)
- } else {
- logrus.Errorf("Concentrator.Handle:不支持的主题:%s", mm.Topic())
- }
- }
- } else {
- if exit { //退出前全部恢复时控模式
- o.CheckRecoveryAuto(true)
- return
- }
- //每小时同步一次时间
- if time.Now().Sub(LastTime).Minutes() > 60 {
- o.queryPoleids() //每小时从服务端同步一次单灯编号
- o.BroadcastTime() //每小时广播一次时间同步消息
- LastTime = time.Now()
- }
- o.CheckRecoveryAuto(false)
- quantity, send := o.NextQueueRead(mapData)
- if quantity == 0 || send {
- if nextFillTime.IsZero() || nextFillTime.Before(util.MlNow()) {
- o.fillReadQueue()
- nextFillTime = util.MlNow().Add(time.Duration(o.devinfo.SendCloud) * time.Millisecond)
- }
- if send {
- mapData = make(map[string]*protocol.CHZB_LampData)
- o.UploadLampAlarm()
- }
- time.Sleep(300 * time.Millisecond)
- }
- }
- }
- }
- }
- // CheckRecoveryAuto force=true 强制全部恢复时控模式
- func (o *Concentrator) CheckRecoveryAuto(force bool) {
- //存在广播控时,如果广播控过期,则全部恢复时控模式,未过期,则不处理
- if (!o.broadcastAutoTime.IsZero() && o.broadcastAutoTime.Before(util.MlNow())) || force {
- for k := range o.mapTempLampsOOT {
- delete(o.mapTempLampsOOT, k)
- }
- if err := o.BroadcastAuto(); err != nil {
- logrus.Errorf("广播恢复时控失败:%s", err.Error())
- } else {
- o.broadcastAutoTime = time.Time{}
- //删除redis中所有临时开关灯记录
- if err := redisEdgeData.Del(LampSwitchPrefix + o.devinfo.DevCode).Err(); err != nil {
- logrus.Errorf("手动广播恢复时控,更新redis失败:%s", err.Error())
- }
- logrus.Info("广播恢复时控成功")
- }
- } else {
- //如果广播控模式未过期,则判断单灯是否手动控过期,过期则恢复
- var strList []string
- for k, v := range o.mapTempLampsOOT {
- if time.Time(v.End).Before(util.MlNow()) {
- if err := o.SetPoleAuto(k); err != nil {
- logrus.Errorf("单灯[%d]恢复时控失败:%s", k, err.Error())
- } else {
- logrus.Infof("单灯[%d]恢复时控成功", k)
- strList = append(strList, strconv.Itoa(int(k)))
- delete(o.mapTempLampsOOT, k)
- }
- }
- }
- if len(strList) > 0 {
- if err := redisEdgeData.HDel(LampSwitchPrefix+o.devinfo.DevCode, strList...).Err(); err != nil {
- logrus.Errorf("手动恢复灯控[%v]时控模式,更新redis失败:%s", strList, err.Error())
- }
- }
- }
- }
- func (o *Concentrator) queryPoleids() {
- var obj protocol.Pack_CHZB_EmptyObject
- if str, err := obj.EnCode(o.devinfo.DevCode, appConfig.GID, GetNextUint64()); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_LAMP), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func ConvertExcept(except uint16) string {
- //-1初始值 0正常,1异常亮灯,2异常熄灯,3亮度异常
- switch except {
- case protocol.LE_OK: //正常(正常开灯或熄灯状态)
- return "正常"
- case protocol.LE_ON: //亮灯异常(本该关灯状态)
- return "熄灯时段亮灯"
- case protocol.LE_OFF: //亮灯,但亮度异常(本该开灯状态,但开灯亮度不对)
- return "亮灯时段熄灯"
- case protocol.LE_ON_BRIGHTNESS:
- return "亮灯但亮度异常"
- default:
- return "状态未知"
- }
- }
// LampNumberDID is one entry of the read queue: a lamp ID together with the
// DID stored for it in mapLamps.
type LampNumberDID struct {
	// LampID is the numeric lamp ID used when addressing the lamp.
	LampID uint32
	// DID is the key under which the lamp's data and alarms are reported.
	DID string
}
- func (o *Concentrator) fillReadQueue() {
- for k, v := range o.mapLamps {
- o.readQueue.Put(LampNumberDID{LampID: k, DID: v})
- }
- }
- func (o *Concentrator) NextQueueRead(mapData map[string]*protocol.CHZB_LampData) (uint32, bool) {
- val, ok, quantity := o.readQueue.Get()
- if ok {
- lnd := val.(LampNumberDID)
- var err_ error
- var data protocol.CHZB_LampData
- data.Data = make(map[uint16]float64)
- if b1, b2, err := o.GetBrightness(lnd.LampID); err == nil {
- data.Data[1] = float64(b1)
- data.Data[2] = float64(b2)
- //判断灯亮是否正常
- o.CheckLampAlarm(lnd, b1, b2)
- } else {
- err_ = err
- }
- if e, err := o.ReadElectricalPara(lnd.LampID); err == nil {
- data.Data[3] = e.Voltage[0]
- data.Data[4] = e.Voltage[1]
- data.Data[5] = e.Voltage[2]
- data.Data[6] = e.Current[0]
- data.Data[7] = e.Current[1]
- data.Data[8] = float64(e.Degree[0])
- data.Data[9] = float64(e.Degree[1])
- } else {
- err_ = err
- }
- data.SetStateErrorData(err_)
- mapData[lnd.DID] = &data
- }
- if quantity == 0 && len(mapData) > 0 {
- var obj protocol.Pack_CHZB_UploadData
- if str, err := obj.EnCode(o.devinfo.DevCode, appConfig.GID, GetNextUint64(), o.devinfo.TID, mapData); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_DATA), str, mqtt.AtMostOnce, ToCloud)
- }
- return quantity, true
- }
- return quantity, false
- }
- // UploadLampAlarm 告警开始上报和告警结束上报
- func (o *Concentrator) UploadLampAlarm() {
- var toDelete []string
- for k, v := range o.mapLampAlarm {
- if !v.Send && v.Alarm.EndTime == "" { //告警开始上报
- var obj protocol.Pack_CHZB_LampAlarm
- if str, err := obj.EnCode(o.devinfo.DevCode, appConfig.GID, GetNextUint64(), v.Alarm); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_ALARM), str, mqtt.AtMostOnce, ToCloud)
- }
- v.Send = true
- //缓存到redis
- strAlarm, _ := json.MarshalToString(v)
- mapRedis := make(map[string]interface{})
- mapRedis[k] = strAlarm
- if err := redisEdgeData.HSet(LampAlarmPrefix+o.devinfo.DevCode, mapRedis).Err(); err != nil {
- logrus.Errorf("告警信息缓存入redis失败:%s", err.Error())
- }
- } else if v.Alarm.EndTime != "" { //告警结束上报
- var obj protocol.Pack_CHZB_LampAlarm
- if str, err := obj.EnCode(o.devinfo.DevCode, appConfig.GID, GetNextUint64(), v.Alarm); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_ALARM), str, mqtt.AtMostOnce, ToCloud)
- }
- toDelete = append(toDelete, k)
- delete(o.mapLampAlarm, k)
- }
- }
- if len(toDelete) > 0 {
- if err := redisEdgeData.HDel(LampAlarmPrefix+o.devinfo.DevCode, toDelete...).Err(); err != nil {
- logrus.Errorf("告警信息从redis删除失败:%s", err.Error())
- }
- }
- }
- // CheckLampAlarm 检查开灯、关灯、亮度异常
- func (o *Concentrator) CheckLampAlarm(lnd LampNumberDID, b1, b2 uint8) {
- //真实数据时间
- now := util.MlNow().Add(-time.Duration(o.devinfo.WaitTime) * time.Millisecond)
- except := uint16(protocol.LE_UNKNOWN)
- //策略时间段检查
- if oots, ok := o.mapLamps2OOT[lnd.LampID]; ok {
- for _, oot := range oots { //亮灯时间段
- if oot.InTimeRange(util.MlNow()) {
- if b1 > 0 { //亮灯
- if oot.Brightness == b1 {
- except = protocol.LE_OK //亮灯正常
- } else {
- except = protocol.LE_ON_BRIGHTNESS //亮灯异常(亮度异常)
- }
- } else { //异常熄灯
- except = protocol.LE_OFF
- }
- break
- }
- }
- }
- //手动控检查
- if switchoot, ok := o.mapTempLampsOOT[lnd.LampID]; ok {
- if switchoot.isInTimeRange(now) {
- if switchoot.Brightness == b1 {
- except = protocol.LE_OK //正常亮灯/熄灯
- } else {
- if b1 > 0 {
- if switchoot.Brightness == 0 {
- except = protocol.LE_ON //异常亮灯
- } else {
- except = protocol.LE_ON_BRIGHTNESS //亮度异常
- }
- } else {
- except = protocol.LE_OFF //异常熄灯
- }
- }
- }
- }
- if except == protocol.LE_UNKNOWN {
- if b1 > 0 {
- except = protocol.LE_ON //异常亮灯
- } else {
- except = protocol.LE_OK
- }
- }
- if a, ok := o.mapLampAlarm[lnd.DID]; ok {
- if except == protocol.LE_OK { //告警结束
- a.Alarm.EndTime = now.Format("2006-01-02 15:04:05")
- a.Alarm.Brightness = b1
- logrus.Debugf("灯控[%s]状态[%s]", lnd.DID, ConvertExcept(except))
- }
- } else {
- if except != protocol.LE_OK { //告警开始
- a := protocol.LampAlarm{DID: lnd.DID, AlarmType: except, AlarmBrightness: b1, StartTime: now.Format("2006-01-02 15:04:05")}
- lai := LampAlarmInfo{Alarm: &a, Send: false}
- o.mapLampAlarm[lnd.DID] = &lai
- logrus.Debugf("灯控[%s]状态[%s]", lnd.DID, ConvertExcept(except))
- }
- }
- }
- func (o *Concentrator) nextSeq() uint8 {
- o.mutexSeq.Lock()
- defer o.mutexSeq.Unlock()
- o.seq++
- return o.seq
- }
- func (o *Concentrator) SendRecvData(aduRequest []byte, retry int) (aduResponse []byte, err error) {
- serial := GetSerialMgr().GetSerialPort(o.devinfo.Code)
- if serial == nil {
- return nil, ErrClosedConnection
- }
- if retry <= 0 {
- retry = 1
- }
- for ; retry > 0; retry-- {
- aduResponse, err = serial.SendRecvData(aduRequest, FlagChZigbee, o.devinfo.WaitTime)
- if err == nil {
- break
- }
- }
- return aduResponse, err
- }
- func (o *Concentrator) SendData(aduRequest []byte, retry int) (err error) {
- serial := GetSerialMgr().GetSerialPort(o.devinfo.Code)
- if serial == nil {
- return ErrClosedConnection
- }
- if retry <= 0 {
- retry = 1
- }
- for ; retry > 0; retry-- {
- if err = serial.SendData(aduRequest, FlagChZigbee, o.devinfo.WaitTime); err == nil {
- break
- }
- }
- return err
- }
- // BroadcastTime 广播校时
- func (o *Concentrator) BroadcastTime() error {
- t := protocol.BJNow()
- var pack zigbee.PackUpgradeFuncCommand
- pack.SetData(0x0000FEFE, zigbee.CmdSetBroadcastCorrectiontime, o.nextSeq(),
- []byte{0xFE, 0xFE, uint8(t.Hour()), uint8(t.Minute()), uint8(t.Second())})
- buff, err := pack.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- err = o.SendData(buff.B, 2)
- }
- return err
- }
- // BroadcastOnOrOff 广播开关灯
- func (o *Concentrator) BroadcastOnOrOff(on, brightness uint8) error {
- var cmd = zigbee.CmdSetBroadcastOn
- if on == 0 {
- cmd = zigbee.CmdSetBroadcastOff
- }
- var pack zigbee.PackUpgradeFuncCommand
- //pack.SetData(0x0000FEFE, cmd, o.nextSeq(), []byte{0xFE, 0xFE, 0xFF})
- pack.SetData(0x0000FEFE, cmd, o.nextSeq(), []byte{0xFE, 0xFE, 0xFF, brightness, brightness}) //灯1和2
- buff, err := pack.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- err = o.SendData(buff.B, 2)
- }
- return err
- }
- // BroadcastAuto 广播恢复时控模式
- func (o *Concentrator) BroadcastAuto() error {
- var pufc zigbee.PackUpgradeFuncCommand
- //pufc.SetData(0x0000fefe, zigbee.CMD_SET_BROADCAST_AUTO, o.nextSeq(), []byte{0xFE, 0xFE, 0xFF})
- pufc.SetData(0x0000fefe, zigbee.CmdSetBroadcastAuto, o.nextSeq(), []byte{0xFE, 0xFE, 0x03}) //灯1和2
- buff, err := pufc.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- err = o.SendData(buff.B, 3)
- }
- return err
- }
- // SetOnOffTime 设置灯1开关灯时间段
- func (o *Concentrator) SetOnOffTime(PoleID uint32, Cmd uint8, data []zigbee.OnOffTime) error {
- buff0 := bytebufferpool.Get()
- defer PoolPut(buff0)
- buff0.Write(GetPoleIDByte(PoleID))
- len := len(data)
- for i := 0; i < 4; i++ {
- if i < len {
- buff0.Write(data[i].EnCode())
- } else {
- buff0.Write((&zigbee.OnOffTime{}).EnCode())
- }
- }
- var pgfc zigbee.PackGeneralFuncCommand
- pgfc.SetData(PoleID, Cmd, o.nextSeq(), buff0.B)
- buff, err := pgfc.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- _, err = o.SendRecvData(buff.B, 3)
- }
- return err
- }
- // GetOnOffTime 读取灯1开关灯时间段
- func (o *Concentrator) GetOnOffTime(PoleID uint32, Cmd uint8) ([]zigbee.OnOffTime, error) {
- var pgfc zigbee.PackGeneralFuncCommand
- pgfc.SetData(PoleID, Cmd, o.nextSeq(), GetPoleIDByte(PoleID))
- buff, err := pgfc.EnCode()
- if err != nil {
- return nil, err
- }
- defer PoolPut(buff)
- var recvdata []byte
- recvdata, err = o.SendRecvData(buff.B, 1)
- if err != nil {
- return nil, err
- }
- var pgfcresp zigbee.PackGeneralFuncCommand
- err = pgfcresp.DeCode(recvdata)
- if err != nil {
- return nil, err
- }
- if len(pgfcresp.Data) >= 22 {
- oot := make([]zigbee.OnOffTime, 4)
- oot[0].DeCode(pgfcresp.Data[2:7])
- oot[1].DeCode(pgfcresp.Data[7:12])
- oot[2].DeCode(pgfcresp.Data[12:17])
- oot[3].DeCode(pgfcresp.Data[17:22])
- //时分都是0
- ret := make([]zigbee.OnOffTime, 0, 4)
- for _, v := range oot {
- if v.OnHour == v.OffHour && v.OnMinite == v.OffMinite &&
- v.OnHour == 0 && v.OnMinite == 0 {
- continue
- }
- ret = append(ret, v)
- }
- return ret, nil
- } else {
- logrus.Errorf("读取开关灯时间返回的内容错误:%s", hex.EncodeToString(recvdata))
- }
- return nil, errors.New("读取开关灯时间返回的内容错误")
- }
- // ReadPoleTime 读取单灯时间
- func (o *Concentrator) ReadPoleTime(PoleID uint32) (uint8, uint8, uint8, error) {
- //从4位带分组编号的灯杆编号中取2位灯杆编号
- var pgfc zigbee.PackGeneralFuncCommand
- pgfc.SetData(PoleID, zigbee.CmdReadTime, o.nextSeq(), GetPoleIDByte(PoleID))
- buff, err := pgfc.EnCode()
- if err != nil {
- return 0, 0, 0, err
- }
- defer PoolPut(buff)
- var recvdata []byte
- recvdata, err = o.SendRecvData(buff.B, 1)
- if err != nil {
- return 0, 0, 0, err
- }
- var pgfcresp zigbee.PackGeneralFuncCommand
- err = pgfcresp.DeCode(recvdata)
- if err != nil {
- return 0, 0, 0, err
- }
- if len(pgfcresp.Data) >= 5 {
- return pgfcresp.Data[2], pgfcresp.Data[3], pgfcresp.Data[4], nil
- } else {
- logrus.Errorf("读取单灯时间返回的内容错误:%s", hex.EncodeToString(recvdata))
- }
- return 0, 0, 0, errors.New("读取单灯时间返回的内容错误")
- }
// ElecPara holds the electrical readings of a single lamp as returned by
// ReadElectricalPara.
type ElecPara struct {
	// Voltage holds the three voltage readings.
	Voltage [3]float64
	// Current holds the two current readings.
	Current [2]float64
	// Degree holds the two 16-bit energy counters.
	Degree [2]uint16
}
- func (o *Concentrator) ReadElectricalPara(PoleID uint32) (*ElecPara, error) {
- var pgrc zigbee.PackGeneralFuncCommand
- pgrc.SetData(PoleID, zigbee.CmdReadDldy, o.nextSeq(), GetPoleIDByte(PoleID))
- buff, err := pgrc.EnCode()
- if err != nil {
- return nil, err
- }
- defer PoolPut(buff)
- var recvdata []byte
- recvdata, err = o.SendRecvData(buff.B, 1)
- if err != nil {
- return nil, err
- }
- var pgfcresp zigbee.PackGeneralFuncCommand
- err = pgfcresp.DeCode(recvdata)
- if err != nil {
- return nil, err
- }
- if pgfcresp.Cmd == zigbee.CmdReadDldy && len(pgfcresp.Data) >= 13 {
- var ep ElecPara
- ep.Voltage[0] = float64(pgfcresp.Data[2] * 2)
- ep.Voltage[1] = float64(pgfcresp.Data[3] * 2)
- ep.Voltage[2] = float64(pgfcresp.Data[4] * 2)
- ep.Current[0] = float64(pgfcresp.Data[5])*0.1 + float64(pgfcresp.Data[7])*0.001
- ep.Current[1] = float64(pgfcresp.Data[6])*0.1 + float64(pgfcresp.Data[8])*0.001
- ep.Degree[0] = binary.BigEndian.Uint16(pgfcresp.Data[9:11])
- ep.Degree[1] = binary.BigEndian.Uint16(pgfcresp.Data[11:13])
- return &ep, nil
- } else {
- logrus.Errorf("读取电流电压电度返回的内容错误:%s", hex.EncodeToString(recvdata))
- }
- return nil, errors.New("读取电流电压电度返回错误")
- }
- // SetBrightness 设单灯1,2亮度值
- func (o *Concentrator) SetBrightness(PoleID uint32, brightness1 uint8, brightness2 uint8) error {
- data := make([]byte, 0, 7)
- data = append(data, GetPoleIDByte(PoleID)...)
- data = append(data, brightness1, brightness2, 0xFF, 0xFF) //灯1,灯2亮度,后边灯3灯4保留
- var pgfc zigbee.PackGeneralFuncCommand
- pgfc.SetData(PoleID, zigbee.CmdSetBrightness, o.nextSeq(), data)
- buff, err := pgfc.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- _, err = o.SendRecvData(buff.B, 1)
- }
- return err
- }
- // GetBrightness 查询单灯1,2亮度值
- func (o *Concentrator) GetBrightness(PoleID uint32) (uint8, uint8, error) {
- var pgfc zigbee.PackGeneralFuncCommand
- pgfc.SetData(PoleID, zigbee.CmdReadBrightness, o.nextSeq(), GetPoleIDByte(PoleID))
- buff, err := pgfc.EnCode()
- if err != nil {
- return 0, 0, err
- }
- defer PoolPut(buff)
- var recvdata []byte
- recvdata, err = o.SendRecvData(buff.B, 1)
- if err != nil {
- return 0, 0, err
- }
- var pgfcresp zigbee.PackGeneralFuncCommand
- err = pgfcresp.DeCode(recvdata)
- if err != nil {
- return 0, 0, err
- }
- if pgfcresp.Cmd == zigbee.CmdReadBrightness && len(pgfcresp.Data) >= 4 {
- return pgfcresp.Data[2], pgfcresp.Data[3], nil
- } else {
- logrus.Errorf("查询亮度返回的内容错误:%s", hex.EncodeToString(recvdata))
- }
- return 0, 0, errors.New("查询亮度返回错误")
- }
- // SetTime 单灯校时
- func (o *Concentrator) SetTime(PoleID uint32) error {
- t := protocol.BJNow()
- data := make([]byte, 0, 5)
- data = append(data, GetPoleIDByte(PoleID)...)
- data = append(data, uint8(t.Hour()), uint8(t.Minute()), uint8(t.Second()))
- var pack zigbee.PackGeneralFuncCommand
- pack.SetData(PoleID, zigbee.CmdSetCorrectiontime, o.nextSeq(), data)
- buff, err := pack.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- _, err = o.SendRecvData(buff.B, 3)
- }
- return err
- }
- // SetPoleAuto 单灯恢复时控
- func (o *Concentrator) SetPoleAuto(PoleID uint32) error {
- var pack zigbee.PackGeneralFuncCommand
- pack.SetData(PoleID, zigbee.CmdSetAuto, o.nextSeq(), GetPoleIDByte(PoleID))
- buff, err := pack.EnCode()
- defer PoolPut(buff)
- if buff != nil {
- _, err = o.SendRecvData(buff.B, 3)
- }
- return err
- }
- func (o *Concentrator) HandleCache(m mqtt.Message) {
- o.downQueue.Put(m)
- }
- func (o *Concentrator) HandleTpChzbSetBroadcasttime(m mqtt.Message) {
- var obj protocol.Pack_CHZB_EmptyObject
- var ret protocol.Pack_Ack
- var err error
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- err = o.BroadcastTime()
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, err); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_BROADCASTTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func (o *Concentrator) HandleTpChzbSetWaittime(m mqtt.Message) {
- var obj protocol.Pack_CHZB_Waittime
- var ret protocol.Pack_Ack
- var err error
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- if obj.Data.Waittime < 1000 || obj.Data.Waittime > 15000 {
- err = errors.New("设置的等待时间不在[1000,15000]范围")
- } else {
- o.devinfo.WaitTime = obj.Data.Waittime
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, err); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_WAITTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func (o *Concentrator) HandleTpChzbSetSwitch(m mqtt.Message) {
- var obj protocol.Pack_CHZB_Switch
- var ret protocol.Pack_CHZB_SeqLampAck
- var err error
- mapIpole := make(map[uint32]*protocol.StateError)
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- if obj.Id != o.devinfo.DevCode {
- return
- }
- Brightness := obj.Data.Brightness
- if obj.Data.Switch == 0 && Brightness > 0 {
- Brightness = 0
- }
- mapRedisTempLampsOOT := make(map[string]interface{}) //临时开关灯记录,用于排除异常亮灯正常亮灯的情况
- ltr := LampTimeRange{
- Start: util.MLTime(util.MlNow()),
- End: util.MLTime(util.MlNow().Add(time.Duration(obj.Data.Recovery) * time.Second)), //延迟2分钟,以防指令在队列中未及时执行
- Brightness: Brightness,
- }
- ltrstr, _ := json.MarshalToString(ltr)
- if len(obj.Data.LampIDs) == 0 { //广播
- err = o.BroadcastOnOrOff(obj.Data.Switch, Brightness)
- o.broadcastAutoTime = util.MlNow().Add(time.Duration(obj.Data.Recovery) * time.Second)
- for k := range o.mapLamps {
- mapRedisTempLampsOOT[strconv.Itoa(int(k))] = ltrstr //redis
- o.mapTempLampsOOT[k] = <r //内存
- }
- } else { //指定的灯
- for _, pid := range obj.Data.LampIDs {
- //过滤掉不正常的pid
- if pid == 0 || pid == 0x0000FEFE {
- continue
- }
- err = o.SetBrightness(pid, Brightness, Brightness)
- mapIpole[pid] = protocol.NewStateError(err)
- mapRedisTempLampsOOT[strconv.Itoa(int(pid))] = ltrstr //redis
- o.mapTempLampsOOT[pid] = <r //内存
- }
- }
- if err := redisEdgeData.HSet(LampSwitchPrefix+o.devinfo.DevCode, mapRedisTempLampsOOT).Err(); err != nil {
- logrus.Errorf("手动开关灯时间设置[内容:%v]缓存到redis失败:%s", mapRedisTempLampsOOT, err.Error())
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, mapIpole); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_SWITCH_ACK), str, mqtt.AtMostOnce, ToAll)
- }
- }
- func (o *Concentrator) HandleTpChzbSetRecoveryAuto(m mqtt.Message) {
- var obj protocol.Pack_CHZB_Switch
- var ret protocol.Pack_CHZB_SeqLampAck
- var err error
- mapIpole := make(map[uint32]*protocol.StateError)
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- if obj.Id != o.devinfo.DevCode {
- return
- }
- if len(obj.Data.LampIDs) == 0 { //广播
- o.CheckRecoveryAuto(true)
- } else {
- strList := make([]string, 0, len(obj.Data.LampIDs))
- for _, v := range obj.Data.LampIDs {
- err := o.SetPoleAuto(v)
- if err == nil {
- delete(o.mapTempLampsOOT, v)
- strList = append(strList, strconv.Itoa(int(v)))
- } else {
- logrus.Errorf("手动单灯[%d]恢复时控失败:%s", v, err.Error())
- }
- mapIpole[v] = protocol.NewStateError(err)
- }
- if err := redisEdgeData.HDel(LampSwitchPrefix+o.devinfo.DevCode, strList...).Err(); err != nil {
- logrus.Errorf("手动恢复灯控[%v]时控模式,更新redis失败:%s", strList, err.Error())
- }
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, mapIpole); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK), str, mqtt.AtMostOnce, ToAll)
- }
- }
- func (o *Concentrator) HandleTpChzbSetOnofftime(m mqtt.Message) {
- var obj protocol.Pack_SetOnOffTime
- var ret protocol.Pack_CHZB_SeqLampAck
- var err error
- mapIpole := make(map[uint32]*protocol.StateError)
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- if obj.Id != o.devinfo.DevCode {
- return
- }
- if len(obj.Data.LampIDs) == 0 || len(obj.Data.OnOffTime) == 0 {
- logrus.Errorf("Handle_TP_CHZB_SET_ONOFFTIME:错误,灯控编号[%v],时间段个数:%v",
- obj.Data.LampIDs, obj.Data.OnOffTime)
- return
- }
- mapRedisOOT := make(map[string]interface{})
- var data []zigbee.OnOffTime
- for _, t := range obj.Data.OnOffTime {
- dat := zigbee.OnOffTime{
- OnHour: t.OnHour,
- OnMinite: t.OnMinite,
- OffHour: t.OffHour,
- OffMinite: t.OffMinite,
- Brightness: t.Brightness,
- }
- data = append(data, dat)
- }
- datastr, _ := json.MarshalToString(data)
- for _, v := range obj.Data.LampIDs {
- if v == 0 || v == 0x0000FEFE { //编号等于0或为广播地址则不处理
- continue
- }
- if err = o.SetOnOffTime(v, zigbee.CmdSetOnofftime, data); err != nil {
- logrus.Errorf("单灯[%d]设置开关灯时间失败:%s", v, err.Error())
- } else {
- logrus.Infof("单灯[%d]设置开关灯时间成功", v)
- }
- mapIpole[v] = protocol.NewStateError(err)
- mapRedisOOT[strconv.Itoa(int(v))] = datastr //缓存到redis
- o.mapLamps2OOT[v] = data //缓存在内存中
- }
- //持久缓存到redis,以便于重启后读取进内存中
- if err := redisEdgeData.HSet(LampOotPrefix+o.devinfo.DevCode, mapRedisOOT).Err(); err != nil {
- logrus.Errorf("灯控时间设置[内容:%v]缓存到redis失败:%s", mapRedisOOT, err.Error())
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, mapIpole); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func (o *Concentrator) HandleTpChzbQueryOnofftime(m mqtt.Message) {
- var obj protocol.Pack_CHZB_QueryOnOffTime
- var oot []zigbee.OnOffTime
- var err error
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- if obj.Id != o.devinfo.DevCode {
- return
- }
- if obj.Data.Poleid > 0 {
- oot, err = o.GetOnOffTime(obj.Data.Poleid, zigbee.CmdReadOnofftime)
- }
- var ret protocol.Pack_CHZB_QueryOnOffTimeAck
- var poot []protocol.CHZB_OnOffTime
- for _, v := range oot {
- x := protocol.CHZB_OnOffTime{
- OnHour: v.OnHour,
- OnMinite: v.OnMinite,
- OffHour: v.OffHour,
- OffMinite: v.OffMinite,
- Brightness: v.Brightness,
- }
- poot = append(poot, x)
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, obj.Data.Poleid, err, poot); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func (o *Concentrator) HandleTpChzbSetUpdateLamp(m mqtt.Message) {
- var obj protocol.Pack_CHZB_LampIDs
- var ret protocol.Pack_Ack
- var err error
- if err = obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- if obj.Id != o.devinfo.DevCode {
- return
- }
- if len(obj.Data.MapLamps) > 0 {
- mapLampsTmp := make(map[uint32]string)
- for k, v := range obj.Data.MapLamps {
- //过滤掉不正常的数据
- if k > 0 && v != "" {
- mapLampsTmp[k] = v
- }
- }
- o.mapLamps = mapLampsTmp
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, err); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func (o *Concentrator) HandleTpChzbQueryTime(m mqtt.Message) {
- var obj protocol.Pack_CHZB_QueryTime
- var ret protocol.Pack_CHZB_QueryTimeAck
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString())
- return
- }
- var pt *protocol.CHZB_LampTime = nil
- hh, mm, ss, err := o.ReadPoleTime(obj.Data.LampID)
- if err == nil {
- pt = &protocol.CHZB_LampTime{Hour: hh, Minite: mm, Second: ss}
- }
- if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, obj.Data.LampID, err, pt); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devinfo.DevCode, protocol.TP_CHZB_QUERY_TIME_ACK), str, mqtt.AtMostOnce, ToCloud)
- }
- }
|