package main

import (
	"context"
	"os"
	"strings"
	"time"

	"github.com/sirupsen/logrus"

	"lc/common/mqtt"
	"lc/common/onvif/profiles/devicemgmt"
	"lc/common/onvif/soap"
	"lc/common/protocol"
	"lc/common/util"
)

// Resolution is a video frame size.
type Resolution struct {
	Width  int32
	Height int32
}

// OnvifState holds the result of the most recent online check of a device.
type OnvifState struct {
	ChangeTime time.Time // time of change (set on every check)
	State      uint8     // online flag: 0 = online, 1 = offline
}

// CheckOnline probes the parent's current XAddr, stores the result together
// with the check time, and publishes the state on the device's
// TP_ONVIF_STATE topic. It does nothing while the parent has no XAddr.
func (o *OnvifState) CheckOnline(parent *LcDevice) {
	if parent.XAddr == "" {
		return
	}

	o.State = parent.testOnline(parent.XAddr)
	o.ChangeTime = util.MlNow()

	var pack protocol.Pack_IPCState
	payload, err := pack.EnCode(parent.onvifDev.Code, appConfig.GID, GetNextUint64(), o.ChangeTime, o.State)
	if err != nil {
		// Nothing to publish when the state cannot be encoded.
		return
	}
	GetMQTTMgr().Publish(GetTopic(parent.GetDevType(), parent.onvifDev.Code, protocol.TP_ONVIF_STATE), payload, mqtt.AtMostOnce, ToAll)
}

// VideoToken describes one media profile of a device.
type VideoToken struct {
	Token               string     // e.g. "Profile_1"
	Name                string     // stream name, e.g. "mainStream"
	VideoEncoding       string     // encoding format, e.g. "H264"
	VideoResolution     Resolution // resolution
	VideoFrameRateLimit int32      // e.g. 25
	VideoBitrateLimit   int32      // e.g. 2048
	VideoRtspUrl        string     // video (RTSP) address
	SnapshotUrl         string     // snapshot image address
}

// LcDevice is the runtime state of one ONVIF device: its SOAP client, the
// service endpoints and video tokens discovered from it, the queue of MQTT
// commands waiting to be executed, and the handler loop's bookkeeping.
type LcDevice struct {
	IP             string // device address; host part of the default device-service URL
	endpoints      map[string]string
	tokens         map[string]*VideoToken
	client         *soap.Client
	onvifDev       *protocol.OnvifDev
	ctx            context.Context
	cancel         context.CancelFunc
	downQueue      *util.MlQueue
	mapTopicHandle map[string]func(m mqtt.Message)
	chanXAddr      chan string
	XAddr          string
	ffmpeg         *os.Process // stream-pushing process
	observer       uint32
	EventActuator  *EventActuator
	State          OnvifState // online monitoring
}

// NewLcDevice builds an LcDevice for dev, registers its topic handlers,
// subscribes to its MQTT topics and starts the Handle loop in a goroutine.
func NewLcDevice(dev *protocol.OnvifDev) *LcDevice {
	// NOTE(review): the timeout passed here is zero; presumably that means
	// "no timeout" for the soap client - confirm against soap.WithTimeout.
	client := soap.NewClient(soap.WithTimeout(time.Second * 0))
	client.AddHeader(soap.NewWSSSecurityHeader(dev.User, dev.Password))

	ctx, cancel := context.WithCancel(context.Background())
	d := &LcDevice{
		IP:             dev.IP,
		endpoints:      make(map[string]string),
		tokens:         make(map[string]*VideoToken),
		client:         client,
		onvifDev:       dev,
		ctx:            ctx,
		cancel:         cancel,
		downQueue:      util.NewQueue(100),
		mapTopicHandle: make(map[string]func(m mqtt.Message)),
		chanXAddr:      make(chan string),
	}
	d.SetTopicHandle()
	d.MQTTSubscribe()
	go d.Handle()
	return d
}

// GetDevType maps the device's numeric DevType to its protocol device-type
// string: 3 is DT_IPC, 4 is DT_SOS, anything else is "unknown".
func (o *LcDevice) GetDevType() string {
	switch o.onvifDev.DevType {
	case 3:
		return protocol.DT_IPC
	case 4:
		return protocol.DT_SOS
	default:
		return "unknown"
	}
}

