// ipc_media.go

  1. package main
import (
	"errors"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/go-resty/resty/v2"
	"github.com/sirupsen/logrus"

	"lc/common/mqtt"
	"lc/common/onvif/profiles/media"
	"lc/common/onvif/soap"
	"lc/common/protocol"
	"lc/common/util"
)
  18. var (
  19. APPLIVE = "live"
  20. UploadSnapshot = "/camera/v1/snapshot"
  21. UploadFlv = "/camera/v1/flv"
  22. UploadPresetSnapshot = "/camera/v1/preset/picture"
  23. )
  24. func (o *LcDevice) GetProfiles() error {
  25. XAddr, ok := o.endpoints["media"]
  26. if !ok {
  27. return errors.New("不支持media")
  28. }
  29. m := media.NewMedia(o.client, XAddr)
  30. reply, err := m.GetProfiles(&media.GetProfiles{})
  31. if err != nil {
  32. if serr, ok := err.(*soap.SOAPFault); ok {
  33. logrus.Error(serr)
  34. }
  35. return err
  36. }
  37. for _, v := range reply.Profiles {
  38. vt := VideoToken{
  39. Token: string(v.Token),
  40. Name: string(v.Name),
  41. VideoEncoding: string(v.VideoEncoderConfiguration.Encoding),
  42. VideoResolution: Resolution{
  43. Width: v.VideoEncoderConfiguration.Resolution.Width,
  44. Height: v.VideoEncoderConfiguration.Resolution.Height,
  45. },
  46. VideoFrameRateLimit: v.VideoEncoderConfiguration.RateControl.FrameRateLimit,
  47. VideoBitrateLimit: v.VideoEncoderConfiguration.RateControl.BitrateLimit,
  48. }
  49. o.tokens[vt.Token] = &vt
  50. }
  51. return nil
  52. }
  53. func (o *LcDevice) GetAllStreamUri() error {
  54. XAddr, ok := o.endpoints["media"]
  55. if !ok {
  56. return errors.New("不支持media")
  57. }
  58. m := media.NewMedia(o.client, XAddr)
  59. for k, v := range o.tokens {
  60. for retry := 3; retry > 0; retry-- {
  61. reply, err := m.GetStreamUri(&media.GetStreamUri{ProfileToken: media.ReferenceToken(k)})
  62. if err != nil {
  63. //soap错误,则退出
  64. if serr, ok := err.(*soap.SOAPFault); ok {
  65. logrus.Error(serr)
  66. break
  67. }
  68. time.Sleep(3 * time.Second)
  69. } else {
  70. v.VideoRtspUrl = strings.ReplaceAll(string(reply.MediaUri.Uri), "rtsp://", "rtsp://"+o.onvifDev.User+":"+o.onvifDev.Password+"@")
  71. break
  72. }
  73. }
  74. }
  75. return nil
  76. }
  77. func (o *LcDevice) GetAllSnapshotUri() error {
  78. XAddr, ok := o.endpoints["media"]
  79. if !ok {
  80. return errors.New("不支持media")
  81. }
  82. m := media.NewMedia(o.client, XAddr)
  83. for k, v := range o.tokens {
  84. for retry := 3; retry > 0; retry-- {
  85. reply, err := m.GetSnapshotUri(&media.GetSnapshotUri{ProfileToken: media.ReferenceToken(k)})
  86. if err != nil {
  87. //soap错误,则退出
  88. if serr, ok := err.(*soap.SOAPFault); ok {
  89. logrus.Error(serr)
  90. break
  91. }
  92. //其他错误停3秒重试
  93. time.Sleep(5 * time.Second)
  94. } else {
  95. v.SnapshotUrl = string(reply.MediaUri.Uri)
  96. break
  97. }
  98. }
  99. }
  100. return nil
  101. }
  102. // SnapshotFromRtsp 通过RTSP截取一张图片
  103. func (o *LcDevice) SnapshotFromRtsp(token string) (string, error) {
  104. RtspUrl := o.GetRtspUrl(token)
  105. if RtspUrl == "" {
  106. logrus.Errorf("未获取到设备[%s]的RtspUrl,无法截图.", o.onvifDev.Code)
  107. return "", errors.New("找不到RtspUrl")
  108. }
  109. wdir, _ := os.Getwd()
  110. proc := &os.ProcAttr{
  111. Dir: wdir,
  112. Env: os.Environ(),
  113. Files: []*os.File{
  114. os.Stdin,
  115. os.Stdout,
  116. os.Stderr,
  117. },
  118. }
  119. fileName := util.GetPath(4) + o.onvifDev.Code + "_" + strconv.FormatInt(util.MlNow().Unix(), 10) + ".jpg"
  120. args := []string{onvifDevConfig.Ffmpeg, "-i", RtspUrl, "-t", "0.001", "-y", "-f", "mjpeg", "-r", "1", fileName}
  121. process, err := os.StartProcess(onvifDevConfig.Ffmpeg, args, proc)
  122. if err != nil {
  123. logrus.Errorf("启动ffmpeg失败:%s", err.Error())
  124. return "", err
  125. } else {
  126. go func() {
  127. time.Sleep(6 * time.Second)
  128. if process != nil {
  129. process.Signal(os.Kill)
  130. }
  131. }()
  132. _, err = process.Wait()
  133. process = nil
  134. }
  135. //判断是否已经抓图成功
  136. if FileExist(fileName) {
  137. return fileName, nil
  138. }
  139. err = errors.New("snapshotFromRtsp:抓图失败")
  140. return "", err
  141. }
  142. func (o *LcDevice) GetRtspUrl(token string) string {
  143. rtspurl := ""
  144. if v, ok := o.tokens[token]; !ok {
  145. max := int32(0)
  146. for _, v := range o.tokens { //查找分辨率最高的SnapshotUrl
  147. r := v.VideoResolution.Height * v.VideoResolution.Width
  148. if r > max {
  149. max = r
  150. rtspurl = v.VideoRtspUrl
  151. }
  152. }
  153. } else {
  154. rtspurl = v.VideoRtspUrl
  155. }
  156. return rtspurl
  157. }
  158. func (o *LcDevice) GetSnapshotUrl(token string) string {
  159. snapshoturl := ""
  160. max := int32(0)
  161. if v, ok := o.tokens[token]; !ok {
  162. for _, v := range o.tokens { //查找分辨率最高的SnapshotUrl
  163. r := v.VideoResolution.Height * v.VideoResolution.Width
  164. if r > max {
  165. max = r
  166. snapshoturl = v.SnapshotUrl
  167. }
  168. }
  169. } else {
  170. snapshoturl = v.SnapshotUrl
  171. }
  172. return snapshoturl
  173. }
  174. func (o *LcDevice) GetProfileToken(token string) string {
  175. max := int32(0)
  176. if _, ok := o.tokens[token]; !ok {
  177. for _, v := range o.tokens { //查找分辨率最高的SnapshotUrl
  178. r := v.VideoResolution.Height * v.VideoResolution.Width
  179. if r > max {
  180. max = r
  181. token = v.Token
  182. }
  183. }
  184. }
  185. return token
  186. }
  187. func (o *LcDevice) Snapshot(token string) {
  188. snapshoturl := o.GetSnapshotUrl(token)
  189. if snapshoturl == "" {
  190. logrus.Debugf("未获取到设备[%s]的SnapshotUrl,无法截图.", o.onvifDev.Code)
  191. return
  192. }
  193. var fileName string
  194. client := resty.New()
  195. client.SetTimeout(6 * time.Second)
  196. client.SetBasicAuth(o.onvifDev.User, o.onvifDev.Password)
  197. resp, err := client.R().Get(snapshoturl)
  198. if err != nil {
  199. logrus.Errorf("抓图失败:%s", err.Error())
  200. fileName, err = o.SnapshotFromRtsp(token)
  201. } else {
  202. if resp.StatusCode() != http.StatusOK {
  203. logrus.Errorf("抓图失败,http返回错误:%s", resp.Status())
  204. fileName, err = o.SnapshotFromRtsp(token)
  205. } else if len(resp.Body()) == 0 {
  206. logrus.Error("抓图失败,内容为空")
  207. fileName, err = o.SnapshotFromRtsp(token)
  208. } else {
  209. fileName = util.GetPath(4) + o.onvifDev.Code + "_" + strconv.FormatInt(util.MlNow().Unix(), 10) + ".jpg"
  210. if err = ioutil.WriteFile(fileName, resp.Body(), os.ModePerm); err != nil {
  211. logrus.Errorf("存储图片失败:%s", err.Error())
  212. return
  213. }
  214. }
  215. }
  216. if fileName != "" && err == nil {
  217. resp, err := client.R().SetFile("file", fileName).Post(o.onvifDev.WebServer + UploadSnapshot)
  218. if err != nil {
  219. logrus.Errorf("上传文件失败:%s", err.Error())
  220. } else {
  221. if resp.StatusCode() != http.StatusOK {
  222. logrus.Errorf("上传文件失败,http返回错误:%s", resp.Status())
  223. }
  224. }
  225. os.Remove(fileName)
  226. }
  227. }
  228. func (o *LcDevice) Snapshot2(file string) error {
  229. snapshoturl := o.GetSnapshotUrl("")
  230. if snapshoturl == "" {
  231. return errors.New("找不到链接,截图错误")
  232. }
  233. var fileName string
  234. client := resty.New()
  235. client.SetTimeout(6 * time.Second)
  236. client.SetBasicAuth(o.onvifDev.User, o.onvifDev.Password)
  237. resp, err := client.R().Get(snapshoturl)
  238. if err != nil {
  239. return err
  240. }
  241. if resp.StatusCode() != http.StatusOK {
  242. return errors.New("调用截图接口发生错误")
  243. }
  244. fileName = util.GetPath(4) + file
  245. err = ioutil.WriteFile(fileName, resp.Body(), os.ModePerm)
  246. if err != nil {
  247. return err
  248. }
  249. resp, err = client.R().SetFile("file", fileName).Post(o.onvifDev.WebServer + UploadPresetSnapshot)
  250. os.Remove(fileName)
  251. if err != nil {
  252. return err
  253. }
  254. if resp.StatusCode() != http.StatusOK {
  255. return errors.New("截图上传发生错误")
  256. }
  257. return nil
  258. }
  259. func (o *LcDevice) CreateProcess(rtspurl string) {
  260. wdir, _ := os.Getwd()
  261. proc := &os.ProcAttr{
  262. Dir: wdir,
  263. Env: os.Environ(),
  264. Files: []*os.File{
  265. os.Stdin,
  266. os.Stdout,
  267. os.Stderr,
  268. },
  269. }
  270. args := []string{onvifDevConfig.Ffmpeg, "-fflags", "nobuffer", "-i", rtspurl, "-c", "copy", "-f", "flv", o.onvifDev.RtmpServer + "/" + APPLIVE + "/" + o.onvifDev.Code}
  271. process, err := os.StartProcess(onvifDevConfig.Ffmpeg, args, proc)
  272. if err != nil {
  273. logrus.Errorf("启动ffmpeg失败:%s", err.Error())
  274. return
  275. }
  276. o.ffmpeg = process
  277. go o.WatchFfmpeg()
  278. logrus.Infof("启动ffmpeg成功:pid=%d", o.ffmpeg.Pid)
  279. }
  280. func (o *LcDevice) CleanupProcess() {
  281. if o.ffmpeg != nil {
  282. logrus.Infof("ffmpeg进程销毁:pid=%d", o.ffmpeg.Pid)
  283. if err := o.ffmpeg.Kill(); err != nil {
  284. logrus.Infof("ffmpeg进程%dKill失败:%s", o.ffmpeg.Pid, err.Error())
  285. }
  286. }
  287. }
  288. func (o *LcDevice) WatchFfmpeg() {
  289. status := make(chan *os.ProcessState)
  290. died := make(chan error)
  291. go func() {
  292. state, err := o.ffmpeg.Wait()
  293. if err != nil {
  294. died <- err
  295. return
  296. }
  297. status <- state
  298. }()
  299. select {
  300. case s := <-status:
  301. logrus.Infof("ffmpeg已退出:%s", s.String())
  302. case err := <-died:
  303. logrus.Infof("ffmpeg已退出:%s", err.Error())
  304. }
  305. o.ffmpeg = nil
  306. }
  307. func (o *LcDevice) RecordToFLV(token string, second int) {
  308. //最长录制60秒视频
  309. if second > 60 {
  310. second = 60
  311. }
  312. RtspUrl := o.GetRtspUrl(token)
  313. if RtspUrl == "" {
  314. logrus.Debugf("未获取到设备[%s]的RtspUrl,无法录制视频.", o.onvifDev.Code)
  315. return
  316. }
  317. wdir, _ := os.Getwd()
  318. proc := &os.ProcAttr{
  319. Dir: wdir,
  320. Env: os.Environ(),
  321. Files: []*os.File{
  322. os.Stdin,
  323. os.Stdout,
  324. os.Stderr,
  325. },
  326. }
  327. fileName := util.GetPath(4) + o.onvifDev.Code + "_" + strconv.FormatInt(util.MlNow().Unix(), 10) + ".flv"
  328. args := []string{onvifDevConfig.Ffmpeg, "-i", RtspUrl, "-c", "copy", "-t", strconv.Itoa(second), fileName}
  329. process, err := os.StartProcess(onvifDevConfig.Ffmpeg, args, proc)
  330. if err != nil {
  331. logrus.Errorf("启动ffmpeg失败:%s", err.Error())
  332. return
  333. } else if process != nil {
  334. go func() {
  335. time.Sleep(time.Duration(second+5) * time.Second)
  336. if process != nil {
  337. process.Signal(os.Kill)
  338. }
  339. }()
  340. _, err = process.Wait()
  341. //判断是否已经抓图成功
  342. if fileName != "" && err == nil {
  343. if FileExist(fileName) {
  344. client := resty.New()
  345. client.SetTimeout(6 * time.Second)
  346. resp, err := client.R().SetFile("file", fileName).Post(o.onvifDev.WebServer + UploadFlv)
  347. if err != nil {
  348. logrus.Errorf("上传文件失败:%s", err.Error())
  349. } else {
  350. if resp.StatusCode() != http.StatusOK {
  351. logrus.Errorf("上传文件失败,http返回错误:%s", resp.Status())
  352. }
  353. }
  354. os.Remove(fileName)
  355. }
  356. }
  357. }
  358. }
  359. // HandleTpOnvifSnapshot 截图
  360. func (o *LcDevice) HandleTpOnvifSnapshot(m mqtt.Message) {
  361. var obj protocol.Pack_MediaCommonInfo
  362. if err := obj.DeCode(m.PayloadString()); err != nil {
  363. return
  364. }
  365. if o.onvifDev.Code != obj.Id {
  366. return
  367. }
  368. var ret protocol.Pack_IPCCommonACK
  369. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, nil); err == nil {
  370. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT_ACK), strRet, mqtt.AtMostOnce, ToAll)
  371. }
  372. go o.Snapshot(obj.Data.ProfileToken)
  373. }
  374. // HandleTpOnvifVideo 拉流推流,停止拉流推流
  375. func (o *LcDevice) HandleTpOnvifVideo(m mqtt.Message) {
  376. var obj protocol.Pack_MediaCommonInfo
  377. if err := obj.DeCode(m.PayloadString()); err != nil {
  378. return
  379. }
  380. if o.onvifDev.Code != obj.Id {
  381. return
  382. }
  383. var ret protocol.Pack_IPCCommonACK
  384. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, nil); err == nil {
  385. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO_ACK), strRet, mqtt.AtMostOnce, ToAll)
  386. }
  387. //如果已开启GB28181,则不用启动ffmpeg推流
  388. if o.onvifDev.Gb28181 {
  389. logrus.Debugf("设备[%s]已开启GB28181视频服务,无需ffmpeg推流", o.onvifDev.Code)
  390. return
  391. }
  392. if obj.Data.Flag == 1 {
  393. o.observer++
  394. logrus.Debugf("执行推流一次,当前有%d次引用", o.observer)
  395. if o.ffmpeg != nil {
  396. return
  397. }
  398. if rtspurl := o.GetRtspUrl(obj.Data.ProfileToken); rtspurl == "" {
  399. logrus.Errorf("请求设备[%s]的视频流错误,找不到拉流地址.", o.onvifDev.Code)
  400. return
  401. } else {
  402. o.CreateProcess(rtspurl)
  403. }
  404. } else if obj.Data.Flag == 2 { //停止推流
  405. if o.observer > 0 {
  406. o.observer--
  407. logrus.Debugf("停止推流一次,余下%d次引用", o.observer)
  408. if o.observer == 0 {
  409. o.CleanupProcess()
  410. }
  411. }
  412. }
  413. }
  414. func (o *LcDevice) HandleTpOnvifRecord(m mqtt.Message) {
  415. var obj protocol.Pack_MediaCommonInfo
  416. if err := obj.DeCode(m.PayloadString()); err != nil {
  417. return
  418. }
  419. if o.onvifDev.Code != obj.Id {
  420. return
  421. }
  422. var ret protocol.Pack_IPCCommonACK
  423. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, nil); err == nil {
  424. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD_ACK), strRet, mqtt.AtMostOnce, ToAll)
  425. }
  426. go o.RecordToFLV("", 30)
  427. }