package main import ( "io/ioutil" "net/http" "os" "path/filepath" "runtime/debug" "sort" "strings" "sync" "time" "github.com/labstack/echo/v4/middleware" "github.com/go-resty/resty/v2" "github.com/labstack/echo/v4" "github.com/sirupsen/logrus" ) var _onceSrsMgr sync.Once var _singleSrsMgr *SrsMgr func GetSrsMgr() *SrsMgr { _onceSrsMgr.Do(func() { _singleSrsMgr = &SrsMgr{ echo: echo.New(), } }) return _singleSrsMgr } type SrsMgr struct { echo *echo.Echo } type Devices struct { DeviceID string `json:"device_id"` DeviceStatus string `json:"device_status"` InviteStatus string `json:"invite_status"` InviteTime int64 `json:"invite_time"` } type Session struct { ID string `json:"id"` DeviceSum int `json:"device_sumnum"` Devices []Devices `json:"devices"` } type Sessions struct { Sessions []Session `json:"sessions"` } type SipQuerySession struct { Code int `json:"code"` Data Sessions `json:"data"` } type Channel struct { ID string `json:"id"` IP string `json:"ip"` RtmpPort int `json:"rtmp_port"` App string `json:"app"` Stream string `json:"stream"` RtmpUrl string `json:"rtmp_url"` Ssrc int `json:"ssrc"` RtpPort int `json:"rtp_port"` PortMode string `json:"port_mode"` RtpPeerPort int `json:"rtp_peer_port"` RtpPeerIP string `json:"rtp_peer_ip"` RecvTime uint64 `json:"recv_time"` RecvTimeStr string `json:"recv_time_str"` } type Channels struct { Channels []Channel `json:"channels"` } type QueryChannel struct { Code int `json:"code"` Data Channels `json:"data"` } type CallbackAPIObject struct { Action string `json:"action"` Stream string `json:"stream"` File string `json:"file"` } // SipQuerySession 查询SIP会话 func (o *SrsMgr) SipQuerySession(client *resty.Client) *SipQuerySession { resp, err := client.R().Get(mainConfig.GB28181API + "?action=sip_query_session") if err != nil { logrus.Errorf("查询SIP会话失败:%s", err.Error()) return nil } if resp.StatusCode() != http.StatusOK { logrus.Errorf("查询SIP会话失败,http状态码:%s", resp.Status()) return nil } var sqs SipQuerySession if err := json.Unmarshal(resp.Body(), &sqs); err != nil { logrus.Errorf("查询SIP会话,解析响应数据失败:%s", err.Error()) return nil } return &sqs } // QueryChannel 查询通道 func (o *SrsMgr) QueryChannel(client *resty.Client) *QueryChannel { resp, err := client.R().Get(mainConfig.GB28181API + "?action=query_channel") if err != nil { logrus.Errorf("查询通道失败:%s", err.Error()) return nil } if resp.StatusCode() != http.StatusOK { logrus.Errorf("查询通道失败,http状态码:%s", resp.Status()) return nil } var sc QueryChannel if err := json.Unmarshal(resp.Body(), &sc); err != nil { logrus.Errorf("查询通道,解析响应数据失败:%s", err.Error()) return nil } return &sc } func (o *SrsMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("GB28181Mgr.Handler发生异常:%s", string(debug.Stack())) } }() client := resty.New() //REST Client client.SetTimeout(6 * time.Second) for { time.Sleep(10 * time.Second) sqs := o.SipQuerySession(client) if sqs == nil { continue } sc := o.QueryChannel(client) if sc == nil { continue } mapChannels := make(map[string]bool) for _, vc := range sc.Data.Channels { mapChannels[vc.ID] = true } for _, v := range sqs.Data.Sessions { for _, v1 := range v.Devices { cid := v.ID + "@" + v1.DeviceID if _, ok := mapChannels[cid]; !ok { //有sip会话,但未创建多媒体通道的,则需要创建多媒体通道 queryPara := "?action=create_channel&id=" + cid + "&app=live&stream=[stream]&port_mode=fixed" if _, err := client.R().Get(mainConfig.GB28181API + queryPara); err != nil { logrus.Errorf("多媒体通道创建失败,id=%s,失败原因:%s", cid, err.Error()) } else { logrus.Infof("多媒体通道创建成功:id=%s", cid) } } } } } } // StartWeb 提供api服务 func (o *SrsMgr) StartWeb(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.StartWeb, args) logrus.Errorf("SrsMgr.StartWeb发生异常:%v", err) logrus.Errorf("SrsMgr.StartWeb发生异常,堆栈信息:%s", string(debug.Stack())) } }() o.echo.Debug = true o.echo.HideBanner = true o.echo.Use(middleware.Recover()) o.echo.Use(middleware.CORS()) o.echo.Any("/api/*", o.HookHandler) //删除过期的视频调用,谁调用? o.echo.HTTPErrorHandler = o.HTTPErrorHandler return o.echo.Start("0.0.0.0:8198") } func (o *SrsMgr) HTTPErrorHandler(err error, c echo.Context) { _ = c.JSON(http.StatusInternalServerError, nil) } type H map[string]interface{} func (o *SrsMgr) HookHandler(c echo.Context) error { if c.Request().Body != nil { buf, err := ioutil.ReadAll(c.Request().Body) var obj CallbackAPIObject err = json.Unmarshal(buf, &obj) if err != nil { logrus.Warnf("SrsMgr.HookHandler解析%s失败:%s", string(buf), err.Error()) } else { if obj.Action == "on_dvr" { files, _ := WalkDir(mainConfig.SRSLIVEDIR+string(os.PathSeparator)+obj.Stream, ".mp4") n := len(files) if n > mainConfig.SRSLIVEMP4 { sort.Strings(files) rmNum := n - mainConfig.SRSLIVEMP4 for i, v := range files { if i < rmNum { if err = os.Remove(v); err == nil { logrus.Warnf("SrsMgr.HookHandler删除视频文件%s成功", v) } else { logrus.Warnf("SrsMgr.HookHandler删除视频文件%s失败:%s", v, err.Error()) } } else { break } } } } } } return c.JSON(http.StatusOK, H{"code": 0}) } func WalkDir(dirPth, suffix string) (files []string, err error) { files = make([]string, 0, 30) suffix = strings.ToUpper(suffix) err = filepath.Walk(dirPth, //遍历 func(filename string, fi os.FileInfo, err error) error { if err != nil { return err } if fi.IsDir() { return nil } if strings.HasSuffix(strings.ToUpper(fi.Name()), suffix) { files = append(files, filename) } return nil }) return files, err }