package main import ( "context" "os" "runtime/debug" "strings" "time" "github.com/sirupsen/logrus" "lc/common/mqtt" "lc/common/onvif/profiles/devicemgmt" "lc/common/onvif/soap" "lc/common/protocol" "lc/common/util" ) type Resolution struct { Width int32 Height int32 } type OnvifState struct { ChangeTime time.Time //变动时间 State uint8 //是否在线,0在线,1离线 } func (o *OnvifState) CheckOnline(parent *LcDevice) { if parent.XAddr == "" { return } o.State = parent.testOnline(parent.XAddr) o.ChangeTime = util.MlNow() var obj protocol.Pack_IPCState if str, err := obj.EnCode(parent.onvifDev.Code, appConfig.GID, GetNextUint64(), o.ChangeTime, o.State); err == nil { GetMQTTMgr().Publish(GetTopic(parent.GetDevType(), parent.onvifDev.Code, protocol.TP_ONVIF_STATE), str, mqtt.AtMostOnce, ToAll) } } type VideoToken struct { Token string // "Profile_1" Name string //码流名称,如"mainStream" VideoEncoding string //编码格式,如"H264" VideoResolution Resolution //分辨率 VideoFrameRateLimit int32 //25 VideoBitrateLimit int32 //2048 VideoRtspUrl string //视频地址 SnapshotUrl string //抓拍图片地址 } type LcDevice struct { IP string //设备ID,uuid endpoints map[string]string tokens map[string]*VideoToken client *soap.Client onvifDev *protocol.OnvifDev ctx context.Context cancel context.CancelFunc downQueue *util.MlQueue mapTopicHandle map[string]func(m mqtt.Message) chanXAddr chan string XAddr string ffmpeg *os.Process //推流进程 observer uint32 EventActuator *EventActuator State OnvifState //在线监测 } func NewLcDevice(dev *protocol.OnvifDev) *LcDevice { client := soap.NewClient(soap.WithTimeout(time.Second * 0)) client.AddHeader(soap.NewWSSSecurityHeader(dev.User, dev.Password)) ctx, cancel := context.WithCancel(context.Background()) lcdev := LcDevice{ IP: dev.IP, endpoints: make(map[string]string), tokens: make(map[string]*VideoToken), client: client, onvifDev: dev, ctx: ctx, cancel: cancel, downQueue: util.NewQueue(100), mapTopicHandle: make(map[string]func(m mqtt.Message)), chanXAddr: make(chan string), } lcdev.SetTopicHandle() lcdev.MQTTSubscribe() go lcdev.Handle() return &lcdev } func (o *LcDevice) GetDevType() string { if o.onvifDev.DevType == 3 { return protocol.DT_IPC } else if o.onvifDev.DevType == 4 { return protocol.DT_SOS } return "unknown" } func (o *LcDevice) SetTopicHandle() { o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ)] = o.HandleTpOnvifPtz o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM)] = o.HandleTpSPtzComm o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT)] = o.HandleTpOnvifSnapshot o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD)] = o.HandleTpOnvifRecord o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO)] = o.HandleTpOnvifVideo o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT)] = o.HandleTpOnvifReboot o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME)] = o.HandleTpOnvifSynctime } func (o *LcDevice) MQTTSubscribe() { GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ), mqtt.AtMostOnce, o.HandleMessage, ToAll) GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM), mqtt.AtMostOnce, o.HandleMessage, ToAll) GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT), mqtt.AtMostOnce, o.HandleMessage, ToAll) GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD), mqtt.AtMostOnce, o.HandleMessage, ToAll) GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO), mqtt.AtMostOnce, o.HandleMessage, ToAll) //以下仅订阅云端 GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT), mqtt.AtMostOnce, o.HandleMessage, ToCloud) GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME), mqtt.AtMostOnce, o.HandleMessage, ToCloud) } func (o *LcDevice) GetCapabilities(XAddr string) error { dev := devicemgmt.NewDevice(o.client, XAddr) reply, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{}) if err != nil { if serr, ok := err.(*soap.SOAPFault); ok { logrus.Error(serr) } return err } if len(strings.Trim(string(reply.Capabilities.Analytics.XAddr), " ")) > 0 { o.endpoints["analytics"] = string(reply.Capabilities.Analytics.XAddr) } if len(strings.Trim(string(reply.Capabilities.Device.XAddr), " ")) > 0 { o.endpoints["device"] = string(reply.Capabilities.Device.XAddr) } if len(strings.Trim(string(reply.Capabilities.Events.XAddr), " ")) > 0 { o.endpoints["events"] = string(reply.Capabilities.Events.XAddr) } if len(strings.Trim(string(reply.Capabilities.Imaging.XAddr), " ")) > 0 { o.endpoints["imaging"] = string(reply.Capabilities.Imaging.XAddr) } if len(strings.Trim(string(reply.Capabilities.Media.XAddr), " ")) > 0 { o.endpoints["media"] = string(reply.Capabilities.Media.XAddr) } if len(strings.Trim(string(reply.Capabilities.PTZ.XAddr), " ")) > 0 { o.endpoints["ptz"] = string(reply.Capabilities.PTZ.XAddr) } return nil } func (o *LcDevice) testOnline(XAddr string) uint8 { dev := devicemgmt.NewDevice(o.client, XAddr) _, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{}) if err != nil { return protocol.FAILED } return protocol.SUCCESS } func (o *LcDevice) SystemReboot() (string, error) { dev := devicemgmt.NewDevice(o.client, o.XAddr) resp, err := dev.SystemReboot(&devicemgmt.SystemReboot{}) if err != nil { logrus.Errorf("重启设备[%s]发生错误:%s", o.onvifDev.Code, err.Error()) return "", err } return resp.Message, nil } func (o *LcDevice) SetSystemDateAndTime() error { dev := devicemgmt.NewDevice(o.client, o.XAddr) _, err := dev.SetSystemDateAndTime(&devicemgmt.SetSystemDateAndTime{ DateTimeType: devicemgmt.SetDateTimeTypeNTP, DaylightSavings: true, }) if err != nil { logrus.Errorf("设置设备[%s]日期时间发生错误:%s", o.onvifDev.Code, err.Error()) } return err } func (o *LcDevice) HandleMessage(m mqtt.Message) { o.downQueue.Put(m) } func (o *LcDevice) HandleTpOnvifReboot(m mqtt.Message) { var obj protocol.Pack_IPCCtrlComm if err := obj.DeCode(m.PayloadString()); err != nil { return } if o.onvifDev.Code != obj.Id { return } str, err := o.SystemReboot() if err == nil { logrus.Infof("设备[%s]重启成功:%s", o.onvifDev.Code, str) } var ret protocol.Pack_IPCCommonACK if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil { GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT_ACK), strRet, mqtt.AtMostOnce, ToAll) } } func (o *LcDevice) HandleTpOnvifSynctime(m mqtt.Message) { var obj protocol.Pack_IPCCtrlComm if err := obj.DeCode(m.PayloadString()); err != nil { return } if o.onvifDev.Code != obj.Id { return } err := o.SetSystemDateAndTime() var ret protocol.Pack_IPCCommonACK if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil { GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME_ACK), strRet, mqtt.AtMostOnce, ToAll) } } func (o *LcDevice) Update(xaddr string) { if xaddr != "" { o.chanXAddr <- xaddr } } func (o *LcDevice) init(xaddr string) error { //获取服务能力,存入endpoint err := o.GetCapabilities(xaddr) if err != nil { return err } //获取profile,存入videotoken err = o.GetProfiles() if err != nil { return err } //获取uri,存入videotoken //err = o.GetAllStreamUri() //if err != nil { // return err //} //err = o.GetAllSnapshotUri() //if err != nil { // return err //} if o.EventActuator != nil { o.EventActuator.Cancel() if addr, ok := o.endpoints["events"]; ok { o.EventActuator = NewEventActuator(o) go o.EventActuator.EventHandle(addr) } } return nil } func (o *LcDevice) Handle() { defer func() { if err := recover(); err != nil { logrus.Errorf("LcDevice.Handle发生异常:%v", err) logrus.Errorf("LcDevice.Handle发生异常,堆栈信息:%s", string(debug.Stack())) go o.Handle() } }() timer := time.NewTicker(1 * time.Minute) NeedRetry := true //初始化地址 o.XAddr = "http://" + o.IP + ":80/onvif/device_service" if err := o.init(o.XAddr); err == nil { NeedRetry = false } else { logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error()) } for { select { case <-o.ctx.Done(): o.CleanupProcess() if o.EventActuator != nil { o.EventActuator.Cancel() } logrus.Errorf("设备[%s]的LcDevice.Handle退出,原因:%v", o.onvifDev.Code, o.ctx.Err()) return case <-timer.C: //每隔1分钟执行一次 if NeedRetry { if err := o.init(o.XAddr); err == nil { NeedRetry = false } else { logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error()) } } o.State.CheckOnline(o) case xaddr := <-o.chanXAddr: logrus.Infof("摄像机%s尝试更新ip地址为:%s", o.onvifDev.Code, xaddr) if xaddr != o.XAddr { o.XAddr = xaddr if err := o.init(xaddr); err == nil { NeedRetry = false logrus.Infof("摄像机%s初始化成功", o.onvifDev.Code) } else { logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error()) } } default: if o.EventActuator == nil && len(o.onvifDev.Event) > 0 { if addr, ok := o.endpoints["events"]; ok { o.EventActuator = NewEventActuator(o) go o.EventActuator.EventHandle(addr) } } //从队列钟获取指令执行 if m, ok, _ := o.downQueue.Get(); ok { if mm, ok := m.(mqtt.Message); ok { if fn, ok := o.mapTopicHandle[mm.Topic()]; ok { fn(mm) } else { logrus.Errorf("LcDevice.Handle:不支持的主题:%s", mm.Topic()) } } } else { time.Sleep(50 * time.Millisecond) } } } } func FileExist(path string) bool { _, err := os.Lstat(path) return !os.IsNotExist(err) } func GetOnlineMsg() (string, string) { //发布上线消息 return "", "" } func GetWillMsg() (string, string) { return "", "" }