| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- package main
- import (
- "io/ioutil"
- "os"
- "path/filepath"
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _gwHandlerOnce sync.Once
- var _gwHandlerSingle *GwHandler
- func GetGwHandler() *GwHandler {
- _gwHandlerOnce.Do(func() {
- _gwHandlerSingle = &GwHandler{
- queue: util.NewQueue(10000),
- }
- })
- return _gwHandlerSingle
- }
- type GwHandler struct {
- queue *util.MlQueue
- }
- func (o *GwHandler) SubscribeTopics() {
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ONLINE), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_WILL), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_APP_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_APP_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SERIAL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_SERIAL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_RTU_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_RTU_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_MODEL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_MODEL_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_LOG_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_REMOVE_LOG_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SYS_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ITS_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ONVIFDEV_ACK), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *GwHandler) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("GwHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *GwHandler) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("GwHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
- }
- }()
- for {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- Tenant, _, GID, topic, err := ParseTopic(m.Topic())
- if err != nil {
- continue
- }
- switch topic {
- case protocol.TP_GW_ONLINE: //上线
- var obj protocol.Pack_IDObject
- if err := obj.DeCode(m.PayloadString()); err == nil { //网关在线
- cacheState(obj.Id, obj.Time, 0)
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: util.MlNow()})
- }
- case protocol.TP_GW_WILL: //下线
- var obj protocol.Pack_IDObject
- if err := obj.DeCode(m.PayloadString()); err == nil { //网关离线
- cacheState(obj.Id, obj.Time, 1)
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: util.MlNow()})
- }
- case protocol.TP_GW_SET_APP_ACK, protocol.TP_GW_SET_MODEL_ACK, protocol.TP_GW_SET_RTU_ACK, protocol.TP_GW_SET_SERIAL_ACK, protocol.TP_GW_REMOVE_LOG_ACK:
- var obj protocol.Pack_Ack
- if err := obj.DeCode(m.PayloadString()); err == nil {
- o := models.DeviceCmdRecord{
- ID: obj.Seq,
- State: 1,
- Resp: obj.Data.Error,
- }
- if err := o.Update(); err != nil {
- logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]",
- obj.Id, obj.Seq, m.Topic(), err.Error())
- }
- }
- case protocol.TP_GW_APP:
- var ret protocol.Pack_MutilFileObject
- if err := ret.DeCode(m.PayloadString()); err == nil && len(ret.Data.Files) == 1 {
- SaveFile(Tenant, GID, "conf", ret.Data.Files)
- var obj protocol.AppConfig
- if err := json.UnmarshalFromString(ret.Data.Files[0].Content, &obj); err == nil {
- o := models.Gateway{
- ID: ret.Id,
- Name: obj.Name,
- Tenant: obj.Tenant,
- Sn: obj.SN,
- Upgrade: obj.Upgrade,
- MqttEdgeServer: obj.Edge.Mqtt.Server,
- MqttEdgeUser: obj.Edge.Mqtt.User,
- MqttEdgePassword: obj.Edge.Mqtt.Password,
- MqttCloudServer: obj.Cloud.Mqtt.Server,
- MqttCloudUser: obj.Cloud.Mqtt.User,
- MqttCloudPassword: obj.Cloud.Mqtt.Password,
- State: 1,
- }
- if err := o.SaveFromGateway(); err != nil {
- logrus.Errorf("插入数据库失败:%s", err.Error())
- }
- }
- }
- case protocol.TP_GW_SERIAL_ACK:
- var ret protocol.Pack_MutilFileObject
- if err := ret.DeCode(m.PayloadString()); err == nil && len(ret.Data.Files) == 1 {
- SaveFile(Tenant, GID, "conf", ret.Data.Files)
- var obj protocol.SerialConfig
- if err := json.UnmarshalFromString(ret.Data.Files[0].Content, &obj); err == nil {
- for _, v := range obj.Serial {
- o := models.GatewaySerial{
- ID: ret.Id,
- ComID: int(v.Code),
- Interface: v.Interface,
- Address: v.Address,
- BaudRate: v.BaudRate,
- DataBits: int(v.DataBits),
- StopBits: int(v.StopBits),
- Parity: v.Parity,
- Timeout: int(v.Timeout),
- ProtocolType: int(v.ProtocolType),
- }
- if err := models.G_db.Save(&o).Error; err != nil {
- logrus.Errorf("插入数据库失败:%s", err.Error())
- }
- }
- }
- }
- case protocol.TP_GW_RTU_ACK:
- var ret protocol.Pack_MutilFileObject
- if err := ret.DeCode(m.PayloadString()); err == nil {
- SaveFile(Tenant, GID, "dev", ret.Data.Files)
- for _, v := range ret.Data.Files {
- var obj protocol.MapDevConfig
- if err := json.UnmarshalFromString(v.Content, &obj); err == nil {
- for _, v1 := range obj.Rtu {
- o := models.GatewayDevice{
- ID: v1.DevCode,
- Name: v1.Name,
- GID: ret.Id,
- ComID: int(v1.Code),
- RtuID: int(v1.DevID),
- TID: int(v1.TID),
- SendCloud: v1.SendCloud,
- WaitTime: int(v1.WaitTime),
- ProtocolType: int(v1.ProtocolType),
- DevType: int(v1.DevType),
- Tenant: Tenant,
- State: 1,
- }
- if err := o.SaveFromGateway(); err != nil {
- logrus.Errorf("插入数据库失败:%s", err.Error())
- }
- //设备类型,1-灯控类设备 2-环境监测类设备 3-裕明485单灯控制器 4-液位计 5-路面状况传感器
- if v1.DevType == 4 || v1.DevType == 5 {
- if err := models.UpdateDeviceSensorTID(v1.DevCode, int(v1.TID)); err != nil {
- logrus.Errorf("更新传感器物模型失败:%s", err.Error())
- }
- } else if v1.DevType == 2 { //环境监测设备
- if err := models.UpdateDeviceEnvironmentTID(v1.DevCode, int(v1.TID)); err != nil {
- logrus.Errorf("更新环境传感器物模型失败:%s", err.Error())
- }
- } else if v1.DevType == 3 { //裕明单灯控制器
- if err := models.UpdateDeviceLampControllerTID(v1.DevCode, int(v1.TID)); err != nil {
- logrus.Errorf("更新灯控物模型失败:%s", err.Error())
- }
- } else if v1.DevType == 1 { //集控器
- if err := models.UpdateTID(v1.DevCode, int(v1.TID)); err != nil {
- logrus.Errorf("更新单灯集控器物模型失败:%s", err.Error())
- }
- }
- }
- }
- }
- }
- case protocol.TP_GW_MODEL_ACK:
- var ret protocol.Pack_MutilFileObject
- if err := ret.DeCode(m.PayloadString()); err == nil {
- SaveFile(Tenant, GID, "model", ret.Data.Files)
- }
- case protocol.TP_GW_LOG_ACK:
- var ret protocol.Pack_MutilFileObject
- if err := ret.DeCode(m.PayloadString()); err == nil {
- SaveFile(Tenant, GID, "log", ret.Data.Files)
- }
- case protocol.TP_GW_SYS_ACK:
- var ret protocol.Pack_SysInfo
- if err := ret.DeCode(m.PayloadString()); err == nil {
- o := models.GatewaySysInfo{
- GID: GID,
- AppName: ret.Data.Appinfo.Name,
- AppVersion: ret.Data.Appinfo.Version,
- CpuCnt: ret.Data.Cpuinfo.Cpus,
- CpuCores: ret.Data.Cpuinfo.Cores,
- CpuModelName: ret.Data.Cpuinfo.ModelName,
- CpuPercent: ret.Data.Cpuinfo.Percent,
- MemTotal: ret.Data.Meminfo.Total,
- MemAvailable: ret.Data.Meminfo.Available,
- MemUsed: ret.Data.Meminfo.Used,
- MemPercent: ret.Data.Meminfo.Percent,
- }
- if str, err := json.MarshalIndent(&ret.Data.Diskinfos, "", " "); err == nil {
- o.DiskInfos = string(str)
- }
- if str, err := json.MarshalIndent(&ret.Data.Ifs, "", " "); err == nil {
- o.NetIfs = string(str)
- }
- if str, err := json.MarshalIndent(&ret.Data.Pis, "", " "); err == nil {
- o.Process = string(str)
- }
- if str, err := json.MarshalIndent(&ret.Data.TcpListen, "", " "); err == nil {
- o.TcpListen = string(str)
- }
- if str, err := json.MarshalIndent(&ret.Data.TcpConn, "", " "); err == nil {
- o.TcpConn = string(str)
- }
- if str, err := json.MarshalIndent(&ret.Data.Udp, "", " "); err == nil {
- o.Udp = string(str)
- }
- if err := models.G_db.Save(&o).Error; err != nil {
- logrus.Errorf("插入数据库失败:%s", err.Error())
- }
- }
- case protocol.TP_GW_ITS_ACK:
- var ret protocol.Pack_ITSDev
- if err := ret.DeCode(m.PayloadString()); err == nil {
- for _, v := range ret.Data.Its {
- o := models.ItsDevice{
- ID: v.ID,
- Name: v.Name,
- GID: ret.Gid,
- Brand: v.Brand,
- Model: v.Model,
- DevType: v.DevType,
- User: v.User,
- Password: v.Password,
- IP: v.IP,
- Port: v.Port,
- HttpAddr: ret.Data.IPAddr,
- SuggestSpeed: int(ret.Data.SuggestSpeed),
- Duration: ret.Data.Duration,
- EnvID: ret.Data.EnvID,
- TollgateID: v.TollgateID,
- Tenant: Tenant,
- State: 1,
- }
- if err := o.SaveFromGateway(); err != nil {
- logrus.Errorf("抓拍单元数据入库失败:%s", err.Error())
- logrus.Errorf("抓拍单元数据入库失败:%v", o)
- }
- }
- }
- case protocol.TP_GW_ONVIFDEV_ACK:
- var ret protocol.Pack_OnvifDev
- if err := ret.DeCode(m.PayloadString()); err == nil {
- for _, v := range ret.Data {
- o := models.CameraDevice{
- ID: v.Code,
- Name: v.Name,
- GID: ret.Id,
- IP: v.IP,
- SN: v.SN,
- Brand: v.Brand,
- Model: v.Model,
- DevType: v.DevType,
- User: v.User,
- Password: v.Password,
- RtmpServer: v.RtmpServer,
- WebServer: v.WebServer,
- Event: v.Event,
- Gb28181: v.Gb28181,
- State: 1,
- }
- if err := o.SaveFromGateway(); err != nil {
- logrus.Errorf("摄像头数据入库失败:%s", err.Error())
- logrus.Errorf("摄像头数据入库失败:%v", o)
- }
- }
- }
- default:
- logrus.Warnf("GwHandler.Handler:收到暂不支持的主题:%s", topic)
- }
- }
- }
- func SaveFile(tenant, GID, strType string, fo []protocol.FileObject) {
- Dir := tenant + string(filepath.Separator) + GID + string(filepath.Separator) + strType + string(filepath.Separator)
- err := os.MkdirAll(Dir, os.ModePerm)
- if err != nil {
- return
- }
- for _, v := range fo {
- if err := ioutil.WriteFile(Dir+v.File, []byte(v.Content), os.ModePerm); err != nil {
- logrus.Errorf("SaveFile:保存模型文件失败,文件名:%s,原因:%s", Dir+v.File, err.Error())
- }
- }
- }
|