gwhandler.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package main
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "path/filepath"
  6. "runtime"
  7. "runtime/debug"
  8. "sync"
  9. "time"
  10. "github.com/sirupsen/logrus"
  11. "lc/common/models"
  12. "lc/common/mqtt"
  13. "lc/common/protocol"
  14. "lc/common/util"
  15. )
  16. var _gwHandlerOnce sync.Once
  17. var _gwHandlerSingle *GwHandler
  18. func GetGwHandler() *GwHandler {
  19. _gwHandlerOnce.Do(func() {
  20. _gwHandlerSingle = &GwHandler{
  21. queue: util.NewQueue(10000),
  22. }
  23. })
  24. return _gwHandlerSingle
  25. }
  26. type GwHandler struct {
  27. queue *util.MlQueue
  28. }
  29. func (o *GwHandler) SubscribeTopics() {
  30. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ONLINE), mqtt.AtMostOnce, o.HandlerData)
  31. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_WILL), mqtt.AtMostOnce, o.HandlerData)
  32. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_APP_ACK), mqtt.AtMostOnce, o.HandlerData)
  33. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_APP_ACK), mqtt.AtMostOnce, o.HandlerData)
  34. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SERIAL_ACK), mqtt.AtMostOnce, o.HandlerData)
  35. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_SERIAL_ACK), mqtt.AtMostOnce, o.HandlerData)
  36. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_RTU_ACK), mqtt.AtMostOnce, o.HandlerData)
  37. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_RTU_ACK), mqtt.AtMostOnce, o.HandlerData)
  38. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_MODEL_ACK), mqtt.AtMostOnce, o.HandlerData)
  39. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_MODEL_ACK), mqtt.AtMostOnce, o.HandlerData)
  40. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_LOG_ACK), mqtt.AtMostOnce, o.HandlerData)
  41. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_REMOVE_LOG_ACK), mqtt.AtMostOnce, o.HandlerData)
  42. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SYS_ACK), mqtt.AtMostOnce, o.HandlerData)
  43. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ITS_ACK), mqtt.AtMostOnce, o.HandlerData)
  44. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ONVIFDEV_ACK), mqtt.AtMostOnce, o.HandlerData)
  45. }
  46. func (o *GwHandler) HandlerData(m mqtt.Message) {
  47. for {
  48. ok, cnt := o.queue.Put(&m)
  49. if ok {
  50. break
  51. } else {
  52. logrus.Errorf("GwHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  53. runtime.Gosched()
  54. }
  55. }
  56. }
  57. func (o *GwHandler) Handler(args ...interface{}) interface{} {
  58. defer func() {
  59. if err := recover(); err != nil {
  60. gopool.Add(o.Handler, args)
  61. logrus.Errorf("GwHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
  62. }
  63. }()
  64. for {
  65. msg, ok, quantity := o.queue.Get()
  66. if !ok {
  67. time.Sleep(10 * time.Millisecond)
  68. continue
  69. } else if quantity > 1000 {
  70. logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  71. }
  72. m, ok := msg.(*mqtt.Message)
  73. if !ok {
  74. continue
  75. }
  76. Tenant, _, GID, topic, err := ParseTopic(m.Topic())
  77. if err != nil {
  78. continue
  79. }
  80. switch topic {
  81. case protocol.TP_GW_ONLINE: //上线
  82. var obj protocol.Pack_IDObject
  83. if err := obj.DeCode(m.PayloadString()); err == nil { //网关在线
  84. cacheState(obj.Id, obj.Time, 0)
  85. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: util.MlNow()})
  86. }
  87. case protocol.TP_GW_WILL: //下线
  88. var obj protocol.Pack_IDObject
  89. if err := obj.DeCode(m.PayloadString()); err == nil { //网关离线
  90. cacheState(obj.Id, obj.Time, 1)
  91. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: util.MlNow()})
  92. }
  93. case protocol.TP_GW_SET_APP_ACK, protocol.TP_GW_SET_MODEL_ACK, protocol.TP_GW_SET_RTU_ACK, protocol.TP_GW_SET_SERIAL_ACK, protocol.TP_GW_REMOVE_LOG_ACK:
  94. var obj protocol.Pack_Ack
  95. if err := obj.DeCode(m.PayloadString()); err == nil {
  96. o := models.DeviceCmdRecord{
  97. ID: obj.Seq,
  98. State: 1,
  99. Resp: obj.Data.Error,
  100. }
  101. if err := o.Update(); err != nil {
  102. logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
  103. obj.Id, obj.Seq, m.Topic(), err.Error())
  104. }
  105. }
  106. case protocol.TP_GW_APP:
  107. var ret protocol.Pack_MutilFileObject
  108. if err := ret.DeCode(m.PayloadString()); err == nil && len(ret.Data.Files) == 1 {
  109. SaveFile(Tenant, GID, "conf", ret.Data.Files)
  110. var obj protocol.AppConfig
  111. if err := json.UnmarshalFromString(ret.Data.Files[0].Content, &obj); err == nil {
  112. o := models.Gateway{
  113. ID: ret.Id,
  114. Name: obj.Name,
  115. Tenant: obj.Tenant,
  116. Sn: obj.SN,
  117. Upgrade: obj.Upgrade,
  118. MqttEdgeServer: obj.Edge.Mqtt.Server,
  119. MqttEdgeUser: obj.Edge.Mqtt.User,
  120. MqttEdgePassword: obj.Edge.Mqtt.Password,
  121. MqttCloudServer: obj.Cloud.Mqtt.Server,
  122. MqttCloudUser: obj.Cloud.Mqtt.User,
  123. MqttCloudPassword: obj.Cloud.Mqtt.Password,
  124. State: 1,
  125. }
  126. if err := o.SaveFromGateway(); err != nil {
  127. logrus.Errorf("插入数据库失败:%s", err.Error())
  128. }
  129. }
  130. }
  131. case protocol.TP_GW_SERIAL_ACK:
  132. var ret protocol.Pack_MutilFileObject
  133. if err := ret.DeCode(m.PayloadString()); err == nil && len(ret.Data.Files) == 1 {
  134. SaveFile(Tenant, GID, "conf", ret.Data.Files)
  135. var obj protocol.SerialConfig
  136. if err := json.UnmarshalFromString(ret.Data.Files[0].Content, &obj); err == nil {
  137. for _, v := range obj.Serial {
  138. o := models.GatewaySerial{
  139. ID: ret.Id,
  140. ComID: int(v.Code),
  141. Interface: v.Interface,
  142. Address: v.Address,
  143. BaudRate: v.BaudRate,
  144. DataBits: int(v.DataBits),
  145. StopBits: int(v.StopBits),
  146. Parity: v.Parity,
  147. Timeout: int(v.Timeout),
  148. ProtocolType: int(v.ProtocolType),
  149. }
  150. if err := models.G_db.Save(&o).Error; err != nil {
  151. logrus.Errorf("插入数据库失败:%s", err.Error())
  152. }
  153. }
  154. }
  155. }
  156. case protocol.TP_GW_RTU_ACK:
  157. var ret protocol.Pack_MutilFileObject
  158. if err := ret.DeCode(m.PayloadString()); err == nil {
  159. SaveFile(Tenant, GID, "dev", ret.Data.Files)
  160. for _, v := range ret.Data.Files {
  161. var obj protocol.MapDevConfig
  162. if err := json.UnmarshalFromString(v.Content, &obj); err == nil {
  163. for _, v1 := range obj.Rtu {
  164. o := models.GatewayDevice{
  165. ID: v1.DevCode,
  166. Name: v1.Name,
  167. GID: ret.Id,
  168. ComID: int(v1.Code),
  169. RtuID: int(v1.DevID),
  170. TID: int(v1.TID),
  171. SendCloud: v1.SendCloud,
  172. WaitTime: int(v1.WaitTime),
  173. ProtocolType: int(v1.ProtocolType),
  174. DevType: int(v1.DevType),
  175. Tenant: Tenant,
  176. State: 1,
  177. }
  178. if err := o.SaveFromGateway(); err != nil {
  179. logrus.Errorf("插入数据库失败:%s", err.Error())
  180. }
  181. //设备类型,1-灯控类设备 2-环境监测类设备 3-裕明485单灯控制器 4-液位计 5-路面状况传感器
  182. if v1.DevType == 4 || v1.DevType == 5 {
  183. if err := models.UpdateDeviceSensorTID(v1.DevCode, int(v1.TID)); err != nil {
  184. logrus.Errorf("更新传感器物模型失败:%s", err.Error())
  185. }
  186. } else if v1.DevType == 2 { //环境监测设备
  187. if err := models.UpdateDeviceEnvironmentTID(v1.DevCode, int(v1.TID)); err != nil {
  188. logrus.Errorf("更新环境传感器物模型失败:%s", err.Error())
  189. }
  190. } else if v1.DevType == 3 { //裕明单灯控制器
  191. if err := models.UpdateDeviceLampControllerTID(v1.DevCode, int(v1.TID)); err != nil {
  192. logrus.Errorf("更新灯控物模型失败:%s", err.Error())
  193. }
  194. } else if v1.DevType == 1 { //集控器
  195. if err := models.UpdateTID(v1.DevCode, int(v1.TID)); err != nil {
  196. logrus.Errorf("更新单灯集控器物模型失败:%s", err.Error())
  197. }
  198. }
  199. }
  200. }
  201. }
  202. }
  203. case protocol.TP_GW_MODEL_ACK:
  204. var ret protocol.Pack_MutilFileObject
  205. if err := ret.DeCode(m.PayloadString()); err == nil {
  206. SaveFile(Tenant, GID, "model", ret.Data.Files)
  207. }
  208. case protocol.TP_GW_LOG_ACK:
  209. var ret protocol.Pack_MutilFileObject
  210. if err := ret.DeCode(m.PayloadString()); err == nil {
  211. SaveFile(Tenant, GID, "log", ret.Data.Files)
  212. }
  213. case protocol.TP_GW_SYS_ACK:
  214. var ret protocol.Pack_SysInfo
  215. if err := ret.DeCode(m.PayloadString()); err == nil {
  216. o := models.GatewaySysInfo{
  217. GID: GID,
  218. AppName: ret.Data.Appinfo.Name,
  219. AppVersion: ret.Data.Appinfo.Version,
  220. CpuCnt: ret.Data.Cpuinfo.Cpus,
  221. CpuCores: ret.Data.Cpuinfo.Cores,
  222. CpuModelName: ret.Data.Cpuinfo.ModelName,
  223. CpuPercent: ret.Data.Cpuinfo.Percent,
  224. MemTotal: ret.Data.Meminfo.Total,
  225. MemAvailable: ret.Data.Meminfo.Available,
  226. MemUsed: ret.Data.Meminfo.Used,
  227. MemPercent: ret.Data.Meminfo.Percent,
  228. }
  229. if str, err := json.MarshalIndent(&ret.Data.Diskinfos, "", " "); err == nil {
  230. o.DiskInfos = string(str)
  231. }
  232. if str, err := json.MarshalIndent(&ret.Data.Ifs, "", " "); err == nil {
  233. o.NetIfs = string(str)
  234. }
  235. if str, err := json.MarshalIndent(&ret.Data.Pis, "", " "); err == nil {
  236. o.Process = string(str)
  237. }
  238. if str, err := json.MarshalIndent(&ret.Data.TcpListen, "", " "); err == nil {
  239. o.TcpListen = string(str)
  240. }
  241. if str, err := json.MarshalIndent(&ret.Data.TcpConn, "", " "); err == nil {
  242. o.TcpConn = string(str)
  243. }
  244. if str, err := json.MarshalIndent(&ret.Data.Udp, "", " "); err == nil {
  245. o.Udp = string(str)
  246. }
  247. if err := models.G_db.Save(&o).Error; err != nil {
  248. logrus.Errorf("插入数据库失败:%s", err.Error())
  249. }
  250. }
  251. case protocol.TP_GW_ITS_ACK:
  252. var ret protocol.Pack_ITSDev
  253. if err := ret.DeCode(m.PayloadString()); err == nil {
  254. for _, v := range ret.Data.Its {
  255. o := models.ItsDevice{
  256. ID: v.ID,
  257. Name: v.Name,
  258. GID: ret.Gid,
  259. Brand: v.Brand,
  260. Model: v.Model,
  261. DevType: v.DevType,
  262. User: v.User,
  263. Password: v.Password,
  264. IP: v.IP,
  265. Port: v.Port,
  266. HttpAddr: ret.Data.IPAddr,
  267. SuggestSpeed: int(ret.Data.SuggestSpeed),
  268. Duration: ret.Data.Duration,
  269. EnvID: ret.Data.EnvID,
  270. TollgateID: v.TollgateID,
  271. Tenant: Tenant,
  272. State: 1,
  273. }
  274. if err := o.SaveFromGateway(); err != nil {
  275. logrus.Errorf("抓拍单元数据入库失败:%s", err.Error())
  276. logrus.Errorf("抓拍单元数据入库失败:%v", o)
  277. }
  278. }
  279. }
  280. case protocol.TP_GW_ONVIFDEV_ACK:
  281. var ret protocol.Pack_OnvifDev
  282. if err := ret.DeCode(m.PayloadString()); err == nil {
  283. for _, v := range ret.Data {
  284. o := models.CameraDevice{
  285. ID: v.Code,
  286. Name: v.Name,
  287. GID: ret.Id,
  288. IP: v.IP,
  289. SN: v.SN,
  290. Brand: v.Brand,
  291. Model: v.Model,
  292. DevType: v.DevType,
  293. User: v.User,
  294. Password: v.Password,
  295. RtmpServer: v.RtmpServer,
  296. WebServer: v.WebServer,
  297. Event: v.Event,
  298. Gb28181: v.Gb28181,
  299. State: 1,
  300. }
  301. if err := o.SaveFromGateway(); err != nil {
  302. logrus.Errorf("摄像头数据入库失败:%s", err.Error())
  303. logrus.Errorf("摄像头数据入库失败:%v", o)
  304. }
  305. }
  306. }
  307. default:
  308. logrus.Warnf("GwHandler.Handler:收到暂不支持的主题:%s", topic)
  309. }
  310. }
  311. }
  312. func SaveFile(tenant, GID, strType string, fo []protocol.FileObject) {
  313. Dir := tenant + string(filepath.Separator) + GID + string(filepath.Separator) + strType + string(filepath.Separator)
  314. err := os.MkdirAll(Dir, os.ModePerm)
  315. if err != nil {
  316. return
  317. }
  318. for _, v := range fo {
  319. if err := ioutil.WriteFile(Dir+v.File, []byte(v.Content), os.ModePerm); err != nil {
  320. logrus.Errorf("SaveFile:保存模型文件失败,文件名:%s,原因:%s", Dir+v.File, err.Error())
  321. }
  322. }
  323. }