package main import ( "context" "errors" "runtime/debug" "time" "github.com/go-redis/redis/v7" "github.com/sirupsen/logrus" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" "lc/edge/ipole/ym485" "strings" "sync" ) var YmProtocol = "yuming485" var mutex sync.Mutex var oldstate uint8 type YmLampController struct { devinfo *protocol.DevInfo model *protocol.IotModel downQueue *util.MlQueue mapTopicHandle map[string]func(m mqtt.Message) mapLamps2OOT []protocol.CHZB_OnOffTime //时控策略 mapTempLampsOOT *LampTimeRange //临时手动控时间段,手动控制开关灯 mapLampAlarm *LampAlarmInfo //告警数据 State []ym485.DeviceState //当前灯控状态 开关灯,电流,电压,电度 ctx context.Context cancel context.CancelFunc chanDevInfo chan *protocol.DevInfo //设备管理更新 chanModelInfo chan *ModelInfo //设备管理更新 tswitch time.Time //手动控执行时间 } func NewYmLampController(info *protocol.DevInfo) Device { ctx, cancel := context.WithCancel(context.Background()) dev := &YmLampController{ devinfo: info, downQueue: util.NewQueue(200), mapTopicHandle: make(map[string]func(m mqtt.Message)), ctx: ctx, cancel: cancel, chanDevInfo: make(chan *protocol.DevInfo), chanModelInfo: make(chan *ModelInfo), } iot, err := loadModel(info.TID) if err != nil { logrus.Errorf("NewYmLampController:加载模型[tid=%d]文件发生错误:%s", info.TID, err.Error()) } else { if iot.Protocol == YmProtocol { dev.model = iot } else { logrus.Error("NewYmLampController:物模型错误,非YmProtocol协议") } } dev.SetTopicHandle() return dev } func (o *YmLampController) SetTopicHandle() { o.mapTopicHandle[GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_SET_SWITCH)] = o.HandleTpYmSetSwitch o.mapTopicHandle[GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_SET_ONOFFTIME)] = o.HandleTpYmSetOnofftime } func (o *YmLampController) MQTTSubscribe() { GetMQTTMgr().Subscribe(GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_SET_SWITCH), mqtt.ExactlyOnce, o.HandleCache, ToAll) //开关灯 GetMQTTMgr().Subscribe(GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_SET_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //设置时间 } func (o *YmLampController) HandleCache(m mqtt.Message) { logrus.Infof("Topic:%v,message:%v\n", m.Topic(), m.PayloadString()) o.downQueue.Put(m) } func (o *YmLampController) Start() { o.MQTTSubscribe() retry := 3 sleep := time.Duration(2) for i := 0; i < retry; i++ { if err := o.ReloadOOTFromRedis(); err == nil { break } time.Sleep(sleep * time.Second) } for i := 0; i < retry; i++ { if err := o.ReloadSwitchOOTFromRedis(); err == nil { break } time.Sleep(sleep * time.Second) } for i := 0; i < retry; i++ { if err := o.ReloadLampAlarmFromRedis(); err == nil { break } time.Sleep(sleep * time.Second) } go o.HandleData() } func (o *YmLampController) Stop() { //停止采集和处理 o.cancel() } func (o *YmLampController) UpdateInfo(devinfo protocol.DevInfo) { o.chanDevInfo <- &devinfo } func (o *YmLampController) GetDevInfo() *protocol.DevInfo { return o.devinfo } func (o *YmLampController) UpdateModel(tid uint16, flag int) { if tid > 0 { mi := ModelInfo{ TID: tid, Flag: flag, } o.chanModelInfo <- &mi } } func (o *YmLampController) UpdateModel2(mi *ModelInfo) { if o.devinfo.TID != mi.TID { return } if mi.Flag == 0 { logrus.Errorf("设备[%s]的物模型[tid=%d]模型文件被删除,下次启动即将生效。", o.devinfo.DevCode, mi.TID) return } logrus.Debugf("YmLampController.UpdateModel2:更新设备[%s]的物模型[%d]", o.devinfo.DevCode, mi.TID) iot, err := loadModel(mi.TID) if err != nil { logrus.Errorf("YmLampController.UpdateModel2:加载模型[%d]文件错误:%s", mi.TID, err.Error()) return } if iot.Protocol == ModbusRtuProtocol { //合法的物模型 o.model = iot logrus.Infof("YmLampController.UpdateModel2:更新设备[%s]的物模型[%d]成功", o.devinfo.DevCode, mi.TID) } else { logrus.Error("YmLampController.UpdateModel2:物模型错误,TID和文件名tid不一致或协议非ModbusRTU协议") } } func (o *YmLampController) GetDevType() string { if o.devinfo.DevType == 3 { return protocol.DT_LAMPCONTROLLER } return "unknown" } // HandleData 数据处理协程 func (o *YmLampController) HandleData() { defer func() { if err := recover(); err != nil { logrus.Error("YmLampController.HandleData:panic:", err) logrus.Error("stack:", string(debug.Stack())) go o.HandleData() } }() o.QueryLampControllerAddr() //线上挂了两个灯控,则不能 logrus.Infof("lamp state:%d", oldstate) nextFillTime := time.Time{} for { select { case <-o.ctx.Done(): logrus.Errorf("设备[%s]的HandleData退出,原因:%v", o.devinfo.DevCode, o.ctx.Err()) return case devinfo_ := <-o.chanDevInfo: o.devinfo = devinfo_ case mi := <-o.chanModelInfo: o.UpdateModel2(mi) default: //从队列钟获取指令执行 if m, ok, _ := o.downQueue.Get(); ok { if mm, ok := m.(mqtt.Message); ok { if fn, ok := o.mapTopicHandle[mm.Topic()]; ok { fn(mm) } else { logrus.Errorf("YmLampController.Handle:不支持的主题:%s", mm.Topic()) } } } else { if nextFillTime.IsZero() || nextFillTime.Before(util.MlNow()) { //定时读取和发送 t := util.MlNow() o.QueryDeviceState() //获取灯的数据 o.ConfirmState(t) //获取设置灯的开/关 nextFillTime = util.MlNow().Add(time.Duration(o.devinfo.SendCloud) * time.Millisecond) } time.Sleep(time.Millisecond * 1000) } } } } func (o *YmLampController) SendRecvData(aduRequest []byte, retry int) (aduResponse []byte, err error) { defer mutex.Unlock() mutex.Lock() // time.Sleep(time.Millisecond * 10) serial := GetSerialMgr().GetSerialPort(o.devinfo.Code) if serial == nil { return nil, ErrClosedConnection } if retry <= 0 { retry = 1 } for ; retry > 0; retry-- { aduResponse, err = serial.SendRecvData(aduRequest, FlagYm485, o.devinfo.WaitTime) if err == nil { break } } return aduResponse, err } func (o *YmLampController) SendData(aduRequest []byte, retry int) (err error) { serial := GetSerialMgr().GetSerialPort(o.devinfo.Code) if serial == nil { return ErrClosedConnection } if retry <= 0 { retry = 1 } for ; retry > 0; retry-- { if err = serial.SendData(aduRequest, FlagYm485, o.devinfo.WaitTime); err == nil { break } } return err } func (o *YmLampController) QueryLampControllerAddr() { var obj ym485.QueryAddr buf, err := obj.EnCode() defer ym485.ReleaseByteBuffer(buf) if err != nil { return } recvbuf, err := o.SendRecvData(buf.Bytes(), 2) if err != nil { return } var ret ym485.QueryAddrACK if err = ret.DeCode(recvbuf); err == nil { logrus.Infof("灯控地址:%s", ret.Addr) } if ret.Addr != o.devinfo.DevCode { logrus.Errorf("请将DevCode配置为灯控地址!") } } func (o *YmLampController) QuerySoftVer() { var obj ym485.PackingMark buf, err := obj.EnCode(o.devinfo.DevCode, 0x1E, [4]byte{0x00, 0x00, 0x60, 0x12}) defer ym485.ReleaseByteBuffer(buf) if err != nil { return } recvbuf, err := o.SendRecvData(buf.Bytes(), 1) if err != nil { return } var ret ym485.QuerySoftVerAck if err = ret.DeCode(recvbuf); err == nil { logrus.Infof("灯控型号:%s,软件版本:%s", ret.Model, ret.Version) } } func (o *YmLampController) QueryBasicsettings() { var obj ym485.PackingMark buf, err := obj.EnCode(o.devinfo.DevCode, 0x1E, [4]byte{0x00, 0x00, 0x60, 0x02}) defer ym485.ReleaseByteBuffer(buf) if err != nil { return } recvbuf, err := o.SendRecvData(buf.Bytes(), 1) if err != nil { return } var ret ym485.PackingBasic if err = ret.DeCode(recvbuf); err == nil { logrus.Infof("灯控使能状态:%d,是否上电开灯:%d", ret.Enabled, ret.OnOff) } } // QueryDeviceState 查询设备状态 func (o *YmLampController) QueryDeviceState() { var obj ym485.DataPack //串口协议 buf, err := obj.EnCode(o.devinfo.DevCode, 0x1E, []byte{0x00, 0x00, 0x60, 0x13, 0x01}) defer ym485.ReleaseByteBuffer(buf) if err != nil { return } recvbuf, err := o.SendRecvData(buf.Bytes(), 1) //根据地址得到数据 if err != nil { logrus.Error("单灯状态数据查询错误") return } var ret ym485.QueryDeviceStateAck if err = ret.DeCode(recvbuf); err == nil { //解码串口返回的数据 // if ret.Dp.Addr != o.devinfo.DevCode { //不是本灯数据 if !strings.Contains(o.devinfo.DevCode, ret.Dp.Addr) { //不是本灯数据 o.State = nil return } if len(ret.State) == 0 { o.State = nil return } if len(o.State) != len(ret.State) { o.State = make([]ym485.DeviceState, len(ret.State)) } copy(o.State, ret.State) //状态改变记录日志 if oldstate != o.State[0].State { oldstate = o.State[0].State logrus.Infof("lamp state:%d", oldstate) } } mapData := make(map[string]*protocol.CHZB_LampData) var data protocol.CHZB_LampData data.Data = make(map[uint16]float64) data.SetStateErrorData(err) if len(o.State) > 0 { //只要第一路 if o.State[0].State == 1 { //开灯 data.Data[1] = float64(o.State[0].BrightnessState) } else { data.Data[1] = 0.0 } data.Data[2] = float64(0) data.Data[3] = o.State[0].Voltage data.Data[4] = 0.0 data.Data[5] = 0.0 data.Data[6] = o.State[0].Current data.Data[7] = 0.0 data.Data[8] = o.State[0].Degree //by hxz 1,2,3,6,8为有效 data.Data[9] = 0.0 } mapData[o.devinfo.DevCode] = &data var ret1 protocol.Pack_CHZB_UploadData if str, err := ret1.EnCode(o.devinfo.DevCode, appConfig.GID, GetNextUint64(), o.devinfo.TID, mapData); err == nil { GetMQTTMgr().Publish(GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_DATA), str, mqtt.AtMostOnce, ToCloud) } } func (o *YmLampController) TurnOnOff(flag uint8) error { kenabled := uint8(0x00) if flag > 0 { kenabled = 0xff } var obj ym485.PackingTurnOnOff buf, err := obj.EnCode(o.devinfo.DevCode, [4]byte{0x00, 0x00, 0x60, 0x07}, 0xff, kenabled) defer ym485.ReleaseByteBuffer(buf) if err != nil { return err } recvbuf, err := o.SendRecvData(buf.Bytes(), 3) if err != nil { return err } var ret ym485.PackingResult err = ret.DeCode(recvbuf) if err == nil { if ret.Result == 89 { //字母'Y' return nil } } return errors.New(protocol.FAILED_STR) } func (o *YmLampController) SetBrightness(brightness uint8) error { var obj ym485.PackingBrightnessColor buf, err := obj.EnCode(o.devinfo.DevCode, [4]byte{0x00, 0x00, 0x60, 0x09}, 0xff, []byte{brightness}) defer ym485.ReleaseByteBuffer(buf) if err != nil { return err } recvbuf, err := o.SendRecvData(buf.Bytes(), 1) if err != nil { return err } var ret ym485.PackingResult err = ret.DeCode(recvbuf) if err == nil { if ret.Result == 89 { //字母'Y' return nil } } return errors.New(protocol.FAILED_STR) } func (o *YmLampController) SetBasicsettings(enabled, on uint8) { var obj ym485.PackingBasic buf, err := obj.EnCode(o.devinfo.DevCode, 0x1E, [4]byte{0x00, 0x00, 0x60, 0x01}, enabled, on, nil) defer ym485.ReleaseByteBuffer(buf) if err != nil { return } recvbuf, err := o.SendRecvData(buf.Bytes(), 1) if err != nil { return } var ret ym485.PackingResult if err = ret.DeCode(recvbuf); err == nil { if ret.Result == 89 { //字母'Y' logrus.Infoln("设置灯控基本信息执行成功") } else { logrus.Errorln("设置灯控基本信息执行失败") } } } func (o *YmLampController) Switch(Switch, Brightness uint8) error { if Switch == 0 { return o.TurnOnOff(0) } else { err := o.SetBrightness(Brightness) if err != nil { logrus.Errorf("调节亮度失败:%s", err.Error()) } return o.TurnOnOff(Switch) } } func (o *YmLampController) ConfirmState(t time.Time) { //刚开/关灯,则不判断,避免采集数据误差导致误判 if !o.tswitch.IsZero() { if t.After(o.tswitch) && t.Sub(o.tswitch).Seconds() < float64(o.devinfo.SendCloud/1000.0) { return } if t.Before(o.tswitch) && o.tswitch.Sub(t).Seconds() < float64(o.devinfo.SendCloud/1000.0) { return } } if len(o.State) == 0 { return } s := o.State[0] if o.mapTempLampsOOT != nil { //优先手动控制 if t.After(time.Time(o.mapTempLampsOOT.End)) { //过时,则清除 if err := redisEdgeData.Del(LampSwitchPrefix + o.devinfo.DevCode).Err(); err == nil { o.mapTempLampsOOT = nil o.tswitch = time.Time{} } } else if o.mapTempLampsOOT.isInTimeRange(t) { if o.mapTempLampsOOT.Brightness > 0 && s.State == 0 { //应开未开 o.Switch(1, o.mapTempLampsOOT.Brightness) //开灯 } else if o.mapTempLampsOOT.Brightness == 0 && s.State == 1 { //应关闭未关 o.Switch(0, o.mapTempLampsOOT.Brightness) //关灯 } return } } isInRange := false for _, v := range o.mapLamps2OOT { //其次时间策略 if v.InTimeRange(t) { //在开灯区间 if v.Brightness > 0 && s.State == 0 { //应开未开 o.Switch(1, v.Brightness) //开灯 } else if v.Brightness == 0 && s.State == 1 { //应关闭未关 o.Switch(0, v.Brightness) //关灯 } isInRange = true break } } if isInRange { return } if s.State == 1 { o.Switch(0, 0) } } func (o *YmLampController) HandleTpYmSetSwitch(m mqtt.Message) { var obj protocol.Pack_CHZB_Switch if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString()) return } if obj.Id != o.devinfo.DevCode { return } brightness := uint8(0) if obj.Data.Switch == 1 { brightness = obj.Data.Brightness } err := o.Switch(obj.Data.Switch, obj.Data.Brightness) if err == nil { o.tswitch = util.MlNow() mapRedisTempLampsOOT := make(map[string]interface{}) //临时开关灯记录,用于排除异常亮灯正常亮灯的情况 ltr := LampTimeRange{ Start: util.MLTime(o.tswitch), End: util.MLTime(o.tswitch.Add(time.Duration(obj.Data.Recovery) * time.Second)), //延迟2分钟,以防指令在队列中未及时执行 Brightness: brightness, } ltrstr, _ := json.MarshalToString(ltr) o.mapTempLampsOOT = <r //内存 mapRedisTempLampsOOT[o.devinfo.DevCode] = ltrstr //redis if err := redisEdgeData.HSet(LampSwitchPrefix+o.devinfo.DevCode, mapRedisTempLampsOOT).Err(); err != nil { logrus.Errorf("手动开关灯时间设置[内容:%v]缓存到redis失败:%s", mapRedisTempLampsOOT, err.Error()) } } var ret protocol.Pack_Ack if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, err); err == nil { GetMQTTMgr().Publish(GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_SET_SWITCH_ACK), str, mqtt.AtMostOnce, ToAll) } } func (o *YmLampController) HandleTpYmSetOnofftime(m mqtt.Message) { var obj protocol.Pack_SetOnOffTime if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("协议解析错误:%s,协议主题:%s,协议内容:%s", err.Error(), m.Topic(), m.PayloadString()) return } if obj.Id != o.devinfo.DevCode { return } if len(obj.Data.OnOffTime) == 0 { logrus.Errorf("Handle_TP_YM_SET_ONOFFTIME:错误,灯控编号[%v],时间段个数:%v", obj.Id, obj.Data.OnOffTime) return } mapRedisOOT := make(map[string]interface{}) datastr, _ := json.MarshalToString(obj.Data.OnOffTime) mapRedisOOT[obj.Id] = datastr //缓存到redis o.mapLamps2OOT = obj.Data.OnOffTime //缓存在内存中 //持久缓存到redis,以便于重启后读取进内存中 err := redisEdgeData.HSet(LampOotPrefix+o.devinfo.DevCode, mapRedisOOT).Err() if err != nil { logrus.Errorf("灯控时间设置[内容:%v]缓存到redis失败:%s", mapRedisOOT, err.Error()) } var ret protocol.Pack_Ack if str, err := ret.EnCode(o.devinfo.DevCode, appConfig.GID, obj.Seq, err); err == nil { GetMQTTMgr().Publish(GetTopic(protocol.DT_LAMPCONTROLLER, o.devinfo.DevCode, protocol.TP_YM_SET_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud) } } func (o *YmLampController) ReloadOOTFromRedis() error { mapdata, err := redisEdgeData.HGetAll(LampOotPrefix + o.devinfo.DevCode).Result() if err != nil { if err == redis.Nil { return nil } logrus.Errorf("YmLampController.ReloadOOTFromRedis设备[%s]从redis加载时间策略失败:%s", o.devinfo.DevCode, err.Error()) return err } for k, v := range mapdata { if k == o.devinfo.DevCode { var oot []protocol.CHZB_OnOffTime if err := json.UnmarshalFromString(v, &oot); err == nil { o.mapLamps2OOT = oot } break } } return nil } func (o *YmLampController) ReloadSwitchOOTFromRedis() error { mapdata, err := redisEdgeData.HGetAll(LampSwitchPrefix + o.devinfo.DevCode).Result() if err != nil { if err == redis.Nil { return nil } logrus.Errorf("YmLampController.ReloadSwitchOOTFromRedis设备[%s]从redis加载时间策略失败:%s", o.devinfo.DevCode, err.Error()) return err } for k, v := range mapdata { if k == o.devinfo.DevCode { var ltr LampTimeRange if err := json.UnmarshalFromString(v, <r); err == nil { o.mapTempLampsOOT = <r } break } } return nil } func (o *YmLampController) ReloadLampAlarmFromRedis() error { mapAlarm, err := redisEdgeData.HGetAll(LampAlarmPrefix + o.devinfo.DevCode).Result() if err != nil { if err == redis.Nil { return nil } logrus.Errorf("YmLampController.ReloadLampAlarmFromRedis设备[%s]从redis加载广播恢复截止时间失败:%s", o.devinfo.DevCode, err.Error()) return err } for k, v := range mapAlarm { if k == o.devinfo.DevCode { var lai LampAlarmInfo if err := json.UnmarshalFromString(v, &lai); err == nil { o.mapLampAlarm = &lai } break } } return nil }