ipc_device.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. package main
  2. import (
  3. "context"
  4. "os"
  5. "runtime/debug"
  6. "strings"
  7. "time"
  8. "github.com/sirupsen/logrus"
  9. "lc/common/mqtt"
  10. "lc/common/onvif/profiles/devicemgmt"
  11. "lc/common/onvif/soap"
  12. "lc/common/protocol"
  13. "lc/common/util"
  14. )
  15. type Resolution struct {
  16. Width int32
  17. Height int32
  18. }
  19. type OnvifState struct {
  20. ChangeTime time.Time //变动时间
  21. State uint8 //是否在线,0在线,1离线
  22. }
  23. func (o *OnvifState) CheckOnline(parent *LcDevice) {
  24. if parent.XAddr == "" {
  25. return
  26. }
  27. o.State = parent.testOnline(parent.XAddr)
  28. o.ChangeTime = util.MlNow()
  29. var obj protocol.Pack_IPCState
  30. if str, err := obj.EnCode(parent.onvifDev.Code, appConfig.GID, GetNextUint64(), o.ChangeTime, o.State); err == nil {
  31. GetMQTTMgr().Publish(GetTopic(parent.GetDevType(), parent.onvifDev.Code, protocol.TP_ONVIF_STATE), str, mqtt.AtMostOnce, ToAll)
  32. }
  33. }
  34. type VideoToken struct {
  35. Token string // "Profile_1"
  36. Name string //码流名称,如"mainStream"
  37. VideoEncoding string //编码格式,如"H264"
  38. VideoResolution Resolution //分辨率
  39. VideoFrameRateLimit int32 //25
  40. VideoBitrateLimit int32 //2048
  41. VideoRtspUrl string //视频地址
  42. SnapshotUrl string //抓拍图片地址
  43. }
  44. type LcDevice struct {
  45. IP string //设备ID,uuid
  46. endpoints map[string]string
  47. tokens map[string]*VideoToken
  48. client *soap.Client
  49. onvifDev *protocol.OnvifDev
  50. ctx context.Context
  51. cancel context.CancelFunc
  52. downQueue *util.MlQueue
  53. mapTopicHandle map[string]func(m mqtt.Message)
  54. chanXAddr chan string
  55. XAddr string
  56. ffmpeg *os.Process //推流进程
  57. observer uint32
  58. EventActuator *EventActuator
  59. State OnvifState //在线监测
  60. }
  61. func NewLcDevice(dev *protocol.OnvifDev) *LcDevice {
  62. client := soap.NewClient(soap.WithTimeout(time.Second * 0))
  63. client.AddHeader(soap.NewWSSSecurityHeader(dev.User, dev.Password))
  64. ctx, cancel := context.WithCancel(context.Background())
  65. lcdev := LcDevice{
  66. IP: dev.IP,
  67. endpoints: make(map[string]string),
  68. tokens: make(map[string]*VideoToken),
  69. client: client,
  70. onvifDev: dev,
  71. ctx: ctx,
  72. cancel: cancel,
  73. downQueue: util.NewQueue(100),
  74. mapTopicHandle: make(map[string]func(m mqtt.Message)),
  75. chanXAddr: make(chan string),
  76. }
  77. lcdev.SetTopicHandle()
  78. lcdev.MQTTSubscribe()
  79. go lcdev.Handle()
  80. return &lcdev
  81. }
  82. func (o *LcDevice) GetDevType() string {
  83. if o.onvifDev.DevType == 3 {
  84. return protocol.DT_IPC
  85. } else if o.onvifDev.DevType == 4 {
  86. return protocol.DT_SOS
  87. }
  88. return "unknown"
  89. }
  90. func (o *LcDevice) SetTopicHandle() {
  91. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ)] = o.HandleTpOnvifPtz
  92. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM)] = o.HandleTpSPtzComm
  93. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT)] = o.HandleTpOnvifSnapshot
  94. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD)] = o.HandleTpOnvifRecord
  95. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO)] = o.HandleTpOnvifVideo
  96. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT)] = o.HandleTpOnvifReboot
  97. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME)] = o.HandleTpOnvifSynctime
  98. }
  99. func (o *LcDevice) MQTTSubscribe() {
  100. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  101. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  102. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  103. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  104. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  105. //以下仅订阅云端
  106. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
  107. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
  108. }
  109. func (o *LcDevice) GetCapabilities(XAddr string) error {
  110. dev := devicemgmt.NewDevice(o.client, XAddr)
  111. reply, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
  112. if err != nil {
  113. if serr, ok := err.(*soap.SOAPFault); ok {
  114. logrus.Error(serr)
  115. }
  116. return err
  117. }
  118. if len(strings.Trim(string(reply.Capabilities.Analytics.XAddr), " ")) > 0 {
  119. o.endpoints["analytics"] = string(reply.Capabilities.Analytics.XAddr)
  120. }
  121. if len(strings.Trim(string(reply.Capabilities.Device.XAddr), " ")) > 0 {
  122. o.endpoints["device"] = string(reply.Capabilities.Device.XAddr)
  123. }
  124. if len(strings.Trim(string(reply.Capabilities.Events.XAddr), " ")) > 0 {
  125. o.endpoints["events"] = string(reply.Capabilities.Events.XAddr)
  126. }
  127. if len(strings.Trim(string(reply.Capabilities.Imaging.XAddr), " ")) > 0 {
  128. o.endpoints["imaging"] = string(reply.Capabilities.Imaging.XAddr)
  129. }
  130. if len(strings.Trim(string(reply.Capabilities.Media.XAddr), " ")) > 0 {
  131. o.endpoints["media"] = string(reply.Capabilities.Media.XAddr)
  132. }
  133. if len(strings.Trim(string(reply.Capabilities.PTZ.XAddr), " ")) > 0 {
  134. o.endpoints["ptz"] = string(reply.Capabilities.PTZ.XAddr)
  135. }
  136. return nil
  137. }
  138. func (o *LcDevice) testOnline(XAddr string) uint8 {
  139. dev := devicemgmt.NewDevice(o.client, XAddr)
  140. _, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
  141. if err != nil {
  142. return protocol.FAILED
  143. }
  144. return protocol.SUCCESS
  145. }
  146. func (o *LcDevice) SystemReboot() (string, error) {
  147. dev := devicemgmt.NewDevice(o.client, o.XAddr)
  148. resp, err := dev.SystemReboot(&devicemgmt.SystemReboot{})
  149. if err != nil {
  150. logrus.Errorf("重启设备[%s]发生错误:%s", o.onvifDev.Code, err.Error())
  151. return "", err
  152. }
  153. return resp.Message, nil
  154. }
  155. func (o *LcDevice) SetSystemDateAndTime() error {
  156. dev := devicemgmt.NewDevice(o.client, o.XAddr)
  157. _, err := dev.SetSystemDateAndTime(&devicemgmt.SetSystemDateAndTime{
  158. DateTimeType: devicemgmt.SetDateTimeTypeNTP,
  159. DaylightSavings: true,
  160. })
  161. if err != nil {
  162. logrus.Errorf("设置设备[%s]日期时间发生错误:%s", o.onvifDev.Code, err.Error())
  163. }
  164. return err
  165. }
  166. func (o *LcDevice) HandleMessage(m mqtt.Message) {
  167. o.downQueue.Put(m)
  168. }
  169. func (o *LcDevice) HandleTpOnvifReboot(m mqtt.Message) {
  170. var obj protocol.Pack_IPCCtrlComm
  171. if err := obj.DeCode(m.PayloadString()); err != nil {
  172. return
  173. }
  174. if o.onvifDev.Code != obj.Id {
  175. return
  176. }
  177. str, err := o.SystemReboot()
  178. if err == nil {
  179. logrus.Infof("设备[%s]重启成功:%s", o.onvifDev.Code, str)
  180. }
  181. var ret protocol.Pack_IPCCommonACK
  182. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil {
  183. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT_ACK), strRet, mqtt.AtMostOnce, ToAll)
  184. }
  185. }
  186. func (o *LcDevice) HandleTpOnvifSynctime(m mqtt.Message) {
  187. var obj protocol.Pack_IPCCtrlComm
  188. if err := obj.DeCode(m.PayloadString()); err != nil {
  189. return
  190. }
  191. if o.onvifDev.Code != obj.Id {
  192. return
  193. }
  194. err := o.SetSystemDateAndTime()
  195. var ret protocol.Pack_IPCCommonACK
  196. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil {
  197. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME_ACK), strRet, mqtt.AtMostOnce, ToAll)
  198. }
  199. }
  200. func (o *LcDevice) Update(xaddr string) {
  201. if xaddr != "" {
  202. o.chanXAddr <- xaddr
  203. }
  204. }
  205. func (o *LcDevice) init(xaddr string) error {
  206. //获取服务能力,存入endpoint
  207. err := o.GetCapabilities(xaddr)
  208. if err != nil {
  209. return err
  210. }
  211. //获取profile,存入videotoken
  212. err = o.GetProfiles()
  213. if err != nil {
  214. return err
  215. }
  216. //获取uri,存入videotoken
  217. //err = o.GetAllStreamUri()
  218. //if err != nil {
  219. // return err
  220. //}
  221. //err = o.GetAllSnapshotUri()
  222. //if err != nil {
  223. // return err
  224. //}
  225. if o.EventActuator != nil {
  226. o.EventActuator.Cancel()
  227. if addr, ok := o.endpoints["events"]; ok {
  228. o.EventActuator = NewEventActuator(o)
  229. go o.EventActuator.EventHandle(addr)
  230. }
  231. }
  232. return nil
  233. }
  234. func (o *LcDevice) Handle() {
  235. defer func() {
  236. if err := recover(); err != nil {
  237. logrus.Errorf("LcDevice.Handle发生异常:%v", err)
  238. logrus.Errorf("LcDevice.Handle发生异常,堆栈信息:%s", string(debug.Stack()))
  239. go o.Handle()
  240. }
  241. }()
  242. timer := time.NewTicker(1 * time.Minute)
  243. NeedRetry := true
  244. //初始化地址
  245. o.XAddr = "http://" + o.IP + ":80/onvif/device_service"
  246. if err := o.init(o.XAddr); err == nil {
  247. NeedRetry = false
  248. } else {
  249. logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error())
  250. }
  251. for {
  252. select {
  253. case <-o.ctx.Done():
  254. o.CleanupProcess()
  255. if o.EventActuator != nil {
  256. o.EventActuator.Cancel()
  257. }
  258. logrus.Errorf("设备[%s]的LcDevice.Handle退出,原因:%v", o.onvifDev.Code, o.ctx.Err())
  259. return
  260. case <-timer.C: //每隔1分钟执行一次
  261. if NeedRetry {
  262. if err := o.init(o.XAddr); err == nil {
  263. NeedRetry = false
  264. } else {
  265. logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error())
  266. }
  267. }
  268. o.State.CheckOnline(o)
  269. case xaddr := <-o.chanXAddr:
  270. logrus.Infof("摄像机%s尝试更新ip地址为:%s", o.onvifDev.Code, xaddr)
  271. if xaddr != o.XAddr {
  272. o.XAddr = xaddr
  273. if err := o.init(xaddr); err == nil {
  274. NeedRetry = false
  275. logrus.Infof("摄像机%s初始化成功", o.onvifDev.Code)
  276. } else {
  277. logrus.Errorf("摄像机%s初始化失败,原因:%s", o.onvifDev.Code, err.Error())
  278. }
  279. }
  280. default:
  281. if o.EventActuator == nil && len(o.onvifDev.Event) > 0 {
  282. if addr, ok := o.endpoints["events"]; ok {
  283. o.EventActuator = NewEventActuator(o)
  284. go o.EventActuator.EventHandle(addr)
  285. }
  286. }
  287. //从队列钟获取指令执行
  288. if m, ok, _ := o.downQueue.Get(); ok {
  289. if mm, ok := m.(mqtt.Message); ok {
  290. if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
  291. fn(mm)
  292. } else {
  293. logrus.Errorf("LcDevice.Handle:不支持的主题:%s", mm.Topic())
  294. }
  295. }
  296. } else {
  297. time.Sleep(50 * time.Millisecond)
  298. }
  299. }
  300. }
  301. }
  302. func FileExist(path string) bool {
  303. _, err := os.Lstat(path)
  304. return !os.IsNotExist(err)
  305. }
  306. func GetOnlineMsg() (string, string) {
  307. //发布上线消息
  308. return "", ""
  309. }
  310. func GetWillMsg() (string, string) {
  311. return "", ""
  312. }