mqtthandle.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package main
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strings"
  6. "lc/common/mqtt"
  7. "lc/common/protocol"
  8. "lc/common/util"
  9. )
  10. func HandleTpQApp(m mqtt.Message) {
  11. var obj protocol.Pack_IDObject
  12. var ret protocol.Pack_MutilFileObject
  13. if err := obj.DeCode(m.PayloadString()); err == nil {
  14. //读文件内容
  15. ReadMutilFileContent(protocol.TP_GW_APP, obj.Data.Id, &ret)
  16. if str, err := ret.EnCode(appConfig.GID, obj.Seq); err == nil {
  17. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_APP_ACK), str, 0, ToCloud)
  18. }
  19. }
  20. }
  21. func HandleTpWApp(m mqtt.Message) {
  22. var obj protocol.Pack_SeqFileObject
  23. var ret protocol.Pack_Ack
  24. err := obj.DeCode(m.PayloadString())
  25. if err == nil {
  26. err = HandleFile(protocol.TP_GW_SET_APP, &obj)
  27. }
  28. if str, err := ret.EnCode(appConfig.GID, appConfig.GID, obj.Seq, err); err == nil {
  29. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_APP_ACK), str, 0, ToCloud)
  30. }
  31. }
  32. func HandleTpQSerial(m mqtt.Message) {
  33. var obj protocol.Pack_IDObject
  34. var ret protocol.Pack_MutilFileObject
  35. if err := obj.DeCode(m.PayloadString()); err == nil {
  36. //读文件内容
  37. ReadMutilFileContent(protocol.TP_GW_SERIAL, obj.Data.Id, &ret)
  38. if str, err := ret.EnCode(appConfig.GID, obj.Seq); err == nil {
  39. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SERIAL_ACK), str, 0, ToCloud)
  40. }
  41. }
  42. }
  43. func HandleTpWSerial(m mqtt.Message) {
  44. var obj protocol.Pack_SeqFileObject
  45. var ret protocol.Pack_Ack
  46. err := obj.DeCode(m.PayloadString())
  47. if err == nil {
  48. err = HandleFile(protocol.TP_GW_SET_SERIAL, &obj)
  49. }
  50. if str, err := ret.EnCode(appConfig.GID, appConfig.GID, obj.Seq, err); err == nil {
  51. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_SERIAL_ACK), str, 0, ToCloud)
  52. }
  53. }
  54. func HandleTpQRtu(m mqtt.Message) {
  55. var obj protocol.Pack_IDObject
  56. var ret protocol.Pack_MutilFileObject
  57. if err := obj.DeCode(m.PayloadString()); err == nil {
  58. //读文件内容
  59. ReadMutilFileContent(protocol.TP_GW_RTU, obj.Data.Id, &ret)
  60. if str, err := ret.EnCode(appConfig.GID, obj.Seq); err == nil {
  61. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_RTU_ACK), str, 0, ToCloud)
  62. }
  63. }
  64. }
  65. func HandleTpWRtu(m mqtt.Message) {
  66. var obj protocol.Pack_SeqFileObject
  67. var ret protocol.Pack_Ack
  68. err := obj.DeCode(m.PayloadString())
  69. if err == nil {
  70. err = HandleFile(protocol.TP_GW_SET_RTU, &obj)
  71. }
  72. if str, err := ret.EnCode(appConfig.GID, appConfig.GID, obj.Seq, err); err == nil {
  73. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_RTU_ACK), str, 0, ToCloud)
  74. }
  75. }
  76. func HandleTpQModel(m mqtt.Message) {
  77. var obj protocol.Pack_IDObject
  78. var ret protocol.Pack_MutilFileObject
  79. if err := obj.DeCode(m.PayloadString()); err == nil {
  80. //读文件内容
  81. ReadMutilFileContent(protocol.TP_GW_MODEL, obj.Data.Id, &ret)
  82. if str, err := ret.EnCode(appConfig.GID, obj.Seq); err == nil {
  83. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_MODEL_ACK), str, 0, ToCloud)
  84. }
  85. }
  86. }
  87. func HandleTpWModel(m mqtt.Message) {
  88. var obj protocol.Pack_SeqFileObject
  89. var ret protocol.Pack_Ack
  90. err := obj.DeCode(m.PayloadString())
  91. if err == nil {
  92. err = HandleFile(protocol.TP_GW_SET_MODEL, &obj)
  93. }
  94. if str, err := ret.EnCode(appConfig.GID, appConfig.GID, obj.Seq, err); err == nil {
  95. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_MODEL_ACK), str, 0, ToCloud)
  96. }
  97. }
  98. func HandleTpQLog(m mqtt.Message) {
  99. var obj protocol.Pack_IDObject
  100. var ret protocol.Pack_MutilFileObject
  101. if err := obj.DeCode(m.PayloadString()); err == nil {
  102. //读文件内容
  103. ReadMutilFileContent(protocol.TP_GW_LOG, obj.Data.Id, &ret)
  104. if str, err := ret.EnCode(appConfig.GID, obj.Seq); err == nil {
  105. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_LOG_ACK), str, 0, ToCloud)
  106. }
  107. }
  108. }
  109. func HandleTpRLog(m mqtt.Message) {
  110. var obj protocol.Pack_IDObject
  111. var ret protocol.Pack_Ack
  112. var err error
  113. if err = obj.DeCode(m.PayloadString()); err == nil {
  114. rd, _ := ioutil.ReadDir(util.GetPath(3))
  115. for _, fi := range rd {
  116. if ok := strings.HasSuffix(fi.Name(), ".log"); ok {
  117. err = os.Remove(util.GetPath(3) + fi.Name())
  118. }
  119. }
  120. if str, err := ret.EnCode(appConfig.GID, appConfig.GID, obj.Seq, err); err == nil {
  121. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_REMOVE_LOG_ACK), str, 0, ToCloud)
  122. }
  123. }
  124. }
  125. func HandleTpQSys(m mqtt.Message) {
  126. var obj protocol.Pack_IDObject
  127. if err := obj.DeCode(m.PayloadString()); err == nil {
  128. go SysInfoStat(obj.Seq)
  129. }
  130. }
  // MqttOnline is a stateless type whose methods build the gateway's online
  // message (GetOnlineMsg) and last-will message (GetWillMsg).
  type MqttOnline struct{}
  133. func (o *MqttOnline) GetOnlineMsg() (string, string) {
  134. //发布上线消息
  135. var obj protocol.Pack_IDObject
  136. str, err := obj.EnCode(appConfig.GID, GetNextUint64(), 0)
  137. if err != nil {
  138. return "", ""
  139. }
  140. return GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONLINE), str
  141. }
  142. func (o *MqttOnline) GetWillMsg() (string, string) {
  143. payload, _ := (&protocol.Pack_IDObject{}).EnCode(appConfig.GID, GetNextUint64(), 0) //遗嘱消息
  144. return GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_WILL), payload
  145. }
  146. // InitCloudMqttSubscribeTopics 初始化网关级别的主题订阅及路由
  147. func InitCloudMqttSubscribeTopics() {
  148. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_APP), mqtt.AtMostOnce, HandleTpQApp, ToCloud)
  149. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_APP), mqtt.AtMostOnce, HandleTpWApp, ToCloud)
  150. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SERIAL), mqtt.AtMostOnce, HandleTpQSerial, ToCloud)
  151. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_SERIAL), mqtt.AtMostOnce, HandleTpWSerial, ToCloud)
  152. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_RTU), mqtt.AtMostOnce, HandleTpQRtu, ToCloud)
  153. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_RTU), mqtt.AtMostOnce, HandleTpWRtu, ToCloud)
  154. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_MODEL), mqtt.AtMostOnce, HandleTpQModel, ToCloud)
  155. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SET_MODEL), mqtt.AtMostOnce, HandleTpWModel, ToCloud)
  156. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_LOG), mqtt.AtMostOnce, HandleTpQLog, ToCloud)
  157. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_REMOVE_LOG), mqtt.AtMostOnce, HandleTpRLog, ToCloud)
  158. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_SYS), mqtt.AtMostOnce, HandleTpQSys, ToCloud)
  159. }