concentrator.go 30 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/go-redis/redis/v7"
  10. "github.com/valyala/bytebufferpool"
  11. "lc/common/mqtt"
  12. "lc/common/protocol"
  13. "lc/common/util"
  14. "lc/edge/ipole/zigbee"
  15. )
  16. var ConcentratorProtocol = "CHDDJK-Zigbee"
  17. var LampOotPrefix = "lamp_oot_"
  18. var LampSwitchPrefix = "lamp_switch_"
  19. var LampBroadcastPrefix = "lamp_broadcastauto_"
  20. var LampAlarmPrefix = "lamp_alarm_"
  21. func PoolPut(b *bytebufferpool.ByteBuffer) {
  22. if b != nil {
  23. bytebufferpool.Put(b)
  24. }
  25. }
  26. func GetPoleIDByte(PoleID uint32) []byte {
  27. tmp := make([]byte, 4)
  28. binary.BigEndian.PutUint32(tmp, PoleID)
  29. return tmp[2:4]
  30. }
  31. type LampTimeRange struct {
  32. Start util.MLTime `json:"start"`
  33. End util.MLTime `json:"end"`
  34. Brightness uint8 `json:"brightness"` //0熄灯,大于0都是开灯
  35. }
  36. func (o *LampTimeRange) isInTimeRange(t time.Time) bool {
  37. if t.After(time.Time(o.Start)) && t.Before(time.Time(o.End)) {
  38. return true
  39. }
  40. return false
  41. }
  42. type LampAlarmInfo struct {
  43. Alarm *protocol.LampAlarm `json:"alarm"`
  44. Send bool `json:"send"`
  45. }
  46. // Concentrator 集中器管理
  47. type Concentrator struct {
  48. seq uint8
  49. mutexSeq sync.Mutex
  50. devInfo *protocol.DevInfo
  51. model *protocol.IotModel
  52. ctx context.Context
  53. cancel context.CancelFunc
  54. downQueue *util.MlQueue
  55. readQueue *util.MlQueue //读取数据的队列,读完则发送
  56. mapLamps map[uint32]string
  57. mapTopicHandle map[string]func(m mqtt.Message)
  58. mapLamps2OOT map[uint32][]zigbee.OnOffTime //时控策略
  59. mapTempLampsOOT map[uint32]*LampTimeRange //临时手动控时间段,手动控制开关灯
  60. broadcastAutoTime time.Time //广播模式截止时间,过期自动恢复
  61. mapLampAlarm map[string]*LampAlarmInfo //告警数据
  62. chanDevInfo chan *protocol.DevInfo //设备管理更新
  63. chanModelInfo chan *ModelInfo //设备管理更新
  64. }
  65. func NewConcentrator(info *protocol.DevInfo) Device {
  66. ctx, cancel := context.WithCancel(context.Background())
  67. dev := &Concentrator{
  68. mapLamps: make(map[uint32]string),
  69. devInfo: info,
  70. ctx: ctx,
  71. cancel: cancel,
  72. downQueue: util.NewQueue(200),
  73. readQueue: util.NewQueue(200),
  74. mapTopicHandle: make(map[string]func(m mqtt.Message)),
  75. mapLamps2OOT: make(map[uint32][]zigbee.OnOffTime), //时控策略
  76. mapTempLampsOOT: make(map[uint32]*LampTimeRange), //临时时间段,手动控制开关灯
  77. broadcastAutoTime: time.Time{},
  78. mapLampAlarm: make(map[string]*LampAlarmInfo),
  79. chanDevInfo: make(chan *protocol.DevInfo),
  80. chanModelInfo: make(chan *ModelInfo),
  81. }
  82. iot, err := loadModel(info.TID)
  83. if err == nil && iot.TID == info.TID && iot.Protocol == ConcentratorProtocol {
  84. dev.model = iot
  85. }
  86. dev.SetTopicHandle()
  87. return dev
  88. }
  89. func (o *Concentrator) SetTopicHandle() {
  90. o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_SWITCH)] = o.HandleTpChzbSetSwitch
  91. o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO)] = o.HandleTpChzbSetRecoveryAuto
  92. o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME)] = o.HandleSetOnOffTime
  93. o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP)] = o.HandleSetUpdateLamp
  94. o.mapTopicHandle[GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_TIME)] = o.HandleQueryTime
  95. }
  96. func (o *Concentrator) MQTTSubscribe() {
  97. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_BROADCASTTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //广播校时
  98. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_WAITTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //设置zigbee集中器收发等待时间
  99. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_SWITCH), mqtt.ExactlyOnce, o.HandleCache, ToAll) //开关灯,广播开关灯
  100. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO), mqtt.ExactlyOnce, o.HandleCache, ToAll) //广播开关灯
  101. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //设置开关灯时间段
  102. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_ONOFFTIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //读取开关灯时间段
  103. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //更新灯控末端
  104. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_TIME), mqtt.ExactlyOnce, o.HandleCache, ToCloud) //读取单灯末端时间
  105. }
  106. func (o *Concentrator) Start() {
  107. o.MQTTSubscribe()
  108. retry := 3
  109. sleep := time.Duration(2)
  110. for i := 0; i < retry; i++ {
  111. if err := o.ReloadOOTFromRedis(); err == nil {
  112. break
  113. }
  114. time.Sleep(sleep * time.Second)
  115. }
  116. for i := 0; i < retry; i++ {
  117. if err := o.ReloadSwitchOOTFromRedis(); err == nil {
  118. break
  119. }
  120. time.Sleep(sleep * time.Second)
  121. }
  122. for i := 0; i < retry; i++ {
  123. if err := o.ReloadBroadCastFromRedis(); err == nil {
  124. break
  125. }
  126. time.Sleep(sleep * time.Second)
  127. }
  128. for i := 0; i < retry; i++ {
  129. if err := o.ReloadLampAlarmFromRedis(); err == nil {
  130. break
  131. }
  132. time.Sleep(sleep * time.Second)
  133. }
  134. go o.Handle()
  135. }
  136. func (o *Concentrator) Stop() {
  137. o.cancel()
  138. }
  139. func (o *Concentrator) UpdateInfo(devinfo protocol.DevInfo) {
  140. o.chanDevInfo <- &devinfo
  141. }
  142. func (o *Concentrator) GetDevInfo() *protocol.DevInfo {
  143. return o.devInfo
  144. }
  145. func (o *Concentrator) UpdateModel(tid uint16, flag int) {
  146. if tid > 0 {
  147. mi := ModelInfo{
  148. TID: tid,
  149. Flag: flag,
  150. }
  151. o.chanModelInfo <- &mi
  152. }
  153. }
  154. func (o *Concentrator) UpdateModel2(mi *ModelInfo) {
  155. if o.devInfo.TID != mi.TID {
  156. return
  157. }
  158. if mi.Flag == 0 {
  159. return
  160. }
  161. iot, err := loadModel(mi.TID)
  162. if err != nil {
  163. return
  164. }
  165. if iot.Protocol == ConcentratorProtocol { //合法的物模型
  166. o.model = iot
  167. }
  168. }
  169. func (o *Concentrator) ReloadOOTFromRedis() error {
  170. result, err := redisEdgeData.HGetAll(LampOotPrefix + o.devInfo.DevCode).Result()
  171. if err != nil {
  172. if err == redis.Nil {
  173. return nil
  174. }
  175. return err
  176. }
  177. for k, v := range result {
  178. var oot []zigbee.OnOffTime
  179. lampId, err2 := strconv.Atoi(k)
  180. err3 := json.UnmarshalFromString(v, &oot)
  181. if err2 == nil && err3 == nil {
  182. o.mapLamps2OOT[uint32(lampId)] = oot
  183. }
  184. }
  185. return nil
  186. }
  187. func (o *Concentrator) ReloadSwitchOOTFromRedis() error {
  188. result, err := redisEdgeData.HGetAll(LampSwitchPrefix + o.devInfo.DevCode).Result()
  189. if err != nil {
  190. if err == redis.Nil {
  191. return nil
  192. }
  193. return err
  194. }
  195. for k, v := range result {
  196. var ltr LampTimeRange
  197. lampId, err2 := strconv.Atoi(k)
  198. err3 := json.UnmarshalFromString(v, &ltr)
  199. if err2 == nil && err3 == nil {
  200. o.mapTempLampsOOT[uint32(lampId)] = &ltr
  201. }
  202. }
  203. return nil
  204. }
  205. func (o *Concentrator) ReloadBroadCastFromRedis() error {
  206. strTime, err := redisEdgeData.Get(LampBroadcastPrefix + o.devInfo.DevCode).Result()
  207. if err != nil {
  208. if err == redis.Nil {
  209. return nil
  210. }
  211. return err
  212. }
  213. if t, err := util.MlParseTime(strTime); err == nil {
  214. o.broadcastAutoTime = t
  215. }
  216. return nil
  217. }
  218. func (o *Concentrator) ReloadLampAlarmFromRedis() error {
  219. mapAlarm, err := redisEdgeData.HGetAll(LampAlarmPrefix + o.devInfo.DevCode).Result()
  220. if err != nil {
  221. if err == redis.Nil {
  222. return nil
  223. }
  224. return err
  225. }
  226. for k, v := range mapAlarm {
  227. var lai LampAlarmInfo
  228. if err := json.UnmarshalFromString(v, &lai); err == nil {
  229. o.mapLampAlarm[k] = &lai
  230. }
  231. }
  232. return nil
  233. }
  234. func (o *Concentrator) Handle() {
  235. defer func() {
  236. recover()
  237. go o.Handle()
  238. }()
  239. o.queryPush()
  240. o.BroadcastTime()
  241. exit := false
  242. mapData := make(map[string]*protocol.CHZB_LampData)
  243. LastTime := util.MlNow()
  244. nextFillTime := time.Time{}
  245. for {
  246. select {
  247. case <-o.ctx.Done():
  248. exit = true
  249. case devInfo := <-o.chanDevInfo:
  250. o.devInfo = devInfo
  251. case mi := <-o.chanModelInfo:
  252. o.UpdateModel2(mi)
  253. default:
  254. //从队列钟获取指令执行
  255. if m, ok, _ := o.downQueue.Get(); ok {
  256. if mm, ok := m.(mqtt.Message); ok {
  257. if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
  258. fn(mm)
  259. }
  260. }
  261. } else {
  262. if exit { //退出前全部恢复时控模式
  263. o.CheckRecoveryAuto(true)
  264. return
  265. }
  266. //每小时同步一次时间
  267. if time.Now().Sub(LastTime).Minutes() > 60 {
  268. o.queryPush() //每小时从服务端同步一次单灯编号
  269. o.BroadcastTime() //每小时广播一次时间同步消息
  270. LastTime = time.Now()
  271. }
  272. o.CheckRecoveryAuto(false)
  273. quantity, send := o.NextQueueRead(mapData)
  274. if quantity == 0 || send {
  275. if nextFillTime.IsZero() || nextFillTime.Before(util.MlNow()) {
  276. o.fillReadQueue()
  277. nextFillTime = util.MlNow().Add(time.Duration(o.devInfo.SendCloud) * time.Millisecond)
  278. }
  279. if send {
  280. mapData = make(map[string]*protocol.CHZB_LampData)
  281. o.UploadLampAlarm()
  282. }
  283. time.Sleep(300 * time.Millisecond)
  284. }
  285. }
  286. }
  287. }
  288. }
  289. // CheckRecoveryAuto force=true 强制全部恢复时控模式
  290. func (o *Concentrator) CheckRecoveryAuto(force bool) {
  291. //存在广播控时,如果广播控过期,则全部恢复时控模式,未过期,则不处理
  292. if (!o.broadcastAutoTime.IsZero() && o.broadcastAutoTime.Before(util.MlNow())) || force {
  293. for k := range o.mapTempLampsOOT {
  294. delete(o.mapTempLampsOOT, k)
  295. }
  296. if err := o.BroadcastAuto(); err == nil {
  297. o.broadcastAutoTime = time.Time{}
  298. //删除redis中所有临时开关灯记录
  299. redisEdgeData.Del(LampSwitchPrefix + o.devInfo.DevCode)
  300. }
  301. } else {
  302. //如果广播控模式未过期,则判断单灯是否手动控过期,过期则恢复
  303. var strList []string
  304. for k, v := range o.mapTempLampsOOT {
  305. if time.Time(v.End).Before(util.MlNow()) {
  306. if err := o.SetPoleAuto(k); err == nil {
  307. strList = append(strList, strconv.Itoa(int(k)))
  308. delete(o.mapTempLampsOOT, k)
  309. }
  310. }
  311. }
  312. if len(strList) > 0 {
  313. _ = redisEdgeData.HDel(LampSwitchPrefix+o.devInfo.DevCode, strList...)
  314. }
  315. }
  316. }
  317. func (o *Concentrator) queryPush() {
  318. var obj protocol.Pack_CHZB_EmptyObject
  319. if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64()); err == nil {
  320. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_LAMP), str, mqtt.AtMostOnce, ToCloud)
  321. }
  322. }
  323. type LampNumberDID struct {
  324. LampID uint32
  325. DID string
  326. }
  327. func (o *Concentrator) fillReadQueue() {
  328. for k, v := range o.mapLamps {
  329. o.readQueue.Put(LampNumberDID{LampID: k, DID: v})
  330. }
  331. }
  332. func (o *Concentrator) NextQueueRead(mapData map[string]*protocol.CHZB_LampData) (uint32, bool) {
  333. val, ok, quantity := o.readQueue.Get()
  334. if ok {
  335. lnd := val.(LampNumberDID)
  336. var err_ error
  337. var data protocol.CHZB_LampData
  338. data.Data = make(map[uint16]float64)
  339. if b1, b2, err := o.GetBrightness(lnd.LampID); err == nil {
  340. data.Data[1] = float64(b1)
  341. data.Data[2] = float64(b2)
  342. //判断灯亮是否正常
  343. o.CheckLampAlarm(lnd, b1, b2)
  344. } else {
  345. err_ = err
  346. }
  347. if e, err := o.ReadElectricalPara(lnd.LampID); err == nil {
  348. data.Data[3] = e.Voltage[0]
  349. data.Data[4] = e.Voltage[1]
  350. data.Data[5] = e.Voltage[2]
  351. data.Data[6] = e.Current[0]
  352. data.Data[7] = e.Current[1]
  353. data.Data[8] = float64(e.Degree[0])
  354. data.Data[9] = float64(e.Degree[1])
  355. } else {
  356. err_ = err
  357. }
  358. data.SetStateErrorData(err_)
  359. mapData[lnd.DID] = &data
  360. }
  361. if quantity == 0 && len(mapData) > 0 {
  362. var obj protocol.Pack_CHZB_UploadData
  363. if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64(), o.devInfo.TID, mapData); err == nil {
  364. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_DATA), str, mqtt.AtMostOnce, ToCloud)
  365. }
  366. return quantity, true
  367. }
  368. return quantity, false
  369. }
  370. // UploadLampAlarm 告警开始上报和告警结束上报
  371. func (o *Concentrator) UploadLampAlarm() {
  372. var toDelete []string
  373. for k, v := range o.mapLampAlarm {
  374. if !v.Send && v.Alarm.EndTime == "" { //告警开始上报
  375. var obj protocol.Pack_CHZB_LampAlarm
  376. if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64(), v.Alarm); err == nil {
  377. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_ALARM), str, mqtt.AtMostOnce, ToCloud)
  378. }
  379. v.Send = true
  380. //缓存到redis
  381. strAlarm, _ := json.MarshalToString(v)
  382. mapRedis := make(map[string]interface{})
  383. mapRedis[k] = strAlarm
  384. redisEdgeData.HSet(LampAlarmPrefix+o.devInfo.DevCode, mapRedis)
  385. } else if v.Alarm.EndTime != "" { //告警结束上报
  386. var obj protocol.Pack_CHZB_LampAlarm
  387. if str, err := obj.EnCode(o.devInfo.DevCode, appConfig.GID, GetNextUint64(), v.Alarm); err == nil {
  388. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_ALARM), str, mqtt.AtMostOnce, ToCloud)
  389. }
  390. toDelete = append(toDelete, k)
  391. delete(o.mapLampAlarm, k)
  392. }
  393. }
  394. if len(toDelete) > 0 {
  395. redisEdgeData.HDel(LampAlarmPrefix+o.devInfo.DevCode, toDelete...)
  396. }
  397. }
  398. // CheckLampAlarm 检查开灯、关灯、亮度异常
  399. func (o *Concentrator) CheckLampAlarm(lnd LampNumberDID, b1, b2 uint8) {
  400. //真实数据时间
  401. now := util.MlNow().Add(-time.Duration(o.devInfo.WaitTime) * time.Millisecond)
  402. except := uint16(protocol.LE_UNKNOWN)
  403. //策略时间段检查
  404. if oots, ok := o.mapLamps2OOT[lnd.LampID]; ok {
  405. for _, oot := range oots { //亮灯时间段
  406. if oot.InTimeRange(util.MlNow()) {
  407. if b1 > 0 { //亮灯
  408. if oot.Brightness == b1 {
  409. except = protocol.LE_OK //亮灯正常
  410. } else {
  411. except = protocol.LE_ON_BRIGHTNESS //亮灯异常(亮度异常)
  412. }
  413. } else { //异常熄灯
  414. except = protocol.LE_OFF
  415. }
  416. break
  417. }
  418. }
  419. }
  420. //手动控检查
  421. if switchoot, ok := o.mapTempLampsOOT[lnd.LampID]; ok {
  422. if switchoot.isInTimeRange(now) {
  423. if switchoot.Brightness == b1 {
  424. except = protocol.LE_OK //正常亮灯/熄灯
  425. } else {
  426. if b1 > 0 {
  427. if switchoot.Brightness == 0 {
  428. except = protocol.LE_ON //异常亮灯
  429. } else {
  430. except = protocol.LE_ON_BRIGHTNESS //亮度异常
  431. }
  432. } else {
  433. except = protocol.LE_OFF //异常熄灯
  434. }
  435. }
  436. }
  437. }
  438. if except == protocol.LE_UNKNOWN {
  439. if b1 > 0 {
  440. except = protocol.LE_ON //异常亮灯
  441. } else {
  442. except = protocol.LE_OK
  443. }
  444. }
  445. if a, ok := o.mapLampAlarm[lnd.DID]; ok {
  446. if except == protocol.LE_OK { //告警结束
  447. a.Alarm.EndTime = now.Format("2006-01-02 15:04:05")
  448. a.Alarm.Brightness = b1
  449. }
  450. } else {
  451. if except != protocol.LE_OK { //告警开始
  452. a := protocol.LampAlarm{DID: lnd.DID, AlarmType: except, AlarmBrightness: b1, StartTime: now.Format("2006-01-02 15:04:05")}
  453. lai := LampAlarmInfo{Alarm: &a, Send: false}
  454. o.mapLampAlarm[lnd.DID] = &lai
  455. }
  456. }
  457. }
  458. func (o *Concentrator) nextSeq() uint8 {
  459. o.mutexSeq.Lock()
  460. defer o.mutexSeq.Unlock()
  461. o.seq++
  462. return o.seq
  463. }
  464. func (o *Concentrator) SendRecvData(aduRequest []byte, retry int) (aduResponse []byte, err error) {
  465. serial := GetSerialMgr().GetSerialPort(o.devInfo.Code)
  466. if serial == nil {
  467. return nil, ErrClosedConnection
  468. }
  469. if retry <= 0 {
  470. retry = 1
  471. }
  472. for ; retry > 0; retry-- {
  473. aduResponse, err = serial.SendRecvData(aduRequest, FlagChZigbee, o.devInfo.WaitTime)
  474. if err == nil {
  475. break
  476. }
  477. }
  478. return aduResponse, err
  479. }
  480. func (o *Concentrator) SendData(aduRequest []byte, retry int) (err error) {
  481. serial := GetSerialMgr().GetSerialPort(o.devInfo.Code)
  482. if serial == nil {
  483. return ErrClosedConnection
  484. }
  485. if retry <= 0 {
  486. retry = 1
  487. }
  488. for ; retry > 0; retry-- {
  489. if err = serial.SendData(aduRequest, FlagChZigbee, o.devInfo.WaitTime); err == nil {
  490. break
  491. }
  492. }
  493. return err
  494. }
  495. // BroadcastTime 广播校时
  496. func (o *Concentrator) BroadcastTime() error {
  497. t := protocol.BJNow()
  498. var pack zigbee.PackUpgradeFuncCommand
  499. pack.SetData(0x0000FEFE, zigbee.CmdSetBroadcastCorrectiontime, o.nextSeq(),
  500. []byte{0xFE, 0xFE, uint8(t.Hour()), uint8(t.Minute()), uint8(t.Second())})
  501. buff, err := pack.EnCode()
  502. defer PoolPut(buff)
  503. if buff != nil {
  504. err = o.SendData(buff.B, 2)
  505. }
  506. return err
  507. }
  508. // BroadcastOnOrOff 广播开关灯
  509. func (o *Concentrator) BroadcastOnOrOff(on, brightness uint8) error {
  510. var cmd = zigbee.CmdSetBroadcastOn
  511. if on == 0 {
  512. cmd = zigbee.CmdSetBroadcastOff
  513. }
  514. var pack zigbee.PackUpgradeFuncCommand
  515. pack.SetData(0x0000FEFE, cmd, o.nextSeq(), []byte{0xFE, 0xFE, 0xFF, brightness, brightness}) //灯1和2
  516. buff, err := pack.EnCode()
  517. defer PoolPut(buff)
  518. if buff != nil {
  519. err = o.SendData(buff.B, 2)
  520. }
  521. return err
  522. }
  523. // BroadcastAuto 广播恢复时控模式
  524. func (o *Concentrator) BroadcastAuto() error {
  525. var pufc zigbee.PackUpgradeFuncCommand
  526. //pufc.SetData(0x0000fefe, zigbee.CMD_SET_BROADCAST_AUTO, o.nextSeq(), []byte{0xFE, 0xFE, 0xFF})
  527. pufc.SetData(0x0000fefe, zigbee.CmdSetBroadcastAuto, o.nextSeq(), []byte{0xFE, 0xFE, 0x03}) //灯1和2
  528. buff, err := pufc.EnCode()
  529. defer PoolPut(buff)
  530. if buff != nil {
  531. err = o.SendData(buff.B, 3)
  532. }
  533. return err
  534. }
  535. // SetOnOffTime 设置灯1开关灯时间段
  536. func (o *Concentrator) SetOnOffTime(PoleID uint32, Cmd uint8, data []zigbee.OnOffTime) error {
  537. buff0 := bytebufferpool.Get()
  538. defer PoolPut(buff0)
  539. buff0.Write(GetPoleIDByte(PoleID))
  540. length := len(data)
  541. for i := 0; i < 4; i++ {
  542. if i < length {
  543. buff0.Write(data[i].EnCode())
  544. } else {
  545. buff0.Write((&zigbee.OnOffTime{}).EnCode())
  546. }
  547. }
  548. var pgfc zigbee.PackGeneralFuncCommand
  549. pgfc.SetData(PoleID, Cmd, o.nextSeq(), buff0.B)
  550. buff, err := pgfc.EnCode()
  551. defer PoolPut(buff)
  552. if buff != nil {
  553. _, err = o.SendRecvData(buff.B, 3)
  554. }
  555. return err
  556. }
  557. // GetOnOffTime 读取灯1开关灯时间段
  558. func (o *Concentrator) GetOnOffTime(PoleID uint32, Cmd uint8) ([]zigbee.OnOffTime, error) {
  559. var pgfc zigbee.PackGeneralFuncCommand
  560. pgfc.SetData(PoleID, Cmd, o.nextSeq(), GetPoleIDByte(PoleID))
  561. buff, err := pgfc.EnCode()
  562. if err != nil {
  563. return nil, err
  564. }
  565. defer PoolPut(buff)
  566. var recvdata []byte
  567. recvdata, err = o.SendRecvData(buff.B, 1)
  568. if err != nil {
  569. return nil, err
  570. }
  571. var pgfcresp zigbee.PackGeneralFuncCommand
  572. err = pgfcresp.DeCode(recvdata)
  573. if err != nil {
  574. return nil, err
  575. }
  576. if len(pgfcresp.Data) >= 22 {
  577. oot := make([]zigbee.OnOffTime, 4)
  578. oot[0].DeCode(pgfcresp.Data[2:7])
  579. oot[1].DeCode(pgfcresp.Data[7:12])
  580. oot[2].DeCode(pgfcresp.Data[12:17])
  581. oot[3].DeCode(pgfcresp.Data[17:22])
  582. //时分都是0
  583. ret := make([]zigbee.OnOffTime, 0, 4)
  584. for _, v := range oot {
  585. if v.OnHour == v.OffHour && v.OnMinite == v.OffMinite &&
  586. v.OnHour == 0 && v.OnMinite == 0 {
  587. continue
  588. }
  589. ret = append(ret, v)
  590. }
  591. return ret, nil
  592. }
  593. return nil, errors.New("读取开关灯时间返回的内容错误")
  594. }
  595. // ReadPoleTime 读取单灯时间
  596. func (o *Concentrator) ReadPoleTime(PoleID uint32) (uint8, uint8, uint8, error) {
  597. //从4位带分组编号的灯杆编号中取2位灯杆编号
  598. var pgfc zigbee.PackGeneralFuncCommand
  599. pgfc.SetData(PoleID, zigbee.CmdReadTime, o.nextSeq(), GetPoleIDByte(PoleID))
  600. buff, err := pgfc.EnCode()
  601. if err != nil {
  602. return 0, 0, 0, err
  603. }
  604. defer PoolPut(buff)
  605. var recvdata []byte
  606. recvdata, err = o.SendRecvData(buff.B, 1)
  607. if err != nil {
  608. return 0, 0, 0, err
  609. }
  610. var pgfcresp zigbee.PackGeneralFuncCommand
  611. err = pgfcresp.DeCode(recvdata)
  612. if err != nil {
  613. return 0, 0, 0, err
  614. }
  615. if len(pgfcresp.Data) >= 5 {
  616. return pgfcresp.Data[2], pgfcresp.Data[3], pgfcresp.Data[4], nil
  617. }
  618. return 0, 0, 0, errors.New("读取单灯时间返回的内容错误")
  619. }
  620. // ElecPara 读单灯电流电压
  621. type ElecPara struct {
  622. Voltage [3]float64
  623. Current [2]float64
  624. Degree [2]uint16
  625. }
  626. func (o *Concentrator) ReadElectricalPara(PoleID uint32) (*ElecPara, error) {
  627. var pgrc zigbee.PackGeneralFuncCommand
  628. pgrc.SetData(PoleID, zigbee.CmdReadDldy, o.nextSeq(), GetPoleIDByte(PoleID))
  629. buff, err := pgrc.EnCode()
  630. if err != nil {
  631. return nil, err
  632. }
  633. defer PoolPut(buff)
  634. var recvdata []byte
  635. recvdata, err = o.SendRecvData(buff.B, 1)
  636. if err != nil {
  637. return nil, err
  638. }
  639. var pgfcresp zigbee.PackGeneralFuncCommand
  640. err = pgfcresp.DeCode(recvdata)
  641. if err != nil {
  642. return nil, err
  643. }
  644. if pgfcresp.Cmd == zigbee.CmdReadDldy && len(pgfcresp.Data) >= 13 {
  645. var ep ElecPara
  646. ep.Voltage[0] = float64(pgfcresp.Data[2] * 2)
  647. ep.Voltage[1] = float64(pgfcresp.Data[3] * 2)
  648. ep.Voltage[2] = float64(pgfcresp.Data[4] * 2)
  649. ep.Current[0] = float64(pgfcresp.Data[5])*0.1 + float64(pgfcresp.Data[7])*0.001
  650. ep.Current[1] = float64(pgfcresp.Data[6])*0.1 + float64(pgfcresp.Data[8])*0.001
  651. ep.Degree[0] = binary.BigEndian.Uint16(pgfcresp.Data[9:11])
  652. ep.Degree[1] = binary.BigEndian.Uint16(pgfcresp.Data[11:13])
  653. return &ep, nil
  654. }
  655. return nil, errors.New("读取电流电压电度返回错误")
  656. }
  657. // SetBrightness 设单灯1,2亮度值
  658. func (o *Concentrator) SetBrightness(PoleID uint32, brightness1 uint8, brightness2 uint8) error {
  659. data := make([]byte, 0, 7)
  660. data = append(data, GetPoleIDByte(PoleID)...)
  661. data = append(data, brightness1, brightness2, 0xFF, 0xFF) //灯1,灯2亮度,后边灯3灯4保留
  662. var pgfc zigbee.PackGeneralFuncCommand
  663. pgfc.SetData(PoleID, zigbee.CmdSetBrightness, o.nextSeq(), data)
  664. buff, err := pgfc.EnCode()
  665. defer PoolPut(buff)
  666. if buff != nil {
  667. _, err = o.SendRecvData(buff.B, 1)
  668. }
  669. return err
  670. }
  671. // GetBrightness 查询单灯1,2亮度值
  672. func (o *Concentrator) GetBrightness(PoleID uint32) (uint8, uint8, error) {
  673. var pgfc zigbee.PackGeneralFuncCommand
  674. pgfc.SetData(PoleID, zigbee.CmdReadBrightness, o.nextSeq(), GetPoleIDByte(PoleID))
  675. buff, err := pgfc.EnCode()
  676. if err != nil {
  677. return 0, 0, err
  678. }
  679. defer PoolPut(buff)
  680. var recvdata []byte
  681. recvdata, err = o.SendRecvData(buff.B, 1)
  682. if err != nil {
  683. return 0, 0, err
  684. }
  685. var pgfcresp zigbee.PackGeneralFuncCommand
  686. err = pgfcresp.DeCode(recvdata)
  687. if err != nil {
  688. return 0, 0, err
  689. }
  690. if pgfcresp.Cmd == zigbee.CmdReadBrightness && len(pgfcresp.Data) >= 4 {
  691. return pgfcresp.Data[2], pgfcresp.Data[3], nil
  692. }
  693. return 0, 0, errors.New("查询亮度返回错误")
  694. }
  695. // SetTime 单灯校时
  696. func (o *Concentrator) SetTime(PoleID uint32) error {
  697. t := protocol.BJNow()
  698. data := make([]byte, 0, 5)
  699. data = append(data, GetPoleIDByte(PoleID)...)
  700. data = append(data, uint8(t.Hour()), uint8(t.Minute()), uint8(t.Second()))
  701. var pack zigbee.PackGeneralFuncCommand
  702. pack.SetData(PoleID, zigbee.CmdSetCorrectiontime, o.nextSeq(), data)
  703. buff, err := pack.EnCode()
  704. defer PoolPut(buff)
  705. if buff != nil {
  706. _, err = o.SendRecvData(buff.B, 3)
  707. }
  708. return err
  709. }
  710. // SetPoleAuto 单灯恢复时控
  711. func (o *Concentrator) SetPoleAuto(PoleID uint32) error {
  712. var pack zigbee.PackGeneralFuncCommand
  713. pack.SetData(PoleID, zigbee.CmdSetAuto, o.nextSeq(), GetPoleIDByte(PoleID))
  714. buff, err := pack.EnCode()
  715. defer PoolPut(buff)
  716. if buff != nil {
  717. _, err = o.SendRecvData(buff.B, 3)
  718. }
  719. return err
  720. }
  721. func (o *Concentrator) HandleCache(m mqtt.Message) {
  722. o.downQueue.Put(m)
  723. }
  724. func (o *Concentrator) HandleTpChzbSetBroadcasttime(m mqtt.Message) {
  725. var obj protocol.Pack_CHZB_EmptyObject
  726. var ret protocol.Pack_Ack
  727. var err error
  728. if err = obj.DeCode(m.PayloadString()); err != nil {
  729. return
  730. }
  731. err = o.BroadcastTime()
  732. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, err); err == nil {
  733. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_BROADCASTTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
  734. }
  735. }
  736. func (o *Concentrator) HandleTpChzbSetWaittime(m mqtt.Message) {
  737. var obj protocol.Pack_CHZB_Waittime
  738. var ret protocol.Pack_Ack
  739. var err error
  740. if err = obj.DeCode(m.PayloadString()); err != nil {
  741. return
  742. }
  743. if obj.Data.Waittime < 1000 || obj.Data.Waittime > 15000 {
  744. err = errors.New("设置的等待时间不在[1000,15000]范围")
  745. } else {
  746. o.devInfo.WaitTime = obj.Data.Waittime
  747. }
  748. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, err); err == nil {
  749. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_WAITTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
  750. }
  751. }
  752. func (o *Concentrator) HandleTpChzbSetSwitch(m mqtt.Message) {
  753. var obj protocol.Pack_CHZB_Switch
  754. var ret protocol.Pack_CHZB_SeqLampAck
  755. var err error
  756. mapIpole := make(map[uint32]*protocol.StateError)
  757. if err = obj.DeCode(m.PayloadString()); err != nil {
  758. return
  759. }
  760. if obj.Id != o.devInfo.DevCode {
  761. return
  762. }
  763. Brightness := obj.Data.Brightness
  764. if obj.Data.Switch == 0 && Brightness > 0 {
  765. Brightness = 0
  766. }
  767. mapRedisTempLampsOOT := make(map[string]interface{}) //临时开关灯记录,用于排除异常亮灯正常亮灯的情况
  768. ltr := LampTimeRange{
  769. Start: util.MLTime(util.MlNow()),
  770. End: util.MLTime(util.MlNow().Add(time.Duration(obj.Data.Recovery) * time.Second)), //延迟2分钟,以防指令在队列中未及时执行
  771. Brightness: Brightness,
  772. }
  773. str, _ := json.MarshalToString(ltr)
  774. if len(obj.Data.LampIDs) == 0 { //广播
  775. err = o.BroadcastOnOrOff(obj.Data.Switch, Brightness)
  776. o.broadcastAutoTime = util.MlNow().Add(time.Duration(obj.Data.Recovery) * time.Second)
  777. for k := range o.mapLamps {
  778. mapRedisTempLampsOOT[strconv.Itoa(int(k))] = str //redis
  779. o.mapTempLampsOOT[k] = &ltr //内存
  780. }
  781. } else { //指定的灯
  782. for _, pid := range obj.Data.LampIDs {
  783. //过滤掉不正常的pid
  784. if pid == 0 || pid == 0x0000FEFE {
  785. continue
  786. }
  787. err = o.SetBrightness(pid, Brightness, Brightness)
  788. mapIpole[pid] = protocol.NewStateError(err)
  789. mapRedisTempLampsOOT[strconv.Itoa(int(pid))] = str //redis
  790. o.mapTempLampsOOT[pid] = &ltr //内存
  791. }
  792. }
  793. redisEdgeData.HSet(LampSwitchPrefix+o.devInfo.DevCode, mapRedisTempLampsOOT)
  794. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, mapIpole); err == nil {
  795. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_SWITCH_ACK), str, mqtt.AtMostOnce, ToAll)
  796. }
  797. }
  798. func (o *Concentrator) HandleTpChzbSetRecoveryAuto(m mqtt.Message) {
  799. var obj protocol.Pack_CHZB_Switch
  800. var ret protocol.Pack_CHZB_SeqLampAck
  801. var err error
  802. mapIpole := make(map[uint32]*protocol.StateError)
  803. if err = obj.DeCode(m.PayloadString()); err != nil {
  804. return
  805. }
  806. if obj.Id != o.devInfo.DevCode {
  807. return
  808. }
  809. if len(obj.Data.LampIDs) == 0 { //广播
  810. o.CheckRecoveryAuto(true)
  811. } else {
  812. strList := make([]string, 0, len(obj.Data.LampIDs))
  813. for _, v := range obj.Data.LampIDs {
  814. err := o.SetPoleAuto(v)
  815. if err == nil {
  816. delete(o.mapTempLampsOOT, v)
  817. strList = append(strList, strconv.Itoa(int(v)))
  818. }
  819. mapIpole[v] = protocol.NewStateError(err)
  820. }
  821. redisEdgeData.HDel(LampSwitchPrefix+o.devInfo.DevCode, strList...)
  822. }
  823. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, mapIpole); err == nil {
  824. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK), str, mqtt.AtMostOnce, ToAll)
  825. }
  826. }
  827. func (o *Concentrator) HandleSetOnOffTime(m mqtt.Message) {
  828. var obj protocol.Pack_SetOnOffTime
  829. var ret protocol.Pack_CHZB_SeqLampAck
  830. var err error
  831. mapIpole := make(map[uint32]*protocol.StateError)
  832. if err = obj.DeCode(m.PayloadString()); err != nil {
  833. return
  834. }
  835. if obj.Id != o.devInfo.DevCode {
  836. return
  837. }
  838. if len(obj.Data.LampIDs) == 0 || len(obj.Data.OnOffTime) == 0 {
  839. return
  840. }
  841. mapRedisOOT := make(map[string]interface{})
  842. var data []zigbee.OnOffTime
  843. for _, t := range obj.Data.OnOffTime {
  844. dat := zigbee.OnOffTime{
  845. OnHour: t.OnHour,
  846. OnMinite: t.OnMinite,
  847. OffHour: t.OffHour,
  848. OffMinite: t.OffMinite,
  849. Brightness: t.Brightness,
  850. }
  851. data = append(data, dat)
  852. }
  853. dataStr, _ := json.MarshalToString(data)
  854. for _, v := range obj.Data.LampIDs {
  855. if v == 0 || v == 0x0000FEFE { //编号等于0或为广播地址则不处理
  856. continue
  857. }
  858. o.SetOnOffTime(v, zigbee.CmdSetOnofftime, data)
  859. mapIpole[v] = protocol.NewStateError(err)
  860. mapRedisOOT[strconv.Itoa(int(v))] = dataStr //缓存到redis
  861. o.mapLamps2OOT[v] = data //缓存在内存中
  862. }
  863. //持久缓存到redis,以便于重启后读取进内存中
  864. redisEdgeData.HSet(LampOotPrefix+o.devInfo.DevCode, mapRedisOOT)
  865. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, mapIpole); err == nil {
  866. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
  867. }
  868. }
  869. func (o *Concentrator) HandleQueryOnOffTime(m mqtt.Message) {
  870. var obj protocol.Pack_CHZB_QueryOnOffTime
  871. var oot []zigbee.OnOffTime
  872. var err error
  873. if err = obj.DeCode(m.PayloadString()); err != nil {
  874. return
  875. }
  876. if obj.Id != o.devInfo.DevCode {
  877. return
  878. }
  879. if obj.Data.Poleid > 0 {
  880. oot, err = o.GetOnOffTime(obj.Data.Poleid, zigbee.CmdReadOnofftime)
  881. }
  882. var ret protocol.Pack_CHZB_QueryOnOffTimeAck
  883. var offTimes []protocol.CHZB_OnOffTime
  884. for _, v := range oot {
  885. x := protocol.CHZB_OnOffTime{
  886. OnHour: v.OnHour,
  887. OnMinite: v.OnMinite,
  888. OffHour: v.OffHour,
  889. OffMinite: v.OffMinite,
  890. Brightness: v.Brightness,
  891. }
  892. offTimes = append(offTimes, x)
  893. }
  894. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, obj.Data.Poleid, err, offTimes); err == nil {
  895. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK), str, mqtt.AtMostOnce, ToCloud)
  896. }
  897. }
  898. func (o *Concentrator) HandleSetUpdateLamp(m mqtt.Message) {
  899. var obj protocol.Pack_CHZB_LampIDs
  900. var ret protocol.Pack_Ack
  901. var err error
  902. if err = obj.DeCode(m.PayloadString()); err != nil {
  903. return
  904. }
  905. if obj.Id != o.devInfo.DevCode {
  906. return
  907. }
  908. if len(obj.Data.MapLamps) > 0 {
  909. mapLampsTmp := make(map[uint32]string)
  910. for k, v := range obj.Data.MapLamps {
  911. //过滤掉不正常的数据
  912. if k > 0 && v != "" {
  913. mapLampsTmp[k] = v
  914. }
  915. }
  916. o.mapLamps = mapLampsTmp
  917. }
  918. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, err); err == nil {
  919. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK), str, mqtt.AtMostOnce, ToCloud)
  920. }
  921. }
  922. func (o *Concentrator) HandleQueryTime(m mqtt.Message) {
  923. var obj protocol.Pack_CHZB_QueryTime
  924. var ret protocol.Pack_CHZB_QueryTimeAck
  925. if err := obj.DeCode(m.PayloadString()); err != nil {
  926. return
  927. }
  928. var pt *protocol.CHZB_LampTime = nil
  929. hh, mm, ss, err := o.ReadPoleTime(obj.Data.LampID)
  930. if err == nil {
  931. pt = &protocol.CHZB_LampTime{Hour: hh, Minite: mm, Second: ss}
  932. }
  933. if str, err := ret.EnCode(o.devInfo.DevCode, appConfig.GID, obj.Seq, obj.Data.LampID, err, pt); err == nil {
  934. GetMQTTMgr().Publish(GetTopic(protocol.DT_CONCENTRATOR, o.devInfo.DevCode, protocol.TP_CHZB_QUERY_TIME_ACK), str, mqtt.AtMostOnce, ToCloud)
  935. }
  936. }