chzbconcentrator.go 16 KB


  1. package main
  2. import (
  3. "context"
  4. "runtime/debug"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/sirupsen/logrus"
  10. "lc/common/models"
  11. "lc/common/mqtt"
  12. "lc/common/protocol"
  13. "lc/common/util"
  14. )
  15. // 长河Zigbee 集控器
  16. type ChZigbeeConcentrator struct {
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. lock sync.Mutex
  20. tenant string //租户
  21. gid string //基础数据,网关ID
  22. did string //基础数据,集控器设备编码
  23. tid uint16 //基础数据,集控器物模型ID
  24. mapLamp map[string]*ChZigbeeLampController //单灯数据,编号唯一
  25. state uint8 //实时数据,0在线,1离线
  26. lastStateTime time.Time //实时数据,最新状态时间
  27. lastDataTime time.Time //最新数据时间
  28. nextHourTime time.Time //实时数据
  29. queue *util.MlQueue //数据队列
  30. errCnt uint //错误计数
  31. mapTopicHandle map[string]func(m *mqtt.Message)
  32. }
  33. func NewChZigbeeConcentrator(tenant, did string) *ChZigbeeConcentrator {
  34. ctx, cancel := context.WithCancel(context.Background())
  35. chc := ChZigbeeConcentrator{
  36. ctx: ctx,
  37. cancel: cancel,
  38. tenant: tenant,
  39. did: did,
  40. mapLamp: make(map[string]*ChZigbeeLampController),
  41. queue: util.NewQueue(1000),
  42. mapTopicHandle: make(map[string]func(m *mqtt.Message)),
  43. }
  44. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_DATA)] = chc.handleTpChZigbeeData
  45. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_LAMP)] = chc.handleTpChZigbeeQLamp
  46. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_WAITTIME_ACK)] = chc.handleTpChZigbeeSetWaittimeAck
  47. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_SWITCH_ACK)] = chc.handleTpChZigbeeSetSwitchAck
  48. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK)] = chc.handleTpChZigbeeSetRecoveryAutoAck
  49. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_ONOFFTIME_ACK)] = chc.handleTpChZigbeeSetOnofftimeAck
  50. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK)] = chc.handleTpChZigbeeQueryOnofftimeAck
  51. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK)] = chc.handleTpChZigbeeSetUpdateLampAck
  52. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_QUERY_TIME_ACK)] = chc.handleTpChZigbeeQueryTimeAck
  53. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_SET_BROADCASTTIME_ACK)] = chc.handleTpChZigbeeSetBroadcastTimeAck
  54. chc.mapTopicHandle[GetTopic(tenant, protocol.DT_CONCENTRATOR, did, protocol.TP_CHZB_ALARM)] = chc.handleTpChZigbeeAlarm
  55. return &chc
  56. }
  57. func (o *ChZigbeeConcentrator) PutMessage(m *mqtt.Message) {
  58. o.queue.Put(m)
  59. }
  60. func (o *ChZigbeeConcentrator) Start() {
  61. go o.HandleQueue()
  62. }
  63. func (o *ChZigbeeConcentrator) Stop() {
  64. o.cancel()
  65. }
  66. func (o *ChZigbeeConcentrator) HandleQueue() {
  67. defer func() {
  68. if err := recover(); err != nil {
  69. logrus.Error("ChZigbeeConcentrator.HandleQueue发生异常:", string(debug.Stack()))
  70. go o.HandleQueue()
  71. }
  72. }()
  73. o.UpdateDID()
  74. var exit = false
  75. timer := time.NewTicker(1 * time.Minute)
  76. timer5 := time.NewTicker(5 * time.Minute)
  77. //每天15点半同步日出日落时间
  78. var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(14*time.Hour + 30*time.Minute)
  79. for {
  80. select {
  81. case <-o.ctx.Done():
  82. logrus.Error("ChZigbeeConcentrator.HandleQueue即将退出,原因:", o.ctx.Err())
  83. exit = true
  84. case <-timer.C: //每隔1分钟执行一次
  85. o.UpdateState() //更新集控器状态,防止无数据状态不更新
  86. o.UpdateLampControllerState() //更新灯控状态,防止无数据状态不更新
  87. case <-timer5.C: //每隔5分钟执行一次
  88. o.UpdateDID() //更新灯控设备编码
  89. //同步日出日落时间
  90. if util.MlNow().After(SyncSunset) {
  91. if err := o.SyncSunset(); err == nil {
  92. SyncSunset = SyncSunset.AddDate(0, 0, 1)
  93. }
  94. }
  95. default:
  96. //从队列钟获取指令执行
  97. if m, ok, quantity := o.queue.Get(); ok {
  98. if mm, ok := m.(*mqtt.Message); ok {
  99. if f, ok := o.mapTopicHandle[mm.Topic()]; ok {
  100. f(mm)
  101. } else {
  102. logrus.Error("ChZigbeeConcentrator.HandleQueue:不支持的主题:", mm.Topic())
  103. }
  104. }
  105. } else if quantity == 0 {
  106. if exit {
  107. return
  108. }
  109. time.Sleep(100 * time.Millisecond)
  110. }
  111. }
  112. }
  113. }
  114. func (o *ChZigbeeConcentrator) SyncSunset() error {
  115. arr, err := models.GetZigbeeLampstrategyByconcentrator(o.did)
  116. if err != nil {
  117. logrus.Errorf("集控器[%s]从数据库读取日出日落时间错误:%s", o.did, err.Error())
  118. return err
  119. }
  120. if len(arr) == 0 {
  121. return nil
  122. }
  123. //Strategy相同的一起发送,不同的分批发送
  124. mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间
  125. mapNumber := make(map[string][]uint32) //策略灯控编号,注意非编码
  126. for _, v := range arr {
  127. mapNumber[v.ID] = append(mapNumber[v.ID], uint32(v.Number))
  128. if _, ok := mapTime[v.Strategy]; ok {
  129. //已计算日出日落时间的,不再重复计算
  130. continue
  131. }
  132. var oot protocol.CHZB_OnOffTime
  133. var ls []models.LampStrategy
  134. if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
  135. oot.Brightness = uint8(ls[0].Brightness)
  136. }
  137. //计算时间
  138. if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil {
  139. onhour, _ := strconv.Atoi(strings.Split(set, ":")[0])
  140. onminite, _ := strconv.Atoi(strings.Split(set, ":")[1])
  141. offhour, _ := strconv.Atoi(strings.Split(rise, ":")[0])
  142. offminite, _ := strconv.Atoi(strings.Split(rise, ":")[1])
  143. oot.OnHour = uint8(onhour)
  144. oot.OnMinite = uint8(onminite)
  145. oot.OffHour = uint8(offhour)
  146. oot.OffMinite = uint8(offminite)
  147. }
  148. mapTime[v.Strategy] = &oot
  149. }
  150. for k, v := range mapNumber {
  151. if oot, ok := mapTime[k]; ok {
  152. var obj protocol.Pack_SetOnOffTime
  153. seq := GetNextSeq()
  154. if str, err := obj.EnCode(o.did, o.gid, seq, v, []protocol.CHZB_OnOffTime{*oot}); err == nil {
  155. topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_ONOFFTIME)
  156. err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce)
  157. if err != nil {
  158. logrus.Errorf("SyncSunset:集控器[%s]对灯控[%v]发布日出日落消息错误:%s", o.did, v, err.Error())
  159. }
  160. var msg string
  161. if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
  162. msg = string(msg0)
  163. } else {
  164. msg = str
  165. }
  166. odb := models.DeviceCmdRecord{
  167. ID: seq,
  168. GID: o.gid,
  169. DID: o.did,
  170. Topic: topic,
  171. Message: msg,
  172. State: 0,
  173. }
  174. if err := models.G_db.Create(&odb).Error; err != nil {
  175. logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库错误:%s", o.did, v, err.Error())
  176. } else {
  177. logrus.Errorf("集控器[%s]对灯控[%v]发布日出日落时间时指令入库成功", o.did, v)
  178. }
  179. }
  180. }
  181. }
  182. return nil
  183. }
  184. func (o *ChZigbeeConcentrator) UpdateDID() {
  185. if arr, err := models.GetLampControllerByConcentrator(o.did); err == nil {
  186. maplamps := make(map[uint32]string, len(arr))
  187. for _, v := range arr {
  188. if ld, ok := o.mapLamp[v.ID]; ok {
  189. ld.SetDID(v.ID, uint32(v.Number))
  190. } else {
  191. var ld ChZigbeeLampController
  192. ld.SetDID(v.ID, uint32(v.Number))
  193. o.mapLamp[v.ID] = &ld
  194. }
  195. if v.Number != 0xFEFE { //0xFEFE为广播地址,禁止使用
  196. maplamps[uint32(v.Number)] = v.ID
  197. }
  198. o.gid = v.GID
  199. }
  200. if len(maplamps) > 0 {
  201. var ret protocol.Pack_CHZB_LampIDs
  202. if str, err := ret.EnCode(o.did, o.gid, GetNextSeq(), maplamps); err == nil {
  203. topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_UPDATE_LAMP)
  204. if err := GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce); err != nil {
  205. logrus.Errorf("UpdateDID:发布消息错误:%s", err.Error())
  206. }
  207. }
  208. }
  209. }
  210. //更新长和zigbee灯控物模型
  211. if o.tid > 0 {
  212. if err := models.UpdateTID(o.did, int(o.tid)); err != nil {
  213. logrus.Errorf("集控器[%s]更新灯控物模型失败:%s", o.did, err.Error())
  214. }
  215. }
  216. }
  217. func (o *ChZigbeeConcentrator) handleStateChange(t time.Time) {
  218. //最新状态
  219. state := uint8(0)
  220. if o.errCnt >= 10 {
  221. state = 1
  222. } else if o.errCnt == 0 {
  223. state = 0
  224. } else {
  225. return
  226. }
  227. //状态处理
  228. if o.lastStateTime.IsZero() || o.state == 0xff {
  229. t0, s0, err := getState(o.did)
  230. if err != nil {
  231. o.state = state
  232. o.lastStateTime = t
  233. return
  234. }
  235. o.state = s0
  236. o.lastStateTime = t0
  237. }
  238. if o.state == 0 && state == 1 { //在线->离线
  239. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: t})
  240. } else if o.state == 1 && state == 0 { //离线->在线
  241. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_ONLINE, Time: t})
  242. }
  243. o.state = state
  244. o.lastStateTime = t
  245. }
  246. func (o *ChZigbeeConcentrator) UpdateState() {
  247. if o.lastStateTime.IsZero() {
  248. t0, s0, err := getState(o.did)
  249. if err == nil {
  250. o.state = s0
  251. o.lastStateTime = t0
  252. }
  253. }
  254. if o.state == protocol.FAILED ||
  255. (!o.lastDataTime.IsZero() && util.MlNow().Sub(o.lastDataTime).Minutes() < OfflineInterval) {
  256. return
  257. }
  258. //如果之前一直是在线状态的,则置为离线;若之前是离线状态的,则不修改状态
  259. if o.state == protocol.SUCCESS {
  260. o.state = protocol.FAILED
  261. o.lastStateTime = util.MlNow()
  262. GetEventMgr().PushEvent(&EventObject{ID: o.did, EventType: models.ET_OFFLINE, Time: o.lastStateTime})
  263. cacheState(o.did, o.lastStateTime.Format("2006-01-02 15:04:05"), o.state)
  264. }
  265. }
  266. func (o *ChZigbeeConcentrator) UpdateLampControllerState() {
  267. for _, v := range o.mapLamp {
  268. v.UpdateState()
  269. }
  270. }
  271. func (o *ChZigbeeConcentrator) handleTpChZigbeeData(m *mqtt.Message) {
  272. var obj protocol.Pack_CHZB_UploadData
  273. if err := obj.DeCode(m.PayloadString()); err != nil {
  274. return
  275. }
  276. t, err := util.MlParseTime(obj.Time)
  277. if err != nil {
  278. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  279. return
  280. }
  281. o.tid = obj.Data.TID //更新物模型TID
  282. o.gid = obj.Gid
  283. //处理集控器定时上报的单灯数据
  284. var errCnt_ uint = 0
  285. for k, v := range obj.Data.Data {
  286. //未找到的等待下次更新灯控信息
  287. if ld, ok := o.mapLamp[k]; ok {
  288. ld.HandleData(o.tenant, obj.Gid, o.did, o.tid, t, v)
  289. }
  290. if v.State == protocol.FAILED {
  291. errCnt_++
  292. }
  293. }
  294. if errCnt_ == uint(len(obj.Data.Data)) {
  295. o.errCnt++
  296. } else {
  297. o.errCnt = 0
  298. }
  299. //先处理状态变化,再存入最新状态
  300. o.handleStateChange(t)
  301. cacheState(o.did, obj.Time, o.state)
  302. o.lastDataTime = t
  303. }
  304. func (o *ChZigbeeConcentrator) handleTpChZigbeeAlarm(m *mqtt.Message) {
  305. var obj protocol.Pack_CHZB_LampAlarm
  306. if err := obj.DeCode(m.PayloadString()); err != nil {
  307. return
  308. }
  309. if ld, ok := o.mapLamp[obj.Data.DID]; ok {
  310. ld.HandleAlarm(obj.Data)
  311. } else {
  312. logrus.Errorf("未找到灯控:%s", obj.Data.DID)
  313. }
  314. }
  315. func (o *ChZigbeeConcentrator) handleTpChZigbeeQLamp(m *mqtt.Message) {
  316. var obj protocol.Pack_CHZB_EmptyObject
  317. if err := obj.DeCode(m.PayloadString()); err != nil {
  318. return
  319. }
  320. arr, err := models.GetLampControllerByConcentrator(obj.Id)
  321. if err != nil || len(arr) == 0 {
  322. return
  323. }
  324. lamps := make(map[uint32]string, len(arr))
  325. for _, v := range arr {
  326. if v.Number != 0xFEFE { //0xFEFE为广播地址,禁止使用
  327. lamps[uint32(v.Number)] = v.ID
  328. }
  329. //顺带更新
  330. if ld, ok := o.mapLamp[v.ID]; ok {
  331. ld.SetDID(v.ID, uint32(v.Number))
  332. } else {
  333. var ld ChZigbeeLampController
  334. ld.SetDID(v.ID, uint32(v.Number))
  335. o.mapLamp[v.ID] = &ld
  336. }
  337. }
  338. var ret protocol.Pack_CHZB_LampIDs
  339. if str, err := ret.EnCode(o.did, o.gid, GetNextSeq(), lamps); err == nil {
  340. topic := GetTopic(o.tenant, protocol.DT_CONCENTRATOR, o.did, protocol.TP_CHZB_SET_UPDATE_LAMP)
  341. if err := GetMQTTMgr().Publish(topic, str, mqtt.AtMostOnce); err != nil {
  342. logrus.Errorf("handle_TP_CHZB_Q_LAMP:发布消息错误:%s", err.Error())
  343. }
  344. }
  345. }
  346. func (o *ChZigbeeConcentrator) handleTpChZigbeeSetWaittimeAck(m *mqtt.Message) {
  347. var obj protocol.Pack_Ack
  348. if err := obj.DeCode(m.PayloadString()); err != nil {
  349. return
  350. }
  351. oo := models.DeviceCmdRecord{
  352. ID: obj.Seq,
  353. State: uint(obj.Data.State),
  354. Resp: obj.Data.Error,
  355. }
  356. if err := oo.Update(); err != nil {
  357. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  358. obj.Id, obj.Seq, m.Topic(), err.Error())
  359. }
  360. }
  361. func (o *ChZigbeeConcentrator) handleTpChZigbeeSetSwitchAck(m *mqtt.Message) {
  362. var obj protocol.Pack_CHZB_SeqLampAck
  363. if err := obj.DeCode(m.PayloadString()); err != nil {
  364. return
  365. }
  366. resq, _ := json.MarshalIndent(obj.Data.MapLamp, "", " ")
  367. oo := models.DeviceCmdRecord{
  368. ID: obj.Seq,
  369. State: 1,
  370. Resp: string(resq),
  371. }
  372. if err := oo.Update(); err != nil {
  373. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  374. obj.Id, obj.Seq, m.Topic(), err.Error())
  375. }
  376. }
  377. func (o *ChZigbeeConcentrator) handleTpChZigbeeSetRecoveryAutoAck(m *mqtt.Message) {
  378. var obj protocol.Pack_CHZB_SeqLampAck
  379. if err := obj.DeCode(m.PayloadString()); err != nil {
  380. return
  381. }
  382. resq, _ := json.MarshalIndent(obj.Data.MapLamp, "", " ")
  383. oo := models.DeviceCmdRecord{
  384. ID: obj.Seq,
  385. State: 1,
  386. Resp: string(resq),
  387. }
  388. if err := oo.Update(); err != nil {
  389. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  390. obj.Id, obj.Seq, m.Topic(), err.Error())
  391. }
  392. }
  393. func (o *ChZigbeeConcentrator) handleTpChZigbeeSetOnofftimeAck(m *mqtt.Message) {
  394. var obj protocol.Pack_CHZB_SeqLampAck
  395. if err := obj.DeCode(m.PayloadString()); err != nil {
  396. return
  397. }
  398. resq, _ := json.MarshalIndent(obj.Data.MapLamp, "", " ")
  399. oo := models.DeviceCmdRecord{
  400. ID: obj.Seq,
  401. State: 1,
  402. Resp: string(resq),
  403. }
  404. if err := oo.Update(); err != nil {
  405. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  406. obj.Id, obj.Seq, m.Topic(), err.Error())
  407. }
  408. }
  409. func (o *ChZigbeeConcentrator) handleTpChZigbeeQueryOnofftimeAck(m *mqtt.Message) {
  410. var obj protocol.Pack_CHZB_QueryOnOffTimeAck
  411. if err := obj.DeCode(m.PayloadString()); err != nil {
  412. return
  413. }
  414. oo := models.DeviceCmdRecord{
  415. ID: obj.Seq,
  416. State: 1,
  417. Resp: obj.Data.Error,
  418. }
  419. if err := oo.Update(); err != nil {
  420. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  421. obj.Id, obj.Seq, m.Topic(), err.Error())
  422. }
  423. }
  424. func (o *ChZigbeeConcentrator) handleTpChZigbeeSetUpdateLampAck(m *mqtt.Message) {
  425. var obj protocol.Pack_Ack
  426. if err := obj.DeCode(m.PayloadString()); err != nil {
  427. return
  428. }
  429. oo := models.DeviceCmdRecord{
  430. ID: obj.Seq,
  431. State: 1,
  432. Resp: obj.Data.Error,
  433. }
  434. if err := oo.Update(); err != nil {
  435. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  436. obj.Id, obj.Seq, m.Topic(), err.Error())
  437. }
  438. }
  439. func (o *ChZigbeeConcentrator) handleTpChZigbeeQueryTimeAck(m *mqtt.Message) {
  440. var obj protocol.Pack_CHZB_QueryTimeAck
  441. if err := obj.DeCode(m.PayloadString()); err != nil {
  442. return
  443. }
  444. oo := models.DeviceCmdRecord{
  445. ID: obj.Seq,
  446. State: 1,
  447. Resp: obj.Data.Error,
  448. }
  449. if err := oo.Update(); err != nil {
  450. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  451. obj.Id, obj.Seq, m.Topic(), err.Error())
  452. }
  453. if obj.Data.LampTime != nil {
  454. logrus.Errorf("集控器[编码为:%s]的灯控[编号为:%d]当前时间为:%02d:%02d:%02d",
  455. o.did, obj.Data.LampID, obj.Data.LampTime.Hour, obj.Data.LampTime.Minite, obj.Data.LampTime.Second)
  456. }
  457. }
  458. func (o *ChZigbeeConcentrator) handleTpChZigbeeSetBroadcastTimeAck(m *mqtt.Message) {
  459. var obj protocol.Pack_Ack
  460. if err := obj.DeCode(m.PayloadString()); err != nil {
  461. return
  462. }
  463. oo := models.DeviceCmdRecord{
  464. ID: obj.Seq,
  465. State: 1,
  466. Resp: obj.Data.Error,
  467. }
  468. if err := oo.Update(); err != nil {
  469. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  470. obj.Id, obj.Seq, m.Topic(), err.Error())
  471. }
  472. }