| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package main
- import (
- "io/ioutil"
- "net/http"
- "os"
- "path/filepath"
- "runtime/debug"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/labstack/echo/v4/middleware"
- "github.com/go-resty/resty/v2"
- "github.com/labstack/echo/v4"
- "github.com/sirupsen/logrus"
- )
- var _onceSrsMgr sync.Once
- var _singleSrsMgr *SrsMgr
- func GetSrsMgr() *SrsMgr {
- _onceSrsMgr.Do(func() {
- _singleSrsMgr = &SrsMgr{
- echo: echo.New(),
- }
- })
- return _singleSrsMgr
- }
- type SrsMgr struct {
- echo *echo.Echo
- }
- type Devices struct {
- DeviceID string `json:"device_id"`
- DeviceStatus string `json:"device_status"`
- InviteStatus string `json:"invite_status"`
- InviteTime int64 `json:"invite_time"`
- }
- type Session struct {
- ID string `json:"id"`
- DeviceSum int `json:"device_sumnum"`
- Devices []Devices `json:"devices"`
- }
- type Sessions struct {
- Sessions []Session `json:"sessions"`
- }
- type SipQuerySession struct {
- Code int `json:"code"`
- Data Sessions `json:"data"`
- }
- type Channel struct {
- ID string `json:"id"`
- IP string `json:"ip"`
- RtmpPort int `json:"rtmp_port"`
- App string `json:"app"`
- Stream string `json:"stream"`
- RtmpUrl string `json:"rtmp_url"`
- Ssrc int `json:"ssrc"`
- RtpPort int `json:"rtp_port"`
- PortMode string `json:"port_mode"`
- RtpPeerPort int `json:"rtp_peer_port"`
- RtpPeerIP string `json:"rtp_peer_ip"`
- RecvTime uint64 `json:"recv_time"`
- RecvTimeStr string `json:"recv_time_str"`
- }
- type Channels struct {
- Channels []Channel `json:"channels"`
- }
- type QueryChannel struct {
- Code int `json:"code"`
- Data Channels `json:"data"`
- }
- type CallbackAPIObject struct {
- Action string `json:"action"`
- Stream string `json:"stream"`
- File string `json:"file"`
- }
- // SipQuerySession 查询SIP会话
- func (o *SrsMgr) SipQuerySession(client *resty.Client) *SipQuerySession {
- resp, err := client.R().Get(mainConfig.GB28181API + "?action=sip_query_session")
- if err != nil {
- logrus.Errorf("查询SIP会话失败:%s", err.Error())
- return nil
- }
- if resp.StatusCode() != http.StatusOK {
- logrus.Errorf("查询SIP会话失败,http状态码:%s", resp.Status())
- return nil
- }
- var sqs SipQuerySession
- if err := json.Unmarshal(resp.Body(), &sqs); err != nil {
- logrus.Errorf("查询SIP会话,解析响应数据失败:%s", err.Error())
- return nil
- }
- return &sqs
- }
- // QueryChannel 查询通道
- func (o *SrsMgr) QueryChannel(client *resty.Client) *QueryChannel {
- resp, err := client.R().Get(mainConfig.GB28181API + "?action=query_channel")
- if err != nil {
- logrus.Errorf("查询通道失败:%s", err.Error())
- return nil
- }
- if resp.StatusCode() != http.StatusOK {
- logrus.Errorf("查询通道失败,http状态码:%s", resp.Status())
- return nil
- }
- var sc QueryChannel
- if err := json.Unmarshal(resp.Body(), &sc); err != nil {
- logrus.Errorf("查询通道,解析响应数据失败:%s", err.Error())
- return nil
- }
- return &sc
- }
- func (o *SrsMgr) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("GB28181Mgr.Handler发生异常:%s", string(debug.Stack()))
- }
- }()
- client := resty.New() //REST Client
- client.SetTimeout(6 * time.Second)
- for {
- time.Sleep(10 * time.Second)
- sqs := o.SipQuerySession(client)
- if sqs == nil {
- continue
- }
- sc := o.QueryChannel(client)
- if sc == nil {
- continue
- }
- mapChannels := make(map[string]bool)
- for _, vc := range sc.Data.Channels {
- mapChannels[vc.ID] = true
- }
- for _, v := range sqs.Data.Sessions {
- for _, v1 := range v.Devices {
- cid := v.ID + "@" + v1.DeviceID
- if _, ok := mapChannels[cid]; !ok { //有sip会话,但未创建多媒体通道的,则需要创建多媒体通道
- queryPara := "?action=create_channel&id=" + cid + "&app=live&stream=[stream]&port_mode=fixed"
- if _, err := client.R().Get(mainConfig.GB28181API + queryPara); err != nil {
- logrus.Errorf("多媒体通道创建失败,id=%s,失败原因:%s", cid, err.Error())
- } else {
- logrus.Infof("多媒体通道创建成功:id=%s", cid)
- }
- }
- }
- }
- }
- }
- // StartWeb 提供api服务
- func (o *SrsMgr) StartWeb(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.StartWeb, args)
- logrus.Errorf("SrsMgr.StartWeb发生异常:%v", err)
- logrus.Errorf("SrsMgr.StartWeb发生异常,堆栈信息:%s", string(debug.Stack()))
- }
- }()
- o.echo.Debug = true
- o.echo.HideBanner = true
- o.echo.Use(middleware.Recover())
- o.echo.Use(middleware.CORS())
- o.echo.Any("/api/*", o.HookHandler) //删除过期的视频调用,谁调用?
- o.echo.HTTPErrorHandler = o.HTTPErrorHandler
- return o.echo.Start("0.0.0.0:8198")
- }
- func (o *SrsMgr) HTTPErrorHandler(err error, c echo.Context) {
- _ = c.JSON(http.StatusInternalServerError, nil)
- }
- type H map[string]interface{}
- func (o *SrsMgr) HookHandler(c echo.Context) error {
- if c.Request().Body != nil {
- buf, err := ioutil.ReadAll(c.Request().Body)
- var obj CallbackAPIObject
- err = json.Unmarshal(buf, &obj)
- if err != nil {
- logrus.Warnf("SrsMgr.HookHandler解析%s失败:%s", string(buf), err.Error())
- } else {
- if obj.Action == "on_dvr" {
- files, _ := WalkDir(mainConfig.SRSLIVEDIR+string(os.PathSeparator)+obj.Stream, ".mp4")
- n := len(files)
- if n > mainConfig.SRSLIVEMP4 {
- sort.Strings(files)
- rmNum := n - mainConfig.SRSLIVEMP4
- for i, v := range files {
- if i < rmNum {
- if err = os.Remove(v); err == nil {
- logrus.Warnf("SrsMgr.HookHandler删除视频文件%s成功", v)
- } else {
- logrus.Warnf("SrsMgr.HookHandler删除视频文件%s失败:%s", v, err.Error())
- }
- } else {
- break
- }
- }
- }
- }
- }
- }
- return c.JSON(http.StatusOK, H{"code": 0})
- }
- func WalkDir(dirPth, suffix string) (files []string, err error) {
- files = make([]string, 0, 30)
- suffix = strings.ToUpper(suffix)
- err = filepath.Walk(dirPth, //遍历
- func(filename string, fi os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if fi.IsDir() {
- return nil
- }
- if strings.HasSuffix(strings.ToUpper(fi.Name()), suffix) {
- files = append(files, filename)
- }
- return nil
- })
- return files, err
- }
|