| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- package main
- import (
- "context"
- "os"
- "runtime/debug"
- "strings"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/mqtt"
- "lc/common/onvif/profiles/devicemgmt"
- "lc/common/onvif/soap"
- "lc/common/protocol"
- "lc/common/util"
- )
- type Resolution struct {
- Width int32
- Height int32
- }
- type OnvifState struct {
- ChangeTime time.Time //变动时间
- State uint8 //是否在线,0在线,1离线
- }
- func (o *OnvifState) CheckOnline(parent *LcDevice) {
- if parent.XAddr == "" {
- return
- }
- o.State = parent.testOnline(parent.XAddr)
- o.ChangeTime = util.MlNow()
- var obj protocol.Pack_IPCState
- if str, err := obj.EnCode(parent.onvifDev.Code, appConfig.GID, GetNextUint64(), o.ChangeTime, o.State); err == nil {
- GetMQTTMgr().Publish(GetTopic(parent.GetDevType(), parent.onvifDev.Code, protocol.TP_ONVIF_STATE), str, mqtt.AtMostOnce, ToAll)
- }
- }
- type VideoToken struct {
- Token string // "Profile_1"
- Name string //码流名称,如"mainStream"
- VideoEncoding string //编码格式,如"H264"
- VideoResolution Resolution //分辨率
- VideoFrameRateLimit int32 //25
- VideoBitrateLimit int32 //2048
- VideoRtspUrl string //视频地址
- SnapshotUrl string //抓拍图片地址
- }
- type LcDevice struct {
- IP string //设备ID,uuid
- endpoints map[string]string
- tokens map[string]*VideoToken
- client *soap.Client
- onvifDev *protocol.OnvifDev
- ctx context.Context
- cancel context.CancelFunc
- downQueue *util.MlQueue
- mapTopicHandle map[string]func(m mqtt.Message)
- chanXAddr chan string
- XAddr string
- ffmpeg *os.Process //推流进程
- observer uint32
- EventActuator *EventActuator
- State OnvifState //在线监测
- }
- func NewLcDevice(dev *protocol.OnvifDev) *LcDevice {
- client := soap.NewClient(soap.WithTimeout(time.Second * 0))
- client.AddHeader(soap.NewWSSSecurityHeader(dev.User, dev.Password))
- ctx, cancel := context.WithCancel(context.Background())
- lcdev := LcDevice{
- IP: dev.IP,
- endpoints: make(map[string]string),
- tokens: make(map[string]*VideoToken),
- client: client,
- onvifDev: dev,
- ctx: ctx,
- cancel: cancel,
- downQueue: util.NewQueue(100),
- mapTopicHandle: make(map[string]func(m mqtt.Message)),
- chanXAddr: make(chan string),
- }
- lcdev.SetTopicHandle()
- lcdev.MQTTSubscribe()
- go lcdev.Handle()
- return &lcdev
- }
- func (o *LcDevice) GetDevType() string {
- if o.onvifDev.DevType == 3 {
- return protocol.DT_IPC
- } else if o.onvifDev.DevType == 4 {
- return protocol.DT_SOS
- }
- return "unknown"
- }
- func (o *LcDevice) SetTopicHandle() {
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ)] = o.HandleTpOnvifPtz
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM)] = o.HandleTpSPtzComm
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT)] = o.HandleTpOnvifSnapshot
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD)] = o.HandleTpOnvifRecord
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO)] = o.HandleTpOnvifVideo
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT)] = o.HandleTpOnvifReboot
- o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME)] = o.HandleTpOnvifSynctime
- }
- func (o *LcDevice) MQTTSubscribe() {
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- //以下仅订阅云端
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
- GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
- }
- func (o *LcDevice) GetCapabilities(XAddr string) error {
- dev := devicemgmt.NewDevice(o.client, XAddr)
- reply, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
- if err != nil {
- if serr, ok := err.(*soap.SOAPFault); ok {
- logrus.Error(serr)
- }
- return err
- }
- if len(strings.Trim(string(reply.Capabilities.Analytics.XAddr), " ")) > 0 {
- o.endpoints["analytics"] = string(reply.Capabilities.Analytics.XAddr)
- }
- if len(strings.Trim(string(reply.Capabilities.Device.XAddr), " ")) > 0 {
- o.endpoints["device"] = string(reply.Capabilities.Device.XAddr)
- }
- if len(strings.Trim(string(reply.Capabilities.Events.XAddr), " ")) > 0 {
- o.endpoints["events"] = string(reply.Capabilities.Events.XAddr)
- }
- if len(strings.Trim(string(reply.Capabilities.Imaging.XAddr), " ")) > 0 {
- o.endpoints["imaging"] = string(reply.Capabilities.Imaging.XAddr)
- }
- if len(strings.Trim(string(reply.Capabilities.Media.XAddr), " ")) > 0 {
- o.endpoints["media"] = string(reply.Capabilities.Media.XAddr)
- }
- if len(strings.Trim(string(reply.Capabilities.PTZ.XAddr), " ")) > 0 {
- o.endpoints["ptz"] = string(reply.Capabilities.PTZ.XAddr)
- }
- return nil
- }
- func (o *LcDevice) testOnline(XAddr string) uint8 {
- dev := devicemgmt.NewDevice(o.client, XAddr)
- _, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
- if err != nil {
- return protocol.FAILED
- }
- return protocol.SUCCESS
- }
- func (o *LcDevice) SystemReboot() (string, error) {
- dev := devicemgmt.NewDevice(o.client, o.XAddr)
- resp, err := dev.SystemReboot(&devicemgmt.SystemReboot{})
- if err != nil {
- logrus.Errorf("重启设备[%s]发生错误:%s", o.onvifDev.Code, err.Error())
- return "", err
- }
- return resp.Message, nil
- }
- func (o *LcDevice) SetSystemDateAndTime() error {
- dev := devicemgmt.NewDevice(o.client, o.XAddr)
- _, err := dev.SetSystemDateAndTime(&devicemgmt.SetSystemDateAndTime{
- DateTimeType: devicemgmt.SetDateTimeTypeNTP,
- DaylightSavings: true,
- })
- if err != nil {
- logrus.Errorf("设置设备[%s]日期时间发生错误:%s", o.onvifDev.Code, err.Error())
- }
- return err
- }
- func (o *LcDevice) HandleMessage(m mqtt.Message) {
- o.downQueue.Put(m)
- }
- func (o *LcDevice) HandleTpOnvifReboot(m mqtt.Message) {
- var obj protocol.Pack_IPCCtrlComm
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- if o.onvifDev.Code != obj.Id {
- return
- }
- str, err := o.SystemReboot()
- if err == nil {
- logrus.Infof("设备[%s]重启成功:%s", o.onvifDev.Code, str)
- }
- var ret protocol.Pack_IPCCommonACK
- if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil {
- GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT_ACK), strRet, mqtt.AtMostOnce, ToAll)
- }
- }
- func (o *LcDevice) HandleTpOnvifSynctime(m mqtt.Message) {
- var obj protocol.Pack_IPCCtrlComm
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- if o.onvifDev.Code != obj.Id {
- return
- }
- err := o.SetSystemDateAndTime()
- var ret protocol.Pack_IPCCommonACK
- if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil {
- GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME_ACK), strRet, mqtt.AtMostOnce, ToAll)
- }
- }
- func (o *LcDevice) Update(xaddr string) {
- if xaddr != "" {
- o.chanXAddr <- xaddr
- }
- }
- func (o *LcDevice) init(xaddr string) error {
- //获取服务能力,存入endpoint
- err := o.GetCapabilities(xaddr)
- if err != nil {
- return err
- }
- //获取profile,存入videotoken
- err = o.GetProfiles()
- if err != nil {
- return err
- }
- //获取uri,存入videotoken
- //err = o.GetAllStreamUri()
- //if err != nil {
- // return err
- //}
- //err = o.GetAllSnapshotUri()
- //if err != nil {
- // return err
- //}
- if o.EventActuator != nil {
- o.EventActuator.Cancel()
- if addr, ok := o.endpoints["events"]; ok {
- o.EventActuator = NewEventActuator(o)
- go o.EventActuator.EventHandle(addr)
- }
- }
- return nil
- }
- func (o *LcDevice) Handle() {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("LcDevice.Handle发生异常:%v", err)
- logrus.Errorf("LcDevice.Handle发生异常,堆栈信息:%s", string(debug.Stack()))
- go o.Handle()
- }
- }()
- timer := time.NewTicker(1 * time.Minute)
- NeedRetry := true
- //初始化地址
- o.XAddr = "http://" + o.IP + ":80/onvif/device_service"
- if err := o.init(o.XAddr); err == nil {
- NeedRetry = false
- } else {
- logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error())
- }
- for {
- select {
- case <-o.ctx.Done():
- o.CleanupProcess()
- if o.EventActuator != nil {
- o.EventActuator.Cancel()
- }
- logrus.Errorf("设备[%s]的LcDevice.Handle退出,原因:%v", o.onvifDev.Code, o.ctx.Err())
- return
- case <-timer.C: //每隔1分钟执行一次
- if NeedRetry {
- if err := o.init(o.XAddr); err == nil {
- NeedRetry = false
- } else {
- logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error())
- }
- }
- o.State.CheckOnline(o)
- case xaddr := <-o.chanXAddr:
- logrus.Infof("摄像机%s尝试更新ip地址为:%s", o.onvifDev.Code, xaddr)
- if xaddr != o.XAddr {
- o.XAddr = xaddr
- if err := o.init(xaddr); err == nil {
- NeedRetry = false
- logrus.Infof("摄像机%s初始化成功", o.onvifDev.Code)
- } else {
- logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error())
- }
- }
- default:
- if o.EventActuator == nil && len(o.onvifDev.Event) > 0 {
- if addr, ok := o.endpoints["events"]; ok {
- o.EventActuator = NewEventActuator(o)
- go o.EventActuator.EventHandle(addr)
- }
- }
- //从队列钟获取指令执行
- if m, ok, _ := o.downQueue.Get(); ok {
- if mm, ok := m.(mqtt.Message); ok {
- if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
- fn(mm)
- } else {
- logrus.Errorf("LcDevice.Handle:不支持的主题:%s", mm.Topic())
- }
- }
- } else {
- time.Sleep(50 * time.Millisecond)
- }
- }
- }
- }
- func FileExist(path string) bool {
- _, err := os.Lstat(path)
- return !os.IsNotExist(err)
- }
- func GetOnlineMsg() (string, string) {
- //发布上线消息
- return "", ""
- }
- func GetWillMsg() (string, string) {
- return "", ""
- }
|