hlzbconcentrator.go 17 KB


  1. package main
  2. import (
  3. "context"
  4. "runtime/debug"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/sirupsen/logrus"
  10. "lc/common/models"
  11. "lc/common/mqtt"
  12. "lc/common/protocol"
  13. "lc/common/util"
  14. )
  15. //TODO 每次调光记录入库确认,灯的实际功率显示
  16. // HlZigbeeConcentrator TODO 开灯自动要关灯
  17. // 集控器类定义
  18. type HlZigbeeConcentrator struct {
  19. ctx context.Context
  20. cancel context.CancelFunc
  21. lock sync.Mutex
  22. tenant string //租户
  23. gid string //基础数据,网关ID
  24. did string //基础数据,集控器设备编码
  25. tid uint16 //基础数据,集控器物模型ID
  26. mapLamp map[string]*HlZigbeeLampController //单灯数据,编号唯一
  27. state uint8 //实时数据,0在线,1离线
  28. lastStateTime time.Time //实时数据,最新状态时间
  29. lastDataTime time.Time //最新数据时间
  30. nextHourTime time.Time //实时数据
  31. queue *util.MlQueue //数据队列
  32. errCnt uint //错误计数
  33. mapTopicHandle map[string]func(m *mqtt.Message)
  34. }
  35. // NewHlZigbeeConcentrator 新建集控器,并指定topic对应事件
  36. func NewHlZigbeeConcentrator(tenant, did string) *HlZigbeeConcentrator {
  37. ctx, cancel := context.WithCancel(context.Background())
  38. hlc := HlZigbeeConcentrator{
  39. ctx: ctx,
  40. cancel: cancel,
  41. tenant: tenant,
  42. did: did,
  43. mapLamp: make(map[string]*HlZigbeeLampController),
  44. queue: util.NewQueue(1000),
  45. mapTopicHandle: make(map[string]func(m *mqtt.Message)),
  46. }
  47. //改成只有一个
  48. hlc.mapTopicHandle[GetHLTopicUp()] = hlc.handleTopicUp
  49. return &hlc
  50. }
  51. func (o *HlZigbeeConcentrator) PutMessage(m *mqtt.Message) {
  52. o.queue.Put(m)
  53. }
  54. func (o *HlZigbeeConcentrator) Start() {
  55. go o.HandleQueue()
  56. }
  57. func (o *HlZigbeeConcentrator) Stop() {
  58. o.cancel()
  59. }
  60. // HandleQueue 启用队列循环,不同的主题调用不同的fn
  61. func (o *HlZigbeeConcentrator) HandleQueue() {
  62. defer func() {
  63. if err := recover(); err != nil {
  64. logrus.Error("HlZigbeeConcentrator.HandleQueue发生异常:", string(debug.Stack()))
  65. go o.HandleQueue()
  66. }
  67. }()
  68. o.UpdateDID() //启动时更新本地灯控数组一次
  69. o.GetOnlineMsg()
  70. var obj protocol.HLWLZB_Frequency_Down
  71. if str, err := obj.EnCode(o.did, 10); err == nil { //启动时下发到设备的"上发频率"
  72. topic := GetHLTopicDown(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME)
  73. err = GetHlMqttMgr().Publish(topic, str, mqtt.AtLeastOnce)
  74. if err != nil {
  75. logrus.Error("HlZigbeeConcentrator.HandleQueue:主题发送失败:", topic, str)
  76. }
  77. }
  78. var exit = false
  79. timer := time.NewTicker(30 * time.Minute)
  80. timer5 := time.NewTicker(5 * time.Minute)
  81. //每天15点半同步日出日落时间
  82. var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(14*time.Hour + 30*time.Minute)
  83. for {
  84. select { //if
  85. case <-o.ctx.Done():
  86. logrus.Error("HlZigbeeConcentrator.HandleQueue即将退出,原因:", o.ctx.Err())
  87. exit = true
  88. case <-timer.C: //每隔30分钟执行一次
  89. o.UpdateState() //更新集控器状态,防止无数据状态不更新
  90. o.UpdateLampControllerState() //更新灯控状态,防止无数据状态不更新
  91. case <-timer5.C: //每隔5分钟执行一次
  92. o.UpdateDID() //更新灯控设备编码 更新本地灯控数组
  93. //同步日出日落时间
  94. if util.MlNow().After(SyncSunset) {
  95. if err := o.SyncSunset(); err == nil {
  96. SyncSunset = SyncSunset.AddDate(0, 0, 1)
  97. }
  98. }
  99. default:
  100. //从队列中获取指令执行fn
  101. if m, ok, quantity := o.queue.Get(); ok {
  102. if mm, ok := m.(*mqtt.Message); ok {
  103. if f, ok := o.mapTopicHandle[mm.Topic()]; ok {
  104. f(mm)
  105. } else {
  106. logrus.Error("HlZigbeeConcentrator.HandleQueue:不支持的主题:", mm.Topic())
  107. }
  108. }
  109. } else if quantity == 0 {
  110. if exit {
  111. return
  112. }
  113. time.Sleep(100 * time.Millisecond)
  114. }
  115. }
  116. }
  117. }
  118. // SyncSunset 日出日落 平台->终端
  119. // TODO
  120. func (o *HlZigbeeConcentrator) SyncSunset() error {
  121. arr, err := models.GetZigbeeLampstrategyByconcentrator(o.did)
  122. if err != nil {
  123. logrus.Errorf("集控器[%s]从数据库读取日出日落时间错误:%s", o.did, err.Error())
  124. return err
  125. }
  126. if len(arr) == 0 {
  127. return nil
  128. }
  129. //Strategy相同的一起发送,不同的分批发送
  130. mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间
  131. mapNumber := make(map[string][]uint32) //策略灯控编号,注意非编码
  132. for _, v := range arr {
  133. mapNumber[v.ID] = append(mapNumber[v.ID], uint32(v.Number))
  134. if _, ok := mapTime[v.Strategy]; ok {
  135. //已计算日出日落时间的,不再重复计算
  136. continue
  137. }
  138. var oot protocol.CHZB_OnOffTime
  139. var ls []models.LampStrategy
  140. if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
  141. oot.Brightness = uint8(ls[0].Brightness)
  142. }
  143. //计算时间
  144. if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil {
  145. onhour, _ := strconv.Atoi(strings.Split(set, ":")[0])
  146. onminite, _ := strconv.Atoi(strings.Split(set, ":")[1])
  147. offhour, _ := strconv.Atoi(strings.Split(rise, ":")[0])
  148. offminite, _ := strconv.Atoi(strings.Split(rise, ":")[1])
  149. oot.OnHour = uint8(onhour)
  150. oot.OnMinite = uint8(onminite)
  151. oot.OffHour = uint8(offhour)
  152. oot.OffMinite = uint8(offminite)
  153. }
  154. mapTime[v.Strategy] = &oot
  155. }
  156. for k, v := range mapNumber {
  157. if oot, ok := mapTime[k]; ok {
  158. var obj protocol.Pack_SetOnOffTime
  159. seq := GetNextSeq()
  160. if str, err := obj.EnCode(o.did, o.gid, seq, v, []protocol.CHZB_OnOffTime{*oot}); err == nil {
  161. topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME) //设置开关灯时间段,平台->终端
  162. err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce)
  163. if err != nil {
  164. logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]发布日出日落消息错误:%s", o.did, v, err.Error())
  165. }
  166. var msg string
  167. if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
  168. msg = string(msg0)
  169. } else {
  170. msg = str
  171. }
  172. odb := models.DeviceCmdRecord{
  173. ID: seq,
  174. GID: o.gid,
  175. DID: o.did,
  176. Topic: topic,
  177. Message: msg,
  178. State: 0,
  179. }
  180. if err := models.G_db.Create(&odb).Error; err != nil {
  181. logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库错误:%s", o.did, v, err.Error())
  182. } else {
  183. logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库成功", o.did, v)
  184. }
  185. }
  186. }
  187. }
  188. return nil
  189. }
  190. // UpdateDID 更新灯控的集控,更新集控的网关id
  191. func (o *HlZigbeeConcentrator) UpdateDID() {
  192. if arr, err := models.GetLampHlControllerByConcentrator(o.did); err == nil { //对集控did下所有灯控
  193. for _, v := range arr { //更新灯控设备编码
  194. if ld, ok := o.mapLamp[v.ID]; ok {
  195. ld.SetDID(v.Concentrator, v.ID) //集id,灯id
  196. if o.state == protocol.FAILED { //离线后下属灯都设置离线
  197. ld.SetOnOffLine(o.state)
  198. }
  199. } else {
  200. var ld HlZigbeeLampController //第一次则加入
  201. ld.SetDID(v.Concentrator, v.ID)
  202. o.mapLamp[v.ID] = &ld //更新本地单灯数组,需要用到
  203. }
  204. // 边缘(设备) // 设备后台 (网关后台、与java对接)
  205. o.gid = v.GID //设置集控器网关id
  206. }
  207. }
  208. }
  209. // 事件--改变状态
  210. func (o *HlZigbeeConcentrator) handleStateChange(t time.Time) {
  211. //最新状态
  212. state := uint8(0)
  213. if o.errCnt >= 10 {
  214. state = 1
  215. } else if o.errCnt == 0 {
  216. state = 0
  217. } else {
  218. return
  219. }
  220. //状态处理
  221. if o.lastStateTime.IsZero() || o.state == 0xff {
  222. t0, s0, err := getState(o.did) //取redis
  223. if err != nil {
  224. o.state = state
  225. o.lastStateTime = t
  226. return
  227. }
  228. o.state = s0
  229. o.lastStateTime = t0
  230. }
  231. if o.state == 0 && state == 1 { //在线->离线
  232. o.GetWillMsg()
  233. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
  234. } else if o.state == 1 && state == 0 { //离线->在线
  235. o.GetOnlineMsg()
  236. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
  237. }
  238. o.state = state
  239. o.lastStateTime = t
  240. }
  241. // UpdateState 每隔一分钟,集控置为离线
  242. func (o *HlZigbeeConcentrator) UpdateState() {
  243. if o.lastStateTime.IsZero() {
  244. t0, s0, err := getState(o.did)
  245. if err == nil {
  246. o.state = s0
  247. o.lastStateTime = t0
  248. }
  249. }
  250. if o.state == protocol.FAILED || //若之前是离线状态的,则不修改状态
  251. (!o.lastDataTime.IsZero() && util.MlNow().Sub(o.lastDataTime).Minutes() < OfflineInterval) {
  252. return
  253. }
  254. //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态
  255. if o.state == protocol.SUCCESS {
  256. o.state = protocol.FAILED
  257. o.lastStateTime = util.MlNow()
  258. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: o.lastStateTime})
  259. cacheState(o.did, o.lastStateTime.Format("2006-01-02 15:04:05"), o.state) //改变redis
  260. o.GetWillMsg()
  261. }
  262. }
  263. // UpdateLampControllerState 每隔一分钟,灯控置为离线
  264. func (o *HlZigbeeConcentrator) UpdateLampControllerState() {
  265. for _, v := range o.mapLamp {
  266. v.UpdateState()
  267. }
  268. }
  269. // {"masterSn":"301056","msgType":"offline","snGroup":[{"serialNumber":"301056"}],
  270. // "ts":"1653445142","uuid":"40a47d85-2ebb-41ab-a6d9-6aad4b934d02"} 只有集控离线事件
  271. func (o *HlZigbeeConcentrator) onofflineProcess(s string) {
  272. var obj protocol.HLWLZB_OnOfflineStatus_Up
  273. if err := obj.DeCode(s); err != nil {
  274. logrus.Errorf("HLWLZB_OnOfflineStatus_Up====%s解析错误:%s", s, err.Error())
  275. return
  276. }
  277. i64, err1 := strconv.ParseInt(obj.Ts, 10, 62)
  278. t, err2 := util.MlParseTime(util.Unix2Time(i64))
  279. if err1 != nil || err2 != nil {
  280. logrus.Errorf("HLWLZB_OnOfflineStatus_Up====%s时间解析错误:%v %v", obj.Ts, err1, err2)
  281. return
  282. }
  283. if obj.MasterSn == o.did {
  284. if obj.MsgType == protocol.MT_ONLINE { //上线
  285. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
  286. o.state = 0
  287. o.lastStateTime = t
  288. o.errCnt = 0
  289. o.GetOnlineMsg()
  290. } else if obj.MsgType == protocol.MT_OFFLINE { //离线
  291. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
  292. o.state = 1
  293. o.lastStateTime = t
  294. o.errCnt = 10
  295. o.GetWillMsg()
  296. }
  297. }
  298. }
  299. // 存灯数据
  300. // {"electricity":"0.0","energy":"0.64","lampStatus":"off","lux":"0.0","power":"0.0",
  301. // "powerFactor":"0.0","serialNumber":"301057","voltage":"226.0"}
  302. func (o *HlZigbeeConcentrator) reportProcess(s string) {
  303. var obj protocol.HLWLZB_QueryLampStatus_Back
  304. if err := obj.DeCode(s); err != nil {
  305. logrus.Errorf("HLWLZB_QueryLampStatus_Back====%s解析错误:%s", s, err.Error())
  306. return
  307. }
  308. i64, err := strconv.ParseInt(obj.Ts, 10, 62)
  309. t, err := util.MlParseTime(util.Unix2Time(i64))
  310. if err != nil {
  311. logrus.Errorf("时间[%s]解析错误:%s", obj.Ts, err.Error())
  312. return
  313. }
  314. //o.tid = obj.Data.TID //更新物模型TID
  315. //o.gid = obj.Gid 要通过数据库查询
  316. //处理集控器定时上报的单灯数据
  317. var errCnt_ uint = 0
  318. for _, v := range obj.SnGroup { //dev_data_XXX 更新数据 9个参数入库
  319. //未找到的等待下次更新灯控信息
  320. if ld, ok := o.mapLamp[v.SerialNumber]; ok { //对应单灯
  321. var cl protocol.CHZB_LampData
  322. var err2, err3, err4, err5, err6, err7 error
  323. cl.Data = make(map[uint16]float64)
  324. if v.LampStatus == "on" {
  325. cl.Data[1] = float64(1)
  326. } else {
  327. cl.Data[1] = float64(0)
  328. }
  329. cl.Data[0] = float64(0)
  330. cl.Data[3] = float64(0)
  331. cl.Data[2], err2 = strconv.ParseFloat(v.Electricity, 64)
  332. cl.Data[4], err4 = strconv.ParseFloat(v.Lux, 64)
  333. cl.Data[5], err5 = strconv.ParseFloat(v.Power, 64)
  334. cl.Data[6], err6 = strconv.ParseFloat(v.PowerFactor, 64)
  335. cl.Data[7], err7 = strconv.ParseFloat(v.Voltage, 64)
  336. cl.Data[8], err3 = strconv.ParseFloat(v.Energy, 64)
  337. if err2 != nil || err3 != nil || err5 != nil || err6 != nil || err7 != nil || err4 != nil {
  338. str, err := json.MarshalToString(v)
  339. logrus.Errorf("report_process数据[%s]解析错误:%v", str, err)
  340. continue
  341. }
  342. ld.HandleData(o.tenant, o.gid, v.SerialNumber /*灯*/, o.tid, t, &cl)
  343. }
  344. if v.State == protocol.FAILED {
  345. errCnt_++
  346. }
  347. }
  348. if errCnt_ == uint(len(obj.SnGroup)) {
  349. o.errCnt++
  350. } else {
  351. o.errCnt = 0
  352. }
  353. //先处理状态变化,再存入最新状态
  354. o.handleStateChange(t)
  355. cacheState(o.did, util.Unix2Time(i64), o.state) //更新dev_stat_XXX
  356. o.lastDataTime = t
  357. }
  358. func (o *HlZigbeeConcentrator) turnOnOffDimmerWholeProcess(m *mqtt.Message) {
  359. var obj protocol.HLWLZB_Switch_Whole_Ack
  360. var str string
  361. var state uint
  362. if err := obj.DeCode(m.PayloadString()); err != nil {
  363. return
  364. }
  365. ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
  366. if obj.ErrorCode == 0 {
  367. str = "Success"
  368. state = 1
  369. } else {
  370. str = ""
  371. state = 0
  372. }
  373. oo := models.DeviceCmdRecord{ //前插后更新
  374. ID: ts,
  375. State: state,
  376. Resp: str,
  377. }
  378. if err := oo.Update(); err != nil {
  379. logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
  380. obj.MasterSn, obj.Ts, m.Topic(), err.Error())
  381. }
  382. }
  383. func (o *HlZigbeeConcentrator) turnOnOffDimmerMultiProcess(m *mqtt.Message) {
  384. var obj protocol.HLWLZB_Switch_Multi_Ack
  385. var str string
  386. var state uint
  387. if err := obj.DeCode(m.PayloadString()); err != nil {
  388. return
  389. }
  390. ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
  391. if obj.ErrorCode == 0 {
  392. str = "Success"
  393. state = 1
  394. } else {
  395. str = ""
  396. state = 0
  397. }
  398. oo := models.DeviceCmdRecord{ //前插后更新
  399. ID: ts,
  400. State: state,
  401. Resp: str,
  402. }
  403. if err := oo.Update(); err != nil {
  404. logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
  405. obj.MasterSn, obj.Ts, m.Topic(), err.Error())
  406. }
  407. }
  408. func (o *HlZigbeeConcentrator) strategyProcess(m *mqtt.Message) {
  409. var obj protocol.Pack_HLSetOnOffTime_Ack
  410. var str string
  411. var state uint
  412. if err := obj.DeCode(m.PayloadString()); err != nil {
  413. return
  414. }
  415. ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
  416. if obj.ErrorCode == 0 {
  417. str = "Success"
  418. state = 1
  419. } else {
  420. str = ""
  421. state = 0
  422. }
  423. oo := models.DeviceCmdRecord{ //前插后更新
  424. ID: ts,
  425. State: state,
  426. Resp: str,
  427. }
  428. if err := oo.Update(); err != nil {
  429. logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
  430. obj.MasterSn, obj.Ts, m.Topic(), err.Error())
  431. }
  432. }
  433. func (o *HlZigbeeConcentrator) clearStrategyProcess(m *mqtt.Message) {
  434. var obj protocol.Pack_HLClearStrategy_Ack
  435. var str string
  436. var state uint
  437. if err := obj.DeCode(m.PayloadString()); err != nil {
  438. return
  439. }
  440. ts, _ := strconv.ParseUint(obj.Ts, 10, 64)
  441. if obj.ErrorCode == 0 {
  442. str = "Success"
  443. state = 1
  444. } else {
  445. str = ""
  446. state = 0
  447. }
  448. oo := models.DeviceCmdRecord{ //前插后更新
  449. ID: ts,
  450. State: state,
  451. Resp: str,
  452. }
  453. if err := oo.Update(); err != nil {
  454. logrus.Errorf("收到网关[%s]的响应[seq:%s],主题:%s,但更新数据库失败[%s]",
  455. obj.MasterSn, obj.Ts, m.Topic(), err.Error())
  456. }
  457. }
  458. // fn topic处理入口
  459. func (o *HlZigbeeConcentrator) handleTopicUp(m *mqtt.Message) {
  460. var hp protocol.HLWLZB_Pack
  461. if err := hp.DeCode(m.PayloadString()); err != nil {
  462. logrus.Errorf("HlZigbeeConcentrator.handle_Topic_Up解析错误:%s,%s", m.PayloadString(), err.Error())
  463. }
  464. switch hp.MsgType {
  465. case protocol.MT_REPORT: //上报数据
  466. o.reportProcess(m.PayloadString())
  467. case protocol.MT_OFFLINE:
  468. fallthrough
  469. case protocol.MT_ONLINE: //上线,下线
  470. o.onofflineProcess(m.PayloadString())
  471. case protocol.MT_TURNON:
  472. fallthrough
  473. case protocol.MT_TURNOFF:
  474. fallthrough
  475. case protocol.MT_DIMMER: //开灯,关灯 dimmer ack
  476. if hp.ActionType == protocol.AT_WHOLE {
  477. o.turnOnOffDimmerWholeProcess(m)
  478. } else if hp.ActionType == protocol.AT_MULTI {
  479. o.turnOnOffDimmerMultiProcess(m)
  480. }
  481. case protocol.MT_STRATEGY: //策略 ack
  482. o.strategyProcess(m)
  483. case protocol.MT_CLEARSTRATEGY: //清除策略 ack
  484. o.clearStrategyProcess(m)
  485. case protocol.MT_ALARM: //告警
  486. //TODO
  487. default:
  488. }
  489. }
  490. func (o *HlZigbeeConcentrator) GetOnlineMsg() {
  491. //发布上线消息
  492. var obj protocol.Pack_IDObject
  493. str, err := obj.EnCode(o.gid, GetNextSeq(), 0)
  494. if err != nil {
  495. logrus.Errorf("HlZigbeeConcentrator.GetOnlineMsg:发布消息错误1:%s", err.Error())
  496. }
  497. topic := GetTopic(o.tenant, protocol.DT_GATEWAY, o.gid, protocol.TP_GW_ONLINE)
  498. if err := GetMQTTMgr().Publish(topic, str, mqtt.AtMostOnce); err != nil {
  499. logrus.Errorf("HlZigbeeConcentrator.GetOnlineMsg:发布消息错误:%s", err.Error())
  500. }
  501. }
  502. func (o *HlZigbeeConcentrator) GetWillMsg() {
  503. payload, _ := (&protocol.Pack_IDObject{}).EnCode(o.gid, GetNextSeq(), 0) //遗嘱消息
  504. topic := GetTopic(o.tenant, protocol.DT_GATEWAY, o.gid, protocol.TP_GW_WILL)
  505. if err := GetMQTTMgr().Publish(topic, payload, mqtt.AtMostOnce); err != nil {
  506. logrus.Errorf("HlZigbeeConcentrator.GetWillMsg:发布消息错误:%s", err.Error())
  507. }
  508. }