package main import ( "errors" "io/ioutil" "net/http" "os" "strconv" "strings" "time" "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "lc/common/mqtt" "lc/common/onvif/profiles/media" "lc/common/onvif/soap" "lc/common/protocol" "lc/common/util" ) var ( APPLIVE = "live" UploadSnapshot = "/camera/v1/snapshot" UploadFlv = "/camera/v1/flv" UploadPresetSnapshot = "/camera/v1/preset/picture" ) func (o *LcDevice) GetProfiles() error { XAddr, ok := o.endpoints["media"] if !ok { return errors.New("不支持media") } m := media.NewMedia(o.client, XAddr) reply, err := m.GetProfiles(&media.GetProfiles{}) if err != nil { if serr, ok := err.(*soap.SOAPFault); ok { logrus.Error(serr) } return err } for _, v := range reply.Profiles { vt := VideoToken{ Token: string(v.Token), Name: string(v.Name), VideoEncoding: string(v.VideoEncoderConfiguration.Encoding), VideoResolution: Resolution{ Width: v.VideoEncoderConfiguration.Resolution.Width, Height: v.VideoEncoderConfiguration.Resolution.Height, }, VideoFrameRateLimit: v.VideoEncoderConfiguration.RateControl.FrameRateLimit, VideoBitrateLimit: v.VideoEncoderConfiguration.RateControl.BitrateLimit, } o.tokens[vt.Token] = &vt } return nil } func (o *LcDevice) GetAllStreamUri() error { XAddr, ok := o.endpoints["media"] if !ok { return errors.New("不支持media") } m := media.NewMedia(o.client, XAddr) for k, v := range o.tokens { for retry := 3; retry > 0; retry-- { reply, err := m.GetStreamUri(&media.GetStreamUri{ProfileToken: media.ReferenceToken(k)}) if err != nil { //soap错误,则退出 if serr, ok := err.(*soap.SOAPFault); ok { logrus.Error(serr) break } time.Sleep(3 * time.Second) } else { v.VideoRtspUrl = strings.ReplaceAll(string(reply.MediaUri.Uri), "rtsp://", "rtsp://"+o.onvifDev.User+":"+o.onvifDev.Password+"@") break } } } return nil } func (o *LcDevice) GetAllSnapshotUri() error { XAddr, ok := o.endpoints["media"] if !ok { return errors.New("不支持media") } m := media.NewMedia(o.client, XAddr) for k, v := range o.tokens { for retry := 3; retry > 0; retry-- { reply, err := m.GetSnapshotUri(&media.GetSnapshotUri{ProfileToken: media.ReferenceToken(k)}) if err != nil { //soap错误,则退出 if serr, ok := err.(*soap.SOAPFault); ok { logrus.Error(serr) break } //其他错误停3秒重试 time.Sleep(5 * time.Second) } else { v.SnapshotUrl = string(reply.MediaUri.Uri) break } } } return nil } // SnapshotFromRtsp 通过RTSP截取一张图片 func (o *LcDevice) SnapshotFromRtsp(token string) (string, error) { RtspUrl := o.GetRtspUrl(token) if RtspUrl == "" { logrus.Errorf("未获取到设备[%s]的RtspUrl,无法截图.", o.onvifDev.Code) return "", errors.New("找不到RtspUrl") } wdir, _ := os.Getwd() proc := &os.ProcAttr{ Dir: wdir, Env: os.Environ(), Files: []*os.File{ os.Stdin, os.Stdout, os.Stderr, }, } fileName := util.GetPath(4) + o.onvifDev.Code + "_" + strconv.FormatInt(util.MlNow().Unix(), 10) + ".jpg" args := []string{onvifDevConfig.Ffmpeg, "-i", RtspUrl, "-t", "0.001", "-y", "-f", "mjpeg", "-r", "1", fileName} process, err := os.StartProcess(onvifDevConfig.Ffmpeg, args, proc) if err != nil { logrus.Errorf("启动ffmpeg失败:%s", err.Error()) return "", err } else { go func() { time.Sleep(6 * time.Second) if process != nil { process.Signal(os.Kill) } }() _, err = process.Wait() process = nil } //判断是否已经抓图成功 if FileExist(fileName) { return fileName, nil } err = errors.New("snapshotFromRtsp:抓图失败") return "", err } func (o *LcDevice) GetRtspUrl(token string) string { rtspurl := "" if v, ok := o.tokens[token]; !ok { max := int32(0) for _, v := range o.tokens { //查找分辨率最高的SnapshotUrl r := v.VideoResolution.Height * v.VideoResolution.Width if r > max { max = r rtspurl = v.VideoRtspUrl } } } else { rtspurl = v.VideoRtspUrl } return rtspurl } func (o *LcDevice) GetSnapshotUrl(token string) string { snapshoturl := "" max := int32(0) if v, ok := o.tokens[token]; !ok { for _, v := range o.tokens { //查找分辨率最高的SnapshotUrl r := v.VideoResolution.Height * v.VideoResolution.Width if r > max { max = r snapshoturl = v.SnapshotUrl } } } else { snapshoturl = v.SnapshotUrl } return snapshoturl } func (o *LcDevice) GetProfileToken(token string) string { max := int32(0) if _, ok := o.tokens[token]; !ok { for _, v := range o.tokens { //查找分辨率最高的SnapshotUrl r := v.VideoResolution.Height * v.VideoResolution.Width if r > max { max = r token = v.Token } } } return token } func (o *LcDevice) Snapshot(token string) { snapshoturl := o.GetSnapshotUrl(token) if snapshoturl == "" { logrus.Debugf("未获取到设备[%s]的SnapshotUrl,无法截图.", o.onvifDev.Code) return } var fileName string client := resty.New() client.SetTimeout(6 * time.Second) client.SetBasicAuth(o.onvifDev.User, o.onvifDev.Password) resp, err := client.R().Get(snapshoturl) if err != nil { logrus.Errorf("抓图失败:%s", err.Error()) fileName, err = o.SnapshotFromRtsp(token) } else { if resp.StatusCode() != http.StatusOK { logrus.Errorf("抓图失败,http返回错误:%s", resp.Status()) fileName, err = o.SnapshotFromRtsp(token) } else if len(resp.Body()) == 0 { logrus.Error("抓图失败,内容为空") fileName, err = o.SnapshotFromRtsp(token) } else { fileName = util.GetPath(4) + o.onvifDev.Code + "_" + strconv.FormatInt(util.MlNow().Unix(), 10) + ".jpg" if err = ioutil.WriteFile(fileName, resp.Body(), os.ModePerm); err != nil { logrus.Errorf("存储图片失败:%s", err.Error()) return } } } if fileName != "" && err == nil { resp, err := client.R().SetFile("file", fileName).Post(o.onvifDev.WebServer + UploadSnapshot) if err != nil { logrus.Errorf("上传文件失败:%s", err.Error()) } else { if resp.StatusCode() != http.StatusOK { logrus.Errorf("上传文件失败,http返回错误:%s", resp.Status()) } } os.Remove(fileName) } } func (o *LcDevice) Snapshot2(file string) error { snapshoturl := o.GetSnapshotUrl("") if snapshoturl == "" { return errors.New("找不到链接,截图错误") } var fileName string client := resty.New() client.SetTimeout(6 * time.Second) client.SetBasicAuth(o.onvifDev.User, o.onvifDev.Password) resp, err := client.R().Get(snapshoturl) if err != nil { return err } if resp.StatusCode() != http.StatusOK { return errors.New("调用截图接口发生错误") } fileName = util.GetPath(4) + file err = ioutil.WriteFile(fileName, resp.Body(), os.ModePerm) if err != nil { return err } resp, err = client.R().SetFile("file", fileName).Post(o.onvifDev.WebServer + UploadPresetSnapshot) os.Remove(fileName) if err != nil { return err } if resp.StatusCode() != http.StatusOK { return errors.New("截图上传发生错误") } return nil } func (o *LcDevice) CreateProcess(rtspurl string) { wdir, _ := os.Getwd() proc := &os.ProcAttr{ Dir: wdir, Env: os.Environ(), Files: []*os.File{ os.Stdin, os.Stdout, os.Stderr, }, } args := []string{onvifDevConfig.Ffmpeg, "-fflags", "nobuffer", "-i", rtspurl, "-c", "copy", "-f", "flv", o.onvifDev.RtmpServer + "/" + APPLIVE + "/" + o.onvifDev.Code} process, err := os.StartProcess(onvifDevConfig.Ffmpeg, args, proc) if err != nil { logrus.Errorf("启动ffmpeg失败:%s", err.Error()) return } o.ffmpeg = process go o.WatchFfmpeg() logrus.Infof("启动ffmpeg成功:pid=%d", o.ffmpeg.Pid) } func (o *LcDevice) CleanupProcess() { if o.ffmpeg != nil { logrus.Infof("ffmpeg进程销毁:pid=%d", o.ffmpeg.Pid) if err := o.ffmpeg.Kill(); err != nil { logrus.Infof("ffmpeg进程%dKill失败:%s", o.ffmpeg.Pid, err.Error()) } } } func (o *LcDevice) WatchFfmpeg() { status := make(chan *os.ProcessState) died := make(chan error) go func() { state, err := o.ffmpeg.Wait() if err != nil { died <- err return } status <- state }() select { case s := <-status: logrus.Infof("ffmpeg已退出:%s", s.String()) case err := <-died: logrus.Infof("ffmpeg已退出:%s", err.Error()) } o.ffmpeg = nil } func (o *LcDevice) RecordToFLV(token string, second int) { //最长录制60秒视频 if second > 60 { second = 60 } RtspUrl := o.GetRtspUrl(token) if RtspUrl == "" { logrus.Debugf("未获取到设备[%s]的RtspUrl,无法录制视频.", o.onvifDev.Code) return } wdir, _ := os.Getwd() proc := &os.ProcAttr{ Dir: wdir, Env: os.Environ(), Files: []*os.File{ os.Stdin, os.Stdout, os.Stderr, }, } fileName := util.GetPath(4) + o.onvifDev.Code + "_" + strconv.FormatInt(util.MlNow().Unix(), 10) + ".flv" args := []string{onvifDevConfig.Ffmpeg, "-i", RtspUrl, "-c", "copy", "-t", strconv.Itoa(second), fileName} process, err := os.StartProcess(onvifDevConfig.Ffmpeg, args, proc) if err != nil { logrus.Errorf("启动ffmpeg失败:%s", err.Error()) return } else if process != nil { go func() { time.Sleep(time.Duration(second+5) * time.Second) if process != nil { process.Signal(os.Kill) } }() _, err = process.Wait() //判断是否已经抓图成功 if fileName != "" && err == nil { if FileExist(fileName) { client := resty.New() client.SetTimeout(6 * time.Second) resp, err := client.R().SetFile("file", fileName).Post(o.onvifDev.WebServer + UploadFlv) if err != nil { logrus.Errorf("上传文件失败:%s", err.Error()) } else { if resp.StatusCode() != http.StatusOK { logrus.Errorf("上传文件失败,http返回错误:%s", resp.Status()) } } os.Remove(fileName) } } } } // HandleTpOnvifSnapshot 截图 func (o *LcDevice) HandleTpOnvifSnapshot(m mqtt.Message) { var obj protocol.Pack_MediaCommonInfo if err := obj.DeCode(m.PayloadString()); err != nil { return } if o.onvifDev.Code != obj.Id { return } var ret protocol.Pack_IPCCommonACK if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, nil); err == nil { GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT_ACK), strRet, mqtt.AtMostOnce, ToAll) } go o.Snapshot(obj.Data.ProfileToken) } // HandleTpOnvifVideo 拉流推流,停止拉流推流 func (o *LcDevice) HandleTpOnvifVideo(m mqtt.Message) { var obj protocol.Pack_MediaCommonInfo if err := obj.DeCode(m.PayloadString()); err != nil { return } if o.onvifDev.Code != obj.Id { return } var ret protocol.Pack_IPCCommonACK if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, nil); err == nil { GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO_ACK), strRet, mqtt.AtMostOnce, ToAll) } //如果已开启GB28181,则不用启动ffmpeg推流 if o.onvifDev.Gb28181 { logrus.Debugf("设备[%s]已开启GB28181视频服务,无需ffmpeg推流", o.onvifDev.Code) return } if obj.Data.Flag == 1 { o.observer++ logrus.Debugf("执行推流一次,当前有%d次引用", o.observer) if o.ffmpeg != nil { return } if rtspurl := o.GetRtspUrl(obj.Data.ProfileToken); rtspurl == "" { logrus.Errorf("请求设备[%s]的视频流错误,找不到拉流地址.", o.onvifDev.Code) return } else { o.CreateProcess(rtspurl) } } else if obj.Data.Flag == 2 { //停止推流 if o.observer > 0 { o.observer-- logrus.Debugf("停止推流一次,余下%d次引用", o.observer) if o.observer == 0 { o.CleanupProcess() } } } } func (o *LcDevice) HandleTpOnvifRecord(m mqtt.Message) { var obj protocol.Pack_MediaCommonInfo if err := obj.DeCode(m.PayloadString()); err != nil { return } if o.onvifDev.Code != obj.Id { return } var ret protocol.Pack_IPCCommonACK if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, nil); err == nil { GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD_ACK), strRet, mqtt.AtMostOnce, ToAll) } go o.RecordToFLV("", 30) }