// SetTopicHandle fills mapTopicHandle with the handler for every topic this
// device reacts to. Handle looks handlers up by the incoming message topic.
func (o *LcDevice) SetTopicHandle() {
	devType, code := o.GetDevType(), o.onvifDev.Code

	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_PTZ)] = o.HandleTpOnvifPtz
	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_PTZ_COMM)] = o.HandleTpSPtzComm
	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_SNAPSHOT)] = o.HandleTpOnvifSnapshot
	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_RECORD)] = o.HandleTpOnvifRecord
	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_VIDEO)] = o.HandleTpOnvifVideo
	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_REBOOT)] = o.HandleTpOnvifReboot
	o.mapTopicHandle[GetTopic(devType, code, protocol.TP_ONVIF_SYNCTIME)] = o.HandleTpOnvifSynctime
}

// MQTTSubscribe subscribes HandleMessage to every command topic of this
// device. Reboot and time-sync are subscribed with ToCloud, the rest with
// ToAll.
func (o *LcDevice) MQTTSubscribe() {
	devType, code := o.GetDevType(), o.onvifDev.Code

	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_PTZ), mqtt.AtMostOnce, o.HandleMessage, ToAll)
	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_PTZ_COMM), mqtt.AtMostOnce, o.HandleMessage, ToAll)
	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_SNAPSHOT), mqtt.AtMostOnce, o.HandleMessage, ToAll)
	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_RECORD), mqtt.AtMostOnce, o.HandleMessage, ToAll)
	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_VIDEO), mqtt.AtMostOnce, o.HandleMessage, ToAll)

	// The following are subscribed from the cloud only.
	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_REBOOT), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
	GetMQTTMgr().Subscribe(GetTopic(devType, code, protocol.TP_ONVIF_SYNCTIME), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
}

// GetCapabilities queries the device service at XAddr and records the
// address of every advertised service in o.endpoints under the keys
// "analytics", "device", "events", "imaging", "media" and "ptz". Services
// whose address is empty (or only spaces) are skipped. SOAP faults are
// logged before the error is returned.
func (o *LcDevice) GetCapabilities(XAddr string) error {
	dev := devicemgmt.NewDevice(o.client, XAddr)
	reply, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
	if err != nil {
		if soapFault, ok := err.(*soap.SOAPFault); ok {
			logrus.Error(soapFault)
		}
		return err
	}

	services := []struct {
		name  string
		xaddr string
	}{
		{"analytics", string(reply.Capabilities.Analytics.XAddr)},
		{"device", string(reply.Capabilities.Device.XAddr)},
		{"events", string(reply.Capabilities.Events.XAddr)},
		{"imaging", string(reply.Capabilities.Imaging.XAddr)},
		{"media", string(reply.Capabilities.Media.XAddr)},
		{"ptz", string(reply.Capabilities.PTZ.XAddr)},
	}
	for _, svc := range services {
		if len(strings.Trim(svc.xaddr, " ")) > 0 {
			o.endpoints[svc.name] = svc.xaddr
		}
	}
	return nil
}

// testOnline reports protocol.SUCCESS when a GetCapabilities request to
// XAddr succeeds and protocol.FAILED otherwise.
func (o *LcDevice) testOnline(XAddr string) uint8 {
	dev := devicemgmt.NewDevice(o.client, XAddr)
	if _, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{}); err != nil {
		return protocol.FAILED
	}
	return protocol.SUCCESS
}

// SystemReboot asks the device at o.XAddr to reboot and returns the message
// carried by the response.
func (o *LcDevice) SystemReboot() (string, error) {
	dev := devicemgmt.NewDevice(o.client, o.XAddr)
	resp, err := dev.SystemReboot(&devicemgmt.SystemReboot{})
	if err != nil {
		return "", err
	}
	return resp.Message, nil
}

// SetSystemDateAndTime switches the device at o.XAddr to NTP time with
// daylight savings enabled. The request error is returned so that callers
// (the SYNCTIME ACK) report real failures instead of always succeeding.
func (o *LcDevice) SetSystemDateAndTime() error {
	dev := devicemgmt.NewDevice(o.client, o.XAddr)
	_, err := dev.SetSystemDateAndTime(&devicemgmt.SetSystemDateAndTime{
		DateTimeType:    devicemgmt.SetDateTimeTypeNTP,
		DaylightSavings: true,
	})
	return err
}

// HandleMessage is the MQTT callback for all subscribed topics; it only
// enqueues the message, which is later executed by the Handle loop.
func (o *LcDevice) HandleMessage(m mqtt.Message) {
	o.downQueue.Put(m)
}

// HandleTpOnvifReboot executes a reboot command addressed to this device and
// publishes the outcome on the TP_ONVIF_REBOOT_ACK topic. Messages that do
// not decode or that carry another device's id are ignored.
func (o *LcDevice) HandleTpOnvifReboot(m mqtt.Message) {
	var obj protocol.Pack_IPCCtrlComm
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if o.onvifDev.Code != obj.Id {
		return
	}

	_, rebootErr := o.SystemReboot()

	var ack protocol.Pack_IPCCommonACK
	payload, err := ack.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, rebootErr)
	if err != nil {
		return
	}
	GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT_ACK), payload, mqtt.AtMostOnce, ToAll)
}

// HandleTpOnvifSynctime executes a time-sync command addressed to this
// device and publishes the outcome on the TP_ONVIF_SYNCTIME_ACK topic.
// Messages that do not decode or that carry another device's id are ignored.
func (o *LcDevice) HandleTpOnvifSynctime(m mqtt.Message) {
	var obj protocol.Pack_IPCCtrlComm
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if o.onvifDev.Code != obj.Id {
		return
	}

	syncErr := o.SetSystemDateAndTime()

	var ack protocol.Pack_IPCCommonACK
	payload, err := ack.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, syncErr)
	if err != nil {
		return
	}
	GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME_ACK), payload, mqtt.AtMostOnce, ToAll)
}

// Update hands a newly learned device-service address to the Handle loop.
// Empty addresses are ignored. The send blocks until the loop picks it up,
// but gives up once the device context is cancelled so that callers cannot
// hang on a device whose loop has already exited.
func (o *LcDevice) Update(addr string) {
	if addr == "" {
		return
	}
	select {
	case o.chanXAddr <- addr:
	case <-o.ctx.Done():
	}
}

// init (re)discovers the device behind xaddr: service capabilities go into
// o.endpoints and, via GetProfiles, the media profiles are loaded. If an
// event actuator is already running it is cancelled and, when an "events"
// endpoint is known, replaced by a fresh one.
func (o *LcDevice) init(xaddr string) error {
	// Fetch the service capabilities and store them in endpoints.
	if err := o.GetCapabilities(xaddr); err != nil {
		return err
	}
	// Fetch the profiles and store them as video tokens.
	if err := o.GetProfiles(); err != nil {
		return err
	}

	if o.EventActuator != nil {
		o.EventActuator.Cancel()
		if addr, ok := o.endpoints["events"]; ok {
			o.EventActuator = NewEventActuator(o)
			go o.EventActuator.EventHandle(addr)
		}
	}
	return nil
}

// Handle is the device's main loop. It initialises the device from the
// default device-service URL and then, until the context is cancelled:
//   - once a minute retries a failed initialisation and runs the online check;
//   - re-initialises when Update delivers a different XAddr;
//   - otherwise starts the event actuator when events are configured and an
//     "events" endpoint is known, and executes one queued MQTT command
//     (sleeping 50ms when the queue is empty).
//
// A panic inside the loop is logged and the loop is restarted. A normal
// return (context cancelled) does NOT restart it.
func (o *LcDevice) Handle() {
	defer func() {
		// Restart only after a real panic; restarting unconditionally would
		// respawn the loop forever once the context has been cancelled.
		if r := recover(); r != nil {
			logrus.Errorf("LcDevice.Handle recovered from panic, restarting: %v", r)
			go o.Handle()
		}
	}()

	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop() // release the ticker on exit and before any restart

	// Initialise from the default address.
	o.XAddr = "http://" + o.IP + ":80/onvif/device_service"
	needRetry := o.init(o.XAddr) != nil

	for {
		select {
		case <-o.ctx.Done():
			o.CleanupProcess()
			if o.EventActuator != nil {
				o.EventActuator.Cancel()
			}
			return

		case <-ticker.C:
			// Runs once per minute.
			if needRetry {
				if err := o.init(o.XAddr); err == nil {
					needRetry = false
				}
			}
			o.State.CheckOnline(o)

		case addr := <-o.chanXAddr:
			if addr != o.XAddr {
				o.XAddr = addr
				if err := o.init(addr); err == nil {
					needRetry = false
				}
			}

		default:
			if o.EventActuator == nil && len(o.onvifDev.Event) > 0 {
				if addr, ok := o.endpoints["events"]; ok {
					o.EventActuator = NewEventActuator(o)
					go o.EventActuator.EventHandle(addr)
				}
			}

			// Fetch one command from the queue and execute it.
			item, ok, _ := o.downQueue.Get()
			if !ok {
				time.Sleep(50 * time.Millisecond)
				continue
			}
			if msg, isMsg := item.(mqtt.Message); isMsg {
				if fn, found := o.mapTopicHandle[msg.Topic()]; found {
					fn(msg)
				}
			}
		}
	}
}

// FileExist reports whether path exists. It returns false only when Lstat
// reports a not-exist error; any other Lstat error (for example a permission
// problem) still yields true.
func FileExist(path string) bool {
	_, err := os.Lstat(path)
	return !os.IsNotExist(err)
}