srsmgr.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package main
  2. import (
  3. "io/ioutil"
  4. "net/http"
  5. "os"
  6. "path/filepath"
  7. "runtime/debug"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/labstack/echo/v4/middleware"
  13. "github.com/go-resty/resty/v2"
  14. "github.com/labstack/echo/v4"
  15. "github.com/sirupsen/logrus"
  16. )
  17. var _onceSrsMgr sync.Once
  18. var _singleSrsMgr *SrsMgr
  19. func GetSrsMgr() *SrsMgr {
  20. _onceSrsMgr.Do(func() {
  21. _singleSrsMgr = &SrsMgr{
  22. echo: echo.New(),
  23. }
  24. })
  25. return _singleSrsMgr
  26. }
  27. type SrsMgr struct {
  28. echo *echo.Echo
  29. }
  // Devices is a single device entry inside a SIP session, as decoded from the
  // JSON answer of the sip_query_session action.
  type Devices struct {
  	// DeviceID is combined with the session ID to form a channel ID.
  	DeviceID string `json:"device_id"`
  	// DeviceStatus is the device state string reported by the API.
  	DeviceStatus string `json:"device_status"`
  	// InviteStatus is the invite state string reported by the API.
  	InviteStatus string `json:"invite_status"`
  	// InviteTime is the invite timestamp as reported by the API.
  	// NOTE(review): unit/epoch is not visible here — confirm against the API.
  	InviteTime int64 `json:"invite_time"`
  }
  36. type Session struct {
  37. ID string `json:"id"`
  38. DeviceSum int `json:"device_sumnum"`
  39. Devices []Devices `json:"devices"`
  40. }
  41. type Sessions struct {
  42. Sessions []Session `json:"sessions"`
  43. }
  44. type SipQuerySession struct {
  45. Code int `json:"code"`
  46. Data Sessions `json:"data"`
  47. }
  // Channel is one media channel entry decoded from the answer of the
  // query_channel action. Handler compares ID against
  // "<session id>@<device id>" to decide whether a channel already exists.
  type Channel struct {
  	ID          string `json:"id"`
  	IP          string `json:"ip"`
  	RtmpPort    int    `json:"rtmp_port"`
  	App         string `json:"app"`
  	Stream      string `json:"stream"`
  	RtmpUrl     string `json:"rtmp_url"`
  	Ssrc        int    `json:"ssrc"`
  	RtpPort     int    `json:"rtp_port"`
  	PortMode    string `json:"port_mode"`
  	RtpPeerPort int    `json:"rtp_peer_port"`
  	RtpPeerIP   string `json:"rtp_peer_ip"`
  	// RecvTime and RecvTimeStr carry the same "recv_time" information in
  	// numeric and string form, judging by their JSON names.
  	// NOTE(review): unit of RecvTime is not visible here — confirm.
  	RecvTime    uint64 `json:"recv_time"`
  	RecvTimeStr string `json:"recv_time_str"`
  }
  63. type Channels struct {
  64. Channels []Channel `json:"channels"`
  65. }
  66. type QueryChannel struct {
  67. Code int `json:"code"`
  68. Data Channels `json:"data"`
  69. }
  // CallbackAPIObject is the JSON body that HookHandler decodes from incoming
  // callback requests.
  type CallbackAPIObject struct {
  	// Action names the callback; HookHandler only reacts to "on_dvr".
  	Action string `json:"action"`
  	// Stream selects the sub-directory of the recording root to clean up.
  	Stream string `json:"stream"`
  	// File is decoded from the "file" field; nothing in this file reads it.
  	File string `json:"file"`
  }
  75. // SipQuerySession 查询SIP会话
  76. func (o *SrsMgr) SipQuerySession(client *resty.Client) *SipQuerySession {
  77. resp, err := client.R().Get(mainConfig.GB28181API + "?action=sip_query_session")
  78. if err != nil {
  79. logrus.Errorf("查询SIP会话失败:%s", err.Error())
  80. return nil
  81. }
  82. if resp.StatusCode() != http.StatusOK {
  83. logrus.Errorf("查询SIP会话失败,http状态码:%s", resp.Status())
  84. return nil
  85. }
  86. var sqs SipQuerySession
  87. if err := json.Unmarshal(resp.Body(), &sqs); err != nil {
  88. logrus.Errorf("查询SIP会话,解析响应数据失败:%s", err.Error())
  89. return nil
  90. }
  91. return &sqs
  92. }
  93. // QueryChannel 查询通道
  94. func (o *SrsMgr) QueryChannel(client *resty.Client) *QueryChannel {
  95. resp, err := client.R().Get(mainConfig.GB28181API + "?action=query_channel")
  96. if err != nil {
  97. logrus.Errorf("查询通道失败:%s", err.Error())
  98. return nil
  99. }
  100. if resp.StatusCode() != http.StatusOK {
  101. logrus.Errorf("查询通道失败,http状态码:%s", resp.Status())
  102. return nil
  103. }
  104. var sc QueryChannel
  105. if err := json.Unmarshal(resp.Body(), &sc); err != nil {
  106. logrus.Errorf("查询通道,解析响应数据失败:%s", err.Error())
  107. return nil
  108. }
  109. return &sc
  110. }
  111. func (o *SrsMgr) Handler(args ...interface{}) interface{} {
  112. defer func() {
  113. if err := recover(); err != nil {
  114. gopool.Add(o.Handler, args)
  115. logrus.Errorf("GB28181Mgr.Handler发生异常:%s", string(debug.Stack()))
  116. }
  117. }()
  118. client := resty.New() //REST Client
  119. client.SetTimeout(6 * time.Second)
  120. for {
  121. time.Sleep(10 * time.Second)
  122. sqs := o.SipQuerySession(client)
  123. if sqs == nil {
  124. continue
  125. }
  126. sc := o.QueryChannel(client)
  127. if sc == nil {
  128. continue
  129. }
  130. mapChannels := make(map[string]bool)
  131. for _, vc := range sc.Data.Channels {
  132. mapChannels[vc.ID] = true
  133. }
  134. for _, v := range sqs.Data.Sessions {
  135. for _, v1 := range v.Devices {
  136. cid := v.ID + "@" + v1.DeviceID
  137. if _, ok := mapChannels[cid]; !ok { //有sip会话,但未创建多媒体通道的,则需要创建多媒体通道
  138. queryPara := "?action=create_channel&id=" + cid + "&app=live&stream=[stream]&port_mode=fixed"
  139. if _, err := client.R().Get(mainConfig.GB28181API + queryPara); err != nil {
  140. logrus.Errorf("多媒体通道创建失败,id=%s,失败原因:%s", cid, err.Error())
  141. } else {
  142. logrus.Infof("多媒体通道创建成功:id=%s", cid)
  143. }
  144. }
  145. }
  146. }
  147. }
  148. }
  149. // StartWeb 提供api服务
  150. func (o *SrsMgr) StartWeb(args ...interface{}) interface{} {
  151. defer func() {
  152. if err := recover(); err != nil {
  153. gopool.Add(o.StartWeb, args)
  154. logrus.Errorf("SrsMgr.StartWeb发生异常:%v", err)
  155. logrus.Errorf("SrsMgr.StartWeb发生异常,堆栈信息:%s", string(debug.Stack()))
  156. }
  157. }()
  158. o.echo.Debug = true
  159. o.echo.HideBanner = true
  160. o.echo.Use(middleware.Recover())
  161. o.echo.Use(middleware.CORS())
  162. o.echo.Any("/api/*", o.HookHandler) //删除过期的视频调用,谁调用?
  163. o.echo.HTTPErrorHandler = o.HTTPErrorHandler
  164. return o.echo.Start("0.0.0.0:8198")
  165. }
  166. func (o *SrsMgr) HTTPErrorHandler(err error, c echo.Context) {
  167. _ = c.JSON(http.StatusInternalServerError, nil)
  168. }
  // H is a shorthand for an ad-hoc JSON object, used for small responses such
  // as H{"code": 0}.
  type H map[string]interface{}
  170. func (o *SrsMgr) HookHandler(c echo.Context) error {
  171. if c.Request().Body != nil {
  172. buf, err := ioutil.ReadAll(c.Request().Body)
  173. var obj CallbackAPIObject
  174. err = json.Unmarshal(buf, &obj)
  175. if err != nil {
  176. logrus.Warnf("SrsMgr.HookHandler解析%s失败:%s", string(buf), err.Error())
  177. } else {
  178. if obj.Action == "on_dvr" {
  179. files, _ := WalkDir(mainConfig.SRSLIVEDIR+string(os.PathSeparator)+obj.Stream, ".mp4")
  180. n := len(files)
  181. if n > mainConfig.SRSLIVEMP4 {
  182. sort.Strings(files)
  183. rmNum := n - mainConfig.SRSLIVEMP4
  184. for i, v := range files {
  185. if i < rmNum {
  186. if err = os.Remove(v); err == nil {
  187. logrus.Warnf("SrsMgr.HookHandler删除视频文件%s成功", v)
  188. } else {
  189. logrus.Warnf("SrsMgr.HookHandler删除视频文件%s失败:%s", v, err.Error())
  190. }
  191. } else {
  192. break
  193. }
  194. }
  195. }
  196. }
  197. }
  198. }
  199. return c.JSON(http.StatusOK, H{"code": 0})
  200. }
  // WalkDir walks the tree rooted at dirPth and returns the path of every
  // non-directory entry whose name ends with suffix, compared
  // case-insensitively.
  //
  // The returned slice is never nil. When the walk stops on an error, that
  // error is returned together with the paths collected up to that point.
  func WalkDir(dirPth, suffix string) (files []string, err error) {
  	files = make([]string, 0, 30)
  	wanted := strings.ToUpper(suffix)

  	visit := func(path string, info os.FileInfo, walkErr error) error {
  		switch {
  		case walkErr != nil:
  			// Abort the walk on the first error reported for an entry.
  			return walkErr
  		case info.IsDir():
  			return nil
  		case strings.HasSuffix(strings.ToUpper(info.Name()), wanted):
  			files = append(files, path)
  		}
  		return nil
  	}

  	err = filepath.Walk(dirPth, visit)
  	return files, err
  }
  218. }