package main import ( "io/ioutil" "os" "path/filepath" "runtime" "runtime/debug" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _gwHandlerOnce sync.Once var _gwHandlerSingle *GwHandler func GetGwHandler() *GwHandler { _gwHandlerOnce.Do(func() { _gwHandlerSingle = &GwHandler{ queue: util.NewQueue(10000), } }) return _gwHandlerSingle } type GwHandler struct { queue *util.MlQueue } func (o *GwHandler) SubscribeTopics() { GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ONLINE), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_WILL), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_APP_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_APP_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SERIAL_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_SERIAL_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_RTU_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_RTU_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_MODEL_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SET_MODEL_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_LOG_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_REMOVE_LOG_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_SYS_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ITS_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_GATEWAY, protocol.TP_GW_ONVIFDEV_ACK), mqtt.AtMostOnce, o.HandlerData) } func (o *GwHandler) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("GwHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *GwHandler) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("GwHandler.Handler:%v发生异常:%s", args, string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } Tenant, _, GID, topic, err := ParseTopic(m.Topic()) if err != nil { continue } switch topic { case protocol.TP_GW_ONLINE: //上线 var obj protocol.Pack_IDObject if err := obj.DeCode(m.PayloadString()); err == nil { //网关在线 cacheState(obj.Id, obj.Time, 0) GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: util.MlNow()}) } case protocol.TP_GW_WILL: //下线 var obj protocol.Pack_IDObject if err := obj.DeCode(m.PayloadString()); err == nil { //网关离线 cacheState(obj.Id, obj.Time, 1) GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: util.MlNow()}) } case protocol.TP_GW_SET_APP_ACK, protocol.TP_GW_SET_MODEL_ACK, protocol.TP_GW_SET_RTU_ACK, protocol.TP_GW_SET_SERIAL_ACK, protocol.TP_GW_REMOVE_LOG_ACK: var obj protocol.Pack_Ack if err := obj.DeCode(m.PayloadString()); err == nil { o := models.DeviceCmdRecord{ ID: obj.Seq, State: 1, Resp: obj.Data.Error, } if err := o.Update(); err != nil { logrus.Errorf("收到网关[%s]的响应[seq:%d],主题:%s,但更新数据库失败[%s]", obj.Id, obj.Seq, m.Topic(), err.Error()) } } case protocol.TP_GW_APP: var ret protocol.Pack_MutilFileObject if err := ret.DeCode(m.PayloadString()); err == nil && len(ret.Data.Files) == 1 { SaveFile(Tenant, GID, "conf", ret.Data.Files) var obj protocol.AppConfig if err := json.UnmarshalFromString(ret.Data.Files[0].Content, &obj); err == nil { o := models.Gateway{ ID: ret.Id, Name: obj.Name, Tenant: obj.Tenant, Sn: obj.SN, Upgrade: obj.Upgrade, MqttEdgeServer: obj.Edge.Mqtt.Server, MqttEdgeUser: obj.Edge.Mqtt.User, MqttEdgePassword: obj.Edge.Mqtt.Password, MqttCloudServer: obj.Cloud.Mqtt.Server, MqttCloudUser: obj.Cloud.Mqtt.User, MqttCloudPassword: obj.Cloud.Mqtt.Password, State: 1, } if err := o.SaveFromGateway(); err != nil { logrus.Errorf("插入数据库失败:%s", err.Error()) } } } case protocol.TP_GW_SERIAL_ACK: var ret protocol.Pack_MutilFileObject if err := ret.DeCode(m.PayloadString()); err == nil && len(ret.Data.Files) == 1 { SaveFile(Tenant, GID, "conf", ret.Data.Files) var obj protocol.SerialConfig if err := json.UnmarshalFromString(ret.Data.Files[0].Content, &obj); err == nil { for _, v := range obj.Serial { o := models.GatewaySerial{ ID: ret.Id, ComID: int(v.Code), Interface: v.Interface, Address: v.Address, BaudRate: v.BaudRate, DataBits: int(v.DataBits), StopBits: int(v.StopBits), Parity: v.Parity, Timeout: int(v.Timeout), ProtocolType: int(v.ProtocolType), } if err := models.G_db.Save(&o).Error; err != nil { logrus.Errorf("插入数据库失败:%s", err.Error()) } } } } case protocol.TP_GW_RTU_ACK: var ret protocol.Pack_MutilFileObject if err := ret.DeCode(m.PayloadString()); err == nil { SaveFile(Tenant, GID, "dev", ret.Data.Files) for _, v := range ret.Data.Files { var obj protocol.MapDevConfig if err := json.UnmarshalFromString(v.Content, &obj); err == nil { for _, v1 := range obj.Rtu { o := models.GatewayDevice{ ID: v1.DevCode, Name: v1.Name, GID: ret.Id, ComID: int(v1.Code), RtuID: int(v1.DevID), TID: int(v1.TID), SendCloud: v1.SendCloud, WaitTime: int(v1.WaitTime), ProtocolType: int(v1.ProtocolType), DevType: int(v1.DevType), Tenant: Tenant, State: 1, } if err := o.SaveFromGateway(); err != nil { logrus.Errorf("插入数据库失败:%s", err.Error()) } //设备类型,1-灯控类设备 2-环境监测类设备 3-裕明485单灯控制器 4-液位计 5-路面状况传感器 if v1.DevType == 4 || v1.DevType == 5 { if err := models.UpdateDeviceSensorTID(v1.DevCode, int(v1.TID)); err != nil { logrus.Errorf("更新传感器物模型失败:%s", err.Error()) } } else if v1.DevType == 2 { //环境监测设备 if err := models.UpdateDeviceEnvironmentTID(v1.DevCode, int(v1.TID)); err != nil { logrus.Errorf("更新环境传感器物模型失败:%s", err.Error()) } } else if v1.DevType == 3 { //裕明单灯控制器 if err := models.UpdateDeviceLampControllerTID(v1.DevCode, int(v1.TID)); err != nil { logrus.Errorf("更新灯控物模型失败:%s", err.Error()) } } else if v1.DevType == 1 { //集控器 if err := models.UpdateTID(v1.DevCode, int(v1.TID)); err != nil { logrus.Errorf("更新单灯集控器物模型失败:%s", err.Error()) } } } } } } case protocol.TP_GW_MODEL_ACK: var ret protocol.Pack_MutilFileObject if err := ret.DeCode(m.PayloadString()); err == nil { SaveFile(Tenant, GID, "model", ret.Data.Files) } case protocol.TP_GW_LOG_ACK: var ret protocol.Pack_MutilFileObject if err := ret.DeCode(m.PayloadString()); err == nil { SaveFile(Tenant, GID, "log", ret.Data.Files) } case protocol.TP_GW_SYS_ACK: var ret protocol.Pack_SysInfo if err := ret.DeCode(m.PayloadString()); err == nil { o := models.GatewaySysInfo{ GID: GID, AppName: ret.Data.Appinfo.Name, AppVersion: ret.Data.Appinfo.Version, CpuCnt: ret.Data.Cpuinfo.Cpus, CpuCores: ret.Data.Cpuinfo.Cores, CpuModelName: ret.Data.Cpuinfo.ModelName, CpuPercent: ret.Data.Cpuinfo.Percent, MemTotal: ret.Data.Meminfo.Total, MemAvailable: ret.Data.Meminfo.Available, MemUsed: ret.Data.Meminfo.Used, MemPercent: ret.Data.Meminfo.Percent, } if str, err := json.MarshalIndent(&ret.Data.Diskinfos, "", " "); err == nil { o.DiskInfos = string(str) } if str, err := json.MarshalIndent(&ret.Data.Ifs, "", " "); err == nil { o.NetIfs = string(str) } if str, err := json.MarshalIndent(&ret.Data.Pis, "", " "); err == nil { o.Process = string(str) } if str, err := json.MarshalIndent(&ret.Data.TcpListen, "", " "); err == nil { o.TcpListen = string(str) } if str, err := json.MarshalIndent(&ret.Data.TcpConn, "", " "); err == nil { o.TcpConn = string(str) } if str, err := json.MarshalIndent(&ret.Data.Udp, "", " "); err == nil { o.Udp = string(str) } if err := models.G_db.Save(&o).Error; err != nil { logrus.Errorf("插入数据库失败:%s", err.Error()) } } case protocol.TP_GW_ITS_ACK: var ret protocol.Pack_ITSDev if err := ret.DeCode(m.PayloadString()); err == nil { for _, v := range ret.Data.Its { o := models.ItsDevice{ ID: v.ID, Name: v.Name, GID: ret.Gid, Brand: v.Brand, Model: v.Model, DevType: v.DevType, User: v.User, Password: v.Password, IP: v.IP, Port: v.Port, HttpAddr: ret.Data.IPAddr, SuggestSpeed: int(ret.Data.SuggestSpeed), Duration: ret.Data.Duration, EnvID: ret.Data.EnvID, TollgateID: v.TollgateID, Tenant: Tenant, State: 1, } if err := o.SaveFromGateway(); err != nil { logrus.Errorf("抓拍单元数据入库失败:%s", err.Error()) logrus.Errorf("抓拍单元数据入库失败:%v", o) } } } case protocol.TP_GW_ONVIFDEV_ACK: var ret protocol.Pack_OnvifDev if err := ret.DeCode(m.PayloadString()); err == nil { for _, v := range ret.Data { o := models.CameraDevice{ ID: v.Code, Name: v.Name, GID: ret.Id, IP: v.IP, SN: v.SN, Brand: v.Brand, Model: v.Model, DevType: v.DevType, User: v.User, Password: v.Password, RtmpServer: v.RtmpServer, WebServer: v.WebServer, Event: v.Event, Gb28181: v.Gb28181, State: 1, } if err := o.SaveFromGateway(); err != nil { logrus.Errorf("摄像头数据入库失败:%s", err.Error()) logrus.Errorf("摄像头数据入库失败:%v", o) } } } default: logrus.Warnf("GwHandler.Handler:收到暂不支持的主题:%s", topic) } } } func SaveFile(tenant, GID, strType string, fo []protocol.FileObject) { Dir := tenant + string(filepath.Separator) + GID + string(filepath.Separator) + strType + string(filepath.Separator) err := os.MkdirAll(Dir, os.ModePerm) if err != nil { return } for _, v := range fo { if err := ioutil.WriteFile(Dir+v.File, []byte(v.Content), os.ModePerm); err != nil { logrus.Errorf("SaveFile:保存模型文件失败,文件名:%s,原因:%s", Dir+v.File, err.Error()) } } }