| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package stream
- import (
- "errors"
- "fmt"
- "go.uber.org/zap"
- "net/http"
- "os"
- "os/exec"
- "path/filepath"
- "server/global"
- "sync"
- )
- // StreamService 流媒体服务结构体
- type StreamService struct {
- streamMap map[string]*exec.Cmd // 存储活跃的流进程: streamId -> 进程
- mutex sync.RWMutex // 保护streamMap的并发安全
- hlsBaseDir string // HLS文件存储根目录
- }
- // NewStreamService 创建流媒体服务实例
- func NewStreamService() *StreamService {
- // 初始化HLS存储目录
- hlsDir := filepath.Join(global.GVA_CONFIG.Local.StorePath, "hls")
- if err := os.MkdirAll(hlsDir, 0755); err != nil {
- global.GVA_LOG.Error("创建HLS目录失败", zap.Error(err))
- }
- return &StreamService{
- streamMap: make(map[string]*exec.Cmd),
- hlsBaseDir: hlsDir,
- }
- }
- // StartStream 启动RTSP转HLS流
- func (s *StreamService) StartStream(rtspUrl, streamId string) (string, error) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- // 检查流是否已在运行
- if _, exists := s.streamMap[streamId]; exists {
- return s.getHLSUrl(streamId), nil
- }
- // 创建当前流的存储目录
- streamDir := filepath.Join(s.hlsBaseDir, streamId)
- if err := os.MkdirAll(streamDir, 0755); err != nil {
- return "", fmt.Errorf("创建流目录失败: %v", err)
- }
- // 构建FFmpeg命令: RTSP转HLS
- outputPath := filepath.Join(streamDir, "stream.m3u8")
- cmd := exec.Command(
- "ffmpeg",
- "-rtsp_transport", "tcp", // 使用TCP传输,更稳定
- "-i", rtspUrl, // 输入RTSP地址
- "-c:v", "copy", // 视频不重新编码(快速)
- "-c:a", "aac", // 音频转码为AAC(浏览器兼容)
- "-f", "hls", // 输出格式为HLS
- "-hls_time", "2", // 每个切片2秒
- "-hls_list_size", "0", // 保留所有切片
- "-hls_flags", "delete_segments", // 自动删除旧切片
- outputPath,
- )
- // 启动进程
- if err := cmd.Start(); err != nil {
- os.RemoveAll(streamDir) // 启动失败,清理目录
- return "", fmt.Errorf("启动FFmpeg失败: %v", err)
- }
- // 存储进程引用
- s.streamMap[streamId] = cmd
- global.GVA_LOG.Info("启动流转换成功", zap.String("streamId", streamId))
- // 启动goroutine监控进程状态
- go s.monitorStream(streamId, cmd, streamDir)
- return s.getHLSUrl(streamId), nil
- }
- // StopStream 停止流转换
- func (s *StreamService) StopStream(streamId string) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- cmd, exists := s.streamMap[streamId]
- if !exists {
- return errors.New("流不存在或已停止")
- }
- // 终止进程
- if err := cmd.Process.Kill(); err != nil {
- global.GVA_LOG.Warn("终止流进程失败", zap.String("streamId", streamId), zap.Error(err))
- }
- // 清理资源
- delete(s.streamMap, streamId)
- streamDir := filepath.Join(s.hlsBaseDir, streamId)
- if err := os.RemoveAll(streamDir); err != nil {
- global.GVA_LOG.Warn("清理流目录失败", zap.String("streamId", streamId), zap.Error(err))
- }
- global.GVA_LOG.Info("停止流转换成功", zap.String("streamId", streamId))
- return nil
- }
- // PlayHLS 提供HLS流文件访问
- func (s *StreamService) PlayHLS(w http.ResponseWriter, r *http.Request, streamId, filePath string) {
- // 构建文件路径
- filePath = filepath.Join(s.hlsBaseDir, streamId, filePath)
- // 检查文件是否存在
- if _, err := os.Stat(filePath); os.IsNotExist(err) {
- http.NotFound(w, r)
- return
- }
- // 设置MIME类型
- switch filepath.Ext(filePath) {
- case ".m3u8":
- w.Header().Set("Content-Type", "application/x-mpegURL")
- case ".ts":
- w.Header().Set("Content-Type", "video/MP2T")
- }
- // 提供文件下载
- http.ServeFile(w, r, filePath)
- }
- // GetActiveStreams 获取活跃流列表
- func (s *StreamService) GetActiveStreams() []string {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- list := make([]string, 0, len(s.streamMap))
- for streamId := range s.streamMap {
- list = append(list, streamId)
- }
- return list
- }
- // 监控流进程状态,异常退出时清理资源
- func (s *StreamService) monitorStream(streamId string, cmd *exec.Cmd, streamDir string) {
- // 等待进程退出
- if err := cmd.Wait(); err != nil {
- global.GVA_LOG.Error("流进程异常退出", zap.String("streamId", streamId), zap.Error(err))
- }
- // 清理资源
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if _, exists := s.streamMap[streamId]; exists {
- delete(s.streamMap, streamId)
- os.RemoveAll(streamDir)
- }
- }
- // 获取HLS流的访问URL
- func (s *StreamService) getHLSUrl(streamId string) string {
- return fmt.Sprintf("/%s/stream/hls/%s/stream.m3u8",
- global.GVA_CONFIG.System.RouterPrefix,
- streamId)
- }
|