stream.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package stream
  2. import (
  3. "errors"
  4. "fmt"
  5. "go.uber.org/zap"
  6. "net/http"
  7. "os"
  8. "os/exec"
  9. "path/filepath"
  10. "server/global"
  11. "sync"
  12. )
  13. // StreamService 流媒体服务结构体
  14. type StreamService struct {
  15. streamMap map[string]*exec.Cmd // 存储活跃的流进程: streamId -> 进程
  16. mutex sync.RWMutex // 保护streamMap的并发安全
  17. hlsBaseDir string // HLS文件存储根目录
  18. }
  19. // NewStreamService 创建流媒体服务实例
  20. func NewStreamService() *StreamService {
  21. // 初始化HLS存储目录
  22. hlsDir := filepath.Join(global.GVA_CONFIG.Local.StorePath, "hls")
  23. if err := os.MkdirAll(hlsDir, 0755); err != nil {
  24. global.GVA_LOG.Error("创建HLS目录失败", zap.Error(err))
  25. }
  26. return &StreamService{
  27. streamMap: make(map[string]*exec.Cmd),
  28. hlsBaseDir: hlsDir,
  29. }
  30. }
  31. // StartStream 启动RTSP转HLS流
  32. func (s *StreamService) StartStream(rtspUrl, streamId string) (string, error) {
  33. s.mutex.Lock()
  34. defer s.mutex.Unlock()
  35. // 检查流是否已在运行
  36. if _, exists := s.streamMap[streamId]; exists {
  37. return s.getHLSUrl(streamId), nil
  38. }
  39. // 创建当前流的存储目录
  40. streamDir := filepath.Join(s.hlsBaseDir, streamId)
  41. if err := os.MkdirAll(streamDir, 0755); err != nil {
  42. return "", fmt.Errorf("创建流目录失败: %v", err)
  43. }
  44. // 构建FFmpeg命令: RTSP转HLS
  45. outputPath := filepath.Join(streamDir, "stream.m3u8")
  46. cmd := exec.Command(
  47. "ffmpeg",
  48. "-rtsp_transport", "tcp", // 使用TCP传输,更稳定
  49. "-i", rtspUrl, // 输入RTSP地址
  50. "-c:v", "copy", // 视频不重新编码(快速)
  51. "-c:a", "aac", // 音频转码为AAC(浏览器兼容)
  52. "-f", "hls", // 输出格式为HLS
  53. "-hls_time", "2", // 每个切片2秒
  54. "-hls_list_size", "0", // 保留所有切片
  55. "-hls_flags", "delete_segments", // 自动删除旧切片
  56. outputPath,
  57. )
  58. // 启动进程
  59. if err := cmd.Start(); err != nil {
  60. os.RemoveAll(streamDir) // 启动失败,清理目录
  61. return "", fmt.Errorf("启动FFmpeg失败: %v", err)
  62. }
  63. // 存储进程引用
  64. s.streamMap[streamId] = cmd
  65. global.GVA_LOG.Info("启动流转换成功", zap.String("streamId", streamId))
  66. // 启动goroutine监控进程状态
  67. go s.monitorStream(streamId, cmd, streamDir)
  68. return s.getHLSUrl(streamId), nil
  69. }
  70. // StopStream 停止流转换
  71. func (s *StreamService) StopStream(streamId string) error {
  72. s.mutex.Lock()
  73. defer s.mutex.Unlock()
  74. cmd, exists := s.streamMap[streamId]
  75. if !exists {
  76. return errors.New("流不存在或已停止")
  77. }
  78. // 终止进程
  79. if err := cmd.Process.Kill(); err != nil {
  80. global.GVA_LOG.Warn("终止流进程失败", zap.String("streamId", streamId), zap.Error(err))
  81. }
  82. // 清理资源
  83. delete(s.streamMap, streamId)
  84. streamDir := filepath.Join(s.hlsBaseDir, streamId)
  85. if err := os.RemoveAll(streamDir); err != nil {
  86. global.GVA_LOG.Warn("清理流目录失败", zap.String("streamId", streamId), zap.Error(err))
  87. }
  88. global.GVA_LOG.Info("停止流转换成功", zap.String("streamId", streamId))
  89. return nil
  90. }
  91. // PlayHLS 提供HLS流文件访问
  92. func (s *StreamService) PlayHLS(w http.ResponseWriter, r *http.Request, streamId, filePath string) {
  93. // 构建文件路径
  94. filePath = filepath.Join(s.hlsBaseDir, streamId, filePath)
  95. // 检查文件是否存在
  96. if _, err := os.Stat(filePath); os.IsNotExist(err) {
  97. http.NotFound(w, r)
  98. return
  99. }
  100. // 设置MIME类型
  101. switch filepath.Ext(filePath) {
  102. case ".m3u8":
  103. w.Header().Set("Content-Type", "application/x-mpegURL")
  104. case ".ts":
  105. w.Header().Set("Content-Type", "video/MP2T")
  106. }
  107. // 提供文件下载
  108. http.ServeFile(w, r, filePath)
  109. }
  110. // GetActiveStreams 获取活跃流列表
  111. func (s *StreamService) GetActiveStreams() []string {
  112. s.mutex.RLock()
  113. defer s.mutex.RUnlock()
  114. list := make([]string, 0, len(s.streamMap))
  115. for streamId := range s.streamMap {
  116. list = append(list, streamId)
  117. }
  118. return list
  119. }
  120. // 监控流进程状态,异常退出时清理资源
  121. func (s *StreamService) monitorStream(streamId string, cmd *exec.Cmd, streamDir string) {
  122. // 等待进程退出
  123. if err := cmd.Wait(); err != nil {
  124. global.GVA_LOG.Error("流进程异常退出", zap.String("streamId", streamId), zap.Error(err))
  125. }
  126. // 清理资源
  127. s.mutex.Lock()
  128. defer s.mutex.Unlock()
  129. if _, exists := s.streamMap[streamId]; exists {
  130. delete(s.streamMap, streamId)
  131. os.RemoveAll(streamDir)
  132. }
  133. }
  134. // 获取HLS流的访问URL
  135. func (s *StreamService) getHLSUrl(streamId string) string {
  136. return fmt.Sprintf("/%s/stream/hls/%s/stream.m3u8",
  137. global.GVA_CONFIG.System.RouterPrefix,
  138. streamId)
  139. }