ipc_device.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package main
  2. import (
  3. "context"
  4. "os"
  5. "strings"
  6. "time"
  7. "github.com/sirupsen/logrus"
  8. "lc/common/mqtt"
  9. "lc/common/onvif/profiles/devicemgmt"
  10. "lc/common/onvif/soap"
  11. "lc/common/protocol"
  12. "lc/common/util"
  13. )
  14. type Resolution struct {
  15. Width int32
  16. Height int32
  17. }
  18. type OnvifState struct {
  19. ChangeTime time.Time //变动时间
  20. State uint8 //是否在线,0在线,1离线
  21. }
  22. func (o *OnvifState) CheckOnline(parent *LcDevice) {
  23. if parent.XAddr == "" {
  24. return
  25. }
  26. o.State = parent.testOnline(parent.XAddr)
  27. o.ChangeTime = util.MlNow()
  28. var obj protocol.Pack_IPCState
  29. if str, err := obj.EnCode(parent.onvifDev.Code, appConfig.GID, GetNextUint64(), o.ChangeTime, o.State); err == nil {
  30. GetMQTTMgr().Publish(GetTopic(parent.GetDevType(), parent.onvifDev.Code, protocol.TP_ONVIF_STATE), str, mqtt.AtMostOnce, ToAll)
  31. }
  32. }
  33. type VideoToken struct {
  34. Token string // "Profile_1"
  35. Name string //码流名称,如"mainStream"
  36. VideoEncoding string //编码格式,如"H264"
  37. VideoResolution Resolution //分辨率
  38. VideoFrameRateLimit int32 //25
  39. VideoBitrateLimit int32 //2048
  40. VideoRtspUrl string //视频地址
  41. SnapshotUrl string //抓拍图片地址
  42. }
  43. type LcDevice struct {
  44. IP string //设备ID,uuid
  45. endpoints map[string]string
  46. tokens map[string]*VideoToken
  47. client *soap.Client
  48. onvifDev *protocol.OnvifDev
  49. ctx context.Context
  50. cancel context.CancelFunc
  51. downQueue *util.MlQueue
  52. mapTopicHandle map[string]func(m mqtt.Message)
  53. chanXAddr chan string
  54. XAddr string
  55. ffmpeg *os.Process //推流进程
  56. observer uint32
  57. EventActuator *EventActuator
  58. State OnvifState //在线监测
  59. }
  60. func NewLcDevice(dev *protocol.OnvifDev) *LcDevice {
  61. client := soap.NewClient(soap.WithTimeout(time.Second * 0))
  62. client.AddHeader(soap.NewWSSSecurityHeader(dev.User, dev.Password))
  63. ctx, cancel := context.WithCancel(context.Background())
  64. lcDevice := LcDevice{
  65. IP: dev.IP,
  66. endpoints: make(map[string]string),
  67. tokens: make(map[string]*VideoToken),
  68. client: client,
  69. onvifDev: dev,
  70. ctx: ctx,
  71. cancel: cancel,
  72. downQueue: util.NewQueue(100),
  73. mapTopicHandle: make(map[string]func(m mqtt.Message)),
  74. chanXAddr: make(chan string),
  75. }
  76. lcDevice.SetTopicHandle()
  77. lcDevice.MQTTSubscribe()
  78. go lcDevice.Handle()
  79. return &lcDevice
  80. }
  81. func (o *LcDevice) GetDevType() string {
  82. if o.onvifDev.DevType == 3 {
  83. return protocol.DT_IPC
  84. } else if o.onvifDev.DevType == 4 {
  85. return protocol.DT_SOS
  86. }
  87. return "unknown"
  88. }
  89. func (o *LcDevice) SetTopicHandle() {
  90. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ)] = o.HandleTpOnvifPtz
  91. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM)] = o.HandleTpSPtzComm
  92. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT)] = o.HandleTpOnvifSnapshot
  93. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD)] = o.HandleTpOnvifRecord
  94. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO)] = o.HandleTpOnvifVideo
  95. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT)] = o.HandleTpOnvifReboot
  96. o.mapTopicHandle[GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME)] = o.HandleTpOnvifSynctime
  97. }
  98. func (o *LcDevice) MQTTSubscribe() {
  99. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  100. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_PTZ_COMM), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  101. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SNAPSHOT), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  102. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_RECORD), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  103. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_VIDEO), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  104. //以下仅订阅云端
  105. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
  106. GetMQTTMgr().Subscribe(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME), mqtt.AtMostOnce, o.HandleMessage, ToCloud)
  107. }
  108. func (o *LcDevice) GetCapabilities(XAddr string) error {
  109. dev := devicemgmt.NewDevice(o.client, XAddr)
  110. reply, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
  111. if err != nil {
  112. if soapFault, ok := err.(*soap.SOAPFault); ok {
  113. logrus.Error(soapFault)
  114. }
  115. return err
  116. }
  117. if len(strings.Trim(string(reply.Capabilities.Analytics.XAddr), " ")) > 0 {
  118. o.endpoints["analytics"] = string(reply.Capabilities.Analytics.XAddr)
  119. }
  120. if len(strings.Trim(string(reply.Capabilities.Device.XAddr), " ")) > 0 {
  121. o.endpoints["device"] = string(reply.Capabilities.Device.XAddr)
  122. }
  123. if len(strings.Trim(string(reply.Capabilities.Events.XAddr), " ")) > 0 {
  124. o.endpoints["events"] = string(reply.Capabilities.Events.XAddr)
  125. }
  126. if len(strings.Trim(string(reply.Capabilities.Imaging.XAddr), " ")) > 0 {
  127. o.endpoints["imaging"] = string(reply.Capabilities.Imaging.XAddr)
  128. }
  129. if len(strings.Trim(string(reply.Capabilities.Media.XAddr), " ")) > 0 {
  130. o.endpoints["media"] = string(reply.Capabilities.Media.XAddr)
  131. }
  132. if len(strings.Trim(string(reply.Capabilities.PTZ.XAddr), " ")) > 0 {
  133. o.endpoints["ptz"] = string(reply.Capabilities.PTZ.XAddr)
  134. }
  135. return nil
  136. }
  137. func (o *LcDevice) testOnline(XAddr string) uint8 {
  138. dev := devicemgmt.NewDevice(o.client, XAddr)
  139. _, err := dev.GetCapabilities(&devicemgmt.GetCapabilities{})
  140. if err != nil {
  141. return protocol.FAILED
  142. }
  143. return protocol.SUCCESS
  144. }
  145. func (o *LcDevice) SystemReboot() (string, error) {
  146. dev := devicemgmt.NewDevice(o.client, o.XAddr)
  147. resp, err := dev.SystemReboot(&devicemgmt.SystemReboot{})
  148. if err != nil {
  149. return "", err
  150. }
  151. return resp.Message, nil
  152. }
  153. func (o *LcDevice) SetSystemDateAndTime() error {
  154. dev := devicemgmt.NewDevice(o.client, o.XAddr)
  155. dev.SetSystemDateAndTime(&devicemgmt.SetSystemDateAndTime{
  156. DateTimeType: devicemgmt.SetDateTimeTypeNTP,
  157. DaylightSavings: true,
  158. })
  159. return nil
  160. }
  161. func (o *LcDevice) HandleMessage(m mqtt.Message) {
  162. o.downQueue.Put(m)
  163. }
  164. func (o *LcDevice) HandleTpOnvifReboot(m mqtt.Message) {
  165. var obj protocol.Pack_IPCCtrlComm
  166. if err := obj.DeCode(m.PayloadString()); err != nil {
  167. return
  168. }
  169. if o.onvifDev.Code != obj.Id {
  170. return
  171. }
  172. _, err := o.SystemReboot()
  173. var ret protocol.Pack_IPCCommonACK
  174. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil {
  175. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_REBOOT_ACK), strRet, mqtt.AtMostOnce, ToAll)
  176. }
  177. }
  178. func (o *LcDevice) HandleTpOnvifSynctime(m mqtt.Message) {
  179. var obj protocol.Pack_IPCCtrlComm
  180. if err := obj.DeCode(m.PayloadString()); err != nil {
  181. return
  182. }
  183. if o.onvifDev.Code != obj.Id {
  184. return
  185. }
  186. err := o.SetSystemDateAndTime()
  187. var ret protocol.Pack_IPCCommonACK
  188. if strRet, err := ret.EnCode(o.onvifDev.Code, appConfig.GID, "", obj.Seq, err); err == nil {
  189. GetMQTTMgr().Publish(GetTopic(o.GetDevType(), o.onvifDev.Code, protocol.TP_ONVIF_SYNCTIME_ACK), strRet, mqtt.AtMostOnce, ToAll)
  190. }
  191. }
  192. func (o *LcDevice) Update(addr string) {
  193. if addr != "" {
  194. o.chanXAddr <- addr
  195. }
  196. }
  197. func (o *LcDevice) init(xaddr string) error {
  198. //获取服务能力,存入endpoint
  199. err := o.GetCapabilities(xaddr)
  200. if err != nil {
  201. return err
  202. }
  203. //获取profile,存入videoToken
  204. err = o.GetProfiles()
  205. if err != nil {
  206. return err
  207. }
  208. if o.EventActuator != nil {
  209. o.EventActuator.Cancel()
  210. if addr, ok := o.endpoints["events"]; ok {
  211. o.EventActuator = NewEventActuator(o)
  212. go o.EventActuator.EventHandle(addr)
  213. }
  214. }
  215. return nil
  216. }
  217. func (o *LcDevice) Handle() {
  218. defer func() {
  219. recover()
  220. go o.Handle()
  221. }()
  222. timer := time.NewTicker(1 * time.Minute)
  223. NeedRetry := true
  224. //初始化地址
  225. o.XAddr = "http://" + o.IP + ":80/onvif/device_service"
  226. if err := o.init(o.XAddr); err == nil {
  227. NeedRetry = false
  228. }
  229. for {
  230. select {
  231. case <-o.ctx.Done():
  232. o.CleanupProcess()
  233. if o.EventActuator != nil {
  234. o.EventActuator.Cancel()
  235. }
  236. return
  237. case <-timer.C: //每隔1分钟执行一次
  238. if NeedRetry {
  239. if err := o.init(o.XAddr); err == nil {
  240. NeedRetry = false
  241. }
  242. }
  243. o.State.CheckOnline(o)
  244. case addr := <-o.chanXAddr:
  245. if addr != o.XAddr {
  246. o.XAddr = addr
  247. if err := o.init(addr); err == nil {
  248. NeedRetry = false
  249. }
  250. }
  251. default:
  252. if o.EventActuator == nil && len(o.onvifDev.Event) > 0 {
  253. if addr, ok := o.endpoints["events"]; ok {
  254. o.EventActuator = NewEventActuator(o)
  255. go o.EventActuator.EventHandle(addr)
  256. }
  257. }
  258. //从队列钟获取指令执行
  259. if m, ok, _ := o.downQueue.Get(); ok {
  260. if mm, ok := m.(mqtt.Message); ok {
  261. if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
  262. fn(mm)
  263. }
  264. }
  265. } else {
  266. time.Sleep(50 * time.Millisecond)
  267. }
  268. }
  269. }
  270. }
  271. func FileExist(path string) bool {
  272. _, err := os.Lstat(path)
  273. return !os.IsNotExist(err)
  274. }