ymlampcontrollermgr.go 7.1 KB


  1. package main
  2. import (
  3. "context"
  4. "runtime"
  5. "runtime/debug"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/sirupsen/logrus"
  11. "lc/common/models"
  12. "lc/common/mqtt"
  13. "lc/common/protocol"
  14. "lc/common/util"
  15. )
  16. var _YMLampControllerMgrOnce sync.Once
  17. var _YMLampControllerMgrSingle *YMLampControllerMgr
  18. func GetYMLampControllerMgr() *YMLampControllerMgr {
  19. _YMLampControllerMgrOnce.Do(func() {
  20. ctx, cancel := context.WithCancel(context.Background())
  21. _YMLampControllerMgrSingle = &YMLampControllerMgr{
  22. queue: util.NewQueue(100),
  23. mapYMLampController: make(map[string]*YMLampController),
  24. ctx: ctx,
  25. cancel: cancel,
  26. }
  27. })
  28. return _YMLampControllerMgrSingle
  29. }
  30. type YMLampControllerMgr struct {
  31. queue *util.MlQueue
  32. mapYMLampController map[string]*YMLampController
  33. ctx context.Context
  34. cancel context.CancelFunc
  35. }
  36. func (o *YMLampControllerMgr) SubscribeTopics() {
  37. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_DATA), mqtt.AtMostOnce, o.HandlerData)
  38. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_ALARM), mqtt.AtMostOnce, o.HandlerData)
  39. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_SET_SWITCH_ACK), mqtt.AtMostOnce, o.HandlerData)
  40. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_LAMPCONTROLLER, protocol.TP_YM_SET_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData)
  41. }
  42. func (o *YMLampControllerMgr) HandlerData(m mqtt.Message) {
  43. for {
  44. ok, cnt := o.queue.Put(&m)
  45. if ok {
  46. break
  47. } else {
  48. logrus.Errorf("YMLampControllerMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  49. runtime.Gosched()
  50. }
  51. }
  52. }
  53. func (o *YMLampControllerMgr) Stop() {
  54. o.cancel()
  55. }
  56. func (o *YMLampControllerMgr) Handler(args ...interface{}) interface{} {
  57. defer func() {
  58. if err := recover(); err != nil {
  59. gopool.Add(o.Handler, args)
  60. logrus.Errorf("YMLampControllerMgr.Handler发生异常:%v", err)
  61. logrus.Errorf("YMLampControllerMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack()))
  62. }
  63. }()
  64. exit := false
  65. timer := time.NewTicker(1 * time.Minute)
  66. //每天15点半同步日出日落时间
  67. var SyncSunset = util.New(util.MlNow()).BeginningOfDay().Add(10*time.Hour + 30*time.Minute)
  68. for {
  69. select {
  70. case <-o.ctx.Done():
  71. logrus.Error("YMLampControllerMgr.HandleQueue即将退出,原因:", o.ctx.Err())
  72. exit = true
  73. case <-timer.C: //每隔1分钟执行一次
  74. //更新灯控状态,防止无数据状态不更新
  75. o.UpdateLampControllerState()
  76. //同步日出日落时间
  77. if util.MlNow().After(SyncSunset) {
  78. if err := o.SyncSunset(); err == nil {
  79. SyncSunset = SyncSunset.AddDate(0, 0, 1)
  80. }
  81. }
  82. default:
  83. if o.handleQueue() == 0 {
  84. if exit {
  85. return 0
  86. }
  87. time.Sleep(100 * time.Millisecond)
  88. }
  89. }
  90. }
  91. }
  92. func (o *YMLampControllerMgr) handleQueue() uint32 {
  93. msg, ok, quantity := o.queue.Get()
  94. if !ok {
  95. return quantity
  96. } else if quantity > 1000 {
  97. logrus.Warnf("YMLampControllerMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  98. }
  99. m, ok := msg.(*mqtt.Message)
  100. if !ok {
  101. return quantity
  102. }
  103. Tenant, _, DID, topic, err := ParseTopic(m.Topic())
  104. if err != nil {
  105. return quantity
  106. }
  107. pymlc, ok := o.mapYMLampController[DID]
  108. if !ok {
  109. pymlc = &YMLampController{}
  110. pymlc.Set(Tenant, DID)
  111. o.mapYMLampController[DID] = pymlc
  112. }
  113. switch topic {
  114. case protocol.TP_YM_DATA:
  115. o.handleDATA(pymlc, m)
  116. case protocol.TP_YM_ALARM:
  117. o.handleALARM(pymlc, m)
  118. case protocol.TP_YM_SET_SWITCH_ACK, protocol.TP_YM_SET_ONOFFTIME_ACK:
  119. o.handleACK(m)
  120. }
  121. return quantity
  122. }
  123. func (o *YMLampControllerMgr) handleDATA(lp *YMLampController, m *mqtt.Message) {
  124. var obj protocol.Pack_CHZB_UploadData
  125. if err := obj.DeCode(m.PayloadString()); err != nil {
  126. return
  127. }
  128. t, err := util.MlParseTime(obj.Time)
  129. if err != nil {
  130. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  131. return
  132. }
  133. for _, v := range obj.Data.Data {
  134. lp.HandleData(obj.Gid, obj.Data.TID, t, v)
  135. }
  136. }
  137. func (o *YMLampControllerMgr) handleALARM(lp *YMLampController, m *mqtt.Message) {
  138. var obj protocol.Pack_CHZB_LampAlarm
  139. if err := obj.DeCode(m.PayloadString()); err != nil {
  140. return
  141. }
  142. lp.HandleAlarm(obj.Data)
  143. }
  144. func (o *YMLampControllerMgr) handleACK(m *mqtt.Message) {
  145. var obj protocol.Pack_Ack
  146. if err := obj.DeCode(m.PayloadString()); err != nil {
  147. return
  148. }
  149. oo := models.DeviceCmdRecord{ID: obj.Seq, State: 1, Resp: obj.Data.Error}
  150. if err := oo.Update(); err != nil {
  151. logrus.Errorf("收到设备[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error())
  152. }
  153. }
  154. // SyncSunset 统一更新裕明485灯控的日出日落时间
  155. func (o *YMLampControllerMgr) SyncSunset() error {
  156. arr, err := models.GetYm485Lampstrategy(nil)
  157. if err != nil {
  158. logrus.Errorf("从数据库读取设置为日出日落时间的485灯控发生错误:%s", err.Error())
  159. return err
  160. }
  161. if len(arr) == 0 {
  162. return nil
  163. }
  164. //分别计算日出日落
  165. mapTime := make(map[string]*protocol.CHZB_OnOffTime) //策略时间
  166. for _, v := range arr {
  167. if _, ok := mapTime[v.Strategy]; ok {
  168. //已计算日出日落时间的,不再重复计算
  169. continue
  170. }
  171. var oot protocol.CHZB_OnOffTime
  172. var ls []models.LampStrategy
  173. if err := json.UnmarshalFromString(v.TimeInfo, &ls); err == nil && len(ls) > 0 {
  174. oot.Brightness = uint8(ls[0].Brightness)
  175. }
  176. //计算时间
  177. if rise, set, err := util.SunriseSunsetForChina(v.Latitude, v.Longitude); err == nil {
  178. onHour, _ := strconv.Atoi(strings.Split(set, ":")[0])
  179. onMinute, _ := strconv.Atoi(strings.Split(set, ":")[1])
  180. offHour, _ := strconv.Atoi(strings.Split(rise, ":")[0])
  181. offMinute, _ := strconv.Atoi(strings.Split(rise, ":")[1])
  182. oot.OnHour = uint8(onHour)
  183. oot.OnMinite = uint8(onMinute)
  184. oot.OffHour = uint8(offHour)
  185. oot.OffMinite = uint8(offMinute)
  186. }
  187. mapTime[v.Strategy] = &oot
  188. }
  189. //发布mqtt消息
  190. for _, v := range arr {
  191. if oot, ok := mapTime[v.Strategy]; ok {
  192. var obj protocol.Pack_SetOnOffTime
  193. seq := GetNextSeq()
  194. if str, err := obj.EnCode(v.ID, v.GID, seq, nil, []protocol.CHZB_OnOffTime{*oot}); err == nil {
  195. topic := GetTopic(v.Tenant, protocol.DT_LAMPCONTROLLER, v.ID, protocol.TP_YM_SET_ONOFFTIME)
  196. err = GetMQTTMgr().Publish(topic, str, mqtt.AtLeastOnce)
  197. if err != nil {
  198. logrus.Errorf("SyncSunset:对灯控[%s]发布日出日落消息错误:%s", v.ID, err.Error())
  199. }
  200. var msg string
  201. if msg0, errmsg := json.MarshalIndent(obj, "", " "); errmsg == nil {
  202. msg = string(msg0)
  203. } else {
  204. msg = str
  205. }
  206. odb := models.DeviceCmdRecord{
  207. ID: seq,
  208. GID: v.GID,
  209. DID: v.ID,
  210. Topic: topic,
  211. Message: msg,
  212. State: 0,
  213. }
  214. if err := models.G_db.Create(&odb).Error; err != nil {
  215. logrus.Errorf("对灯控[%s]发布日出日落时间时指令入库错误:%s", v.ID, err.Error())
  216. } else {
  217. logrus.Errorf("对灯控[%s]发布日出日落时间时指令入库成功", v.ID)
  218. }
  219. }
  220. }
  221. }
  222. return nil
  223. }
  224. func (o *YMLampControllerMgr) UpdateLampControllerState() {
  225. for _, v := range o.mapYMLampController {
  226. v.UpdateState()
  227. }
  228. }