package stream import ( "errors" "fmt" "go.uber.org/zap" "net/http" "os" "os/exec" "path/filepath" "server/global" "sync" ) // StreamService 流媒体服务结构体 type StreamService struct { streamMap map[string]*exec.Cmd // 存储活跃的流进程: streamId -> 进程 mutex sync.RWMutex // 保护streamMap的并发安全 hlsBaseDir string // HLS文件存储根目录 } // NewStreamService 创建流媒体服务实例 func NewStreamService() *StreamService { // 初始化HLS存储目录 hlsDir := filepath.Join(global.GVA_CONFIG.Local.StorePath, "hls") if err := os.MkdirAll(hlsDir, 0755); err != nil { global.GVA_LOG.Error("创建HLS目录失败", zap.Error(err)) } return &StreamService{ streamMap: make(map[string]*exec.Cmd), hlsBaseDir: hlsDir, } } // StartStream 启动RTSP转HLS流 func (s *StreamService) StartStream(rtspUrl, streamId string) (string, error) { s.mutex.Lock() defer s.mutex.Unlock() // 检查流是否已在运行 if _, exists := s.streamMap[streamId]; exists { return s.getHLSUrl(streamId), nil } // 创建当前流的存储目录 streamDir := filepath.Join(s.hlsBaseDir, streamId) if err := os.MkdirAll(streamDir, 0755); err != nil { return "", fmt.Errorf("创建流目录失败: %v", err) } // 构建FFmpeg命令: RTSP转HLS outputPath := filepath.Join(streamDir, "stream.m3u8") cmd := exec.Command( "ffmpeg", "-rtsp_transport", "tcp", // 使用TCP传输,更稳定 "-i", rtspUrl, // 输入RTSP地址 "-c:v", "copy", // 视频不重新编码(快速) "-c:a", "aac", // 音频转码为AAC(浏览器兼容) "-f", "hls", // 输出格式为HLS "-hls_time", "2", // 每个切片2秒 "-hls_list_size", "0", // 保留所有切片 "-hls_flags", "delete_segments", // 自动删除旧切片 outputPath, ) // 启动进程 if err := cmd.Start(); err != nil { os.RemoveAll(streamDir) // 启动失败,清理目录 return "", fmt.Errorf("启动FFmpeg失败: %v", err) } // 存储进程引用 s.streamMap[streamId] = cmd global.GVA_LOG.Info("启动流转换成功", zap.String("streamId", streamId)) // 启动goroutine监控进程状态 go s.monitorStream(streamId, cmd, streamDir) return s.getHLSUrl(streamId), nil } // StopStream 停止流转换 func (s *StreamService) StopStream(streamId string) error { s.mutex.Lock() defer s.mutex.Unlock() cmd, exists := s.streamMap[streamId] if !exists { return errors.New("流不存在或已停止") } // 终止进程 if err := cmd.Process.Kill(); err != nil { global.GVA_LOG.Warn("终止流进程失败", zap.String("streamId", streamId), zap.Error(err)) } // 清理资源 delete(s.streamMap, streamId) streamDir := filepath.Join(s.hlsBaseDir, streamId) if err := os.RemoveAll(streamDir); err != nil { global.GVA_LOG.Warn("清理流目录失败", zap.String("streamId", streamId), zap.Error(err)) } global.GVA_LOG.Info("停止流转换成功", zap.String("streamId", streamId)) return nil } // PlayHLS 提供HLS流文件访问 func (s *StreamService) PlayHLS(w http.ResponseWriter, r *http.Request, streamId, filePath string) { // 构建文件路径 filePath = filepath.Join(s.hlsBaseDir, streamId, filePath) // 检查文件是否存在 if _, err := os.Stat(filePath); os.IsNotExist(err) { http.NotFound(w, r) return } // 设置MIME类型 switch filepath.Ext(filePath) { case ".m3u8": w.Header().Set("Content-Type", "application/x-mpegURL") case ".ts": w.Header().Set("Content-Type", "video/MP2T") } // 提供文件下载 http.ServeFile(w, r, filePath) } // GetActiveStreams 获取活跃流列表 func (s *StreamService) GetActiveStreams() []string { s.mutex.RLock() defer s.mutex.RUnlock() list := make([]string, 0, len(s.streamMap)) for streamId := range s.streamMap { list = append(list, streamId) } return list } // 监控流进程状态,异常退出时清理资源 func (s *StreamService) monitorStream(streamId string, cmd *exec.Cmd, streamDir string) { // 等待进程退出 if err := cmd.Wait(); err != nil { global.GVA_LOG.Error("流进程异常退出", zap.String("streamId", streamId), zap.Error(err)) } // 清理资源 s.mutex.Lock() defer s.mutex.Unlock() if _, exists := s.streamMap[streamId]; exists { delete(s.streamMap, streamId) os.RemoveAll(streamDir) } } // 获取HLS流的访问URL func (s *StreamService) getHLSUrl(streamId string) string { return fmt.Sprintf("/%s/stream/hls/%s/stream.m3u8", global.GVA_CONFIG.System.RouterPrefix, streamId) }