websvr.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "runtime/debug"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "github.com/labstack/echo/v4"
  12. "github.com/labstack/echo/v4/middleware"
  13. "github.com/sirupsen/logrus"
  14. "lc/common/mqtt"
  15. "lc/common/protocol"
  16. "lc/common/util"
  17. )
  18. var _onceWebSvr sync.Once
  19. var _singleWebSvr *WebSvr
  20. func GetWebSvr() *WebSvr {
  21. _onceWebSvr.Do(func() {
  22. _singleWebSvr = NewWebSvr(itsConfig.IPAddr)
  23. })
  24. return _singleWebSvr
  25. }
  26. type H map[string]interface{}
  27. type WebSvr struct {
  28. echo *echo.Echo
  29. IPAddr string
  30. queue *util.MlQueue
  31. ctx context.Context
  32. cancel context.CancelFunc
  33. muEnvdata sync.Mutex
  34. Envdata *EnvData //环境数据
  35. muVSpeed sync.Mutex
  36. VSpeed map[string]*protocol.VehicleSpeed //CamID+Direction->速度
  37. mapTopicHandle map[string]func(m mqtt.Message)
  38. }
  39. func NewWebSvr(addr string) *WebSvr {
  40. ctx, cancel := context.WithCancel(context.Background())
  41. obj := WebSvr{
  42. echo: echo.New(),
  43. IPAddr: addr,
  44. queue: util.NewQueue(200),
  45. ctx: ctx,
  46. cancel: cancel,
  47. VSpeed: make(map[string]*protocol.VehicleSpeed),
  48. mapTopicHandle: make(map[string]func(m mqtt.Message)),
  49. }
  50. //obj.echo.Use(middleware.Logger())
  51. obj.echo.Debug = true
  52. obj.echo.HideBanner = true
  53. obj.echo.Use(middleware.Recover())
  54. obj.echo.Use(middleware.CORS())
  55. obj.echo.HTTPErrorHandler = obj.HTTPErrorHandler
  56. obj.echo.Static("/", "public")
  57. obj.echo.GET("/getdata", obj.GetData)
  58. return &obj
  59. }
  60. func (o *WebSvr) MQTTSubscribe() {
  61. o.mapTopicHandle[GetTopic(protocol.DT_ENVIRONMENT, itsConfig.EnvID, protocol.TP_MODBUS_DATA)] = o.HandleTpModbusData
  62. o.mapTopicHandle[GetTopic(protocol.DT_ITS, itsConfig.RemoteSub, protocol.TP_ITS_VEHICLESPEED)] = o.HandleTpItsVehiclespeed
  63. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_ENVIRONMENT, itsConfig.EnvID, protocol.TP_MODBUS_DATA), mqtt.ExactlyOnce, o.HandleCache, ToEdge)
  64. if len(itsConfig.RemoteSub) > 0 {
  65. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_ITS, itsConfig.RemoteSub, protocol.TP_ITS_VEHICLESPEED), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
  66. }
  67. }
  68. func (o *WebSvr) HandleCache(m mqtt.Message) {
  69. o.queue.Put(m)
  70. }
  71. func (o *WebSvr) HandleTpItsVehiclespeed(m mqtt.Message) {
  72. var obj protocol.Pack_VehicleSpeed
  73. if err := obj.DeCode(m.PayloadString()); err != nil {
  74. logrus.Errorf("解码错误:%s", err.Error())
  75. return
  76. }
  77. key := obj.Id
  78. value := &protocol.VehicleSpeed{
  79. Plate: obj.Data.Plate,
  80. Time: obj.Data.Time,
  81. Type: obj.Data.Type,
  82. Speed: obj.Data.Speed,
  83. }
  84. o.muVSpeed.Lock()
  85. o.VSpeed[key] = value
  86. o.muVSpeed.Unlock()
  87. }
  88. func (o *WebSvr) HandleTpModbusData(m mqtt.Message) {
  89. var obj protocol.Pack_UploadData
  90. if err := obj.DeCode(m.PayloadString()); err != nil {
  91. logrus.Errorf("解码错误:%s", err.Error())
  92. return
  93. }
  94. if obj.Data.Tid != 1 {
  95. return
  96. }
  97. //key = 1,噪声;2,PM2.5;3,PM10;4,温度;5,湿度;6,大气压;7,风速;8,风向
  98. var env EnvData
  99. env.Time = util.MlNow()
  100. pm25, err := GetValue(obj.Data.Data, 2)
  101. if err == nil {
  102. env.PM25 = strconv.FormatFloat(pm25, 'f', 0, 64)
  103. }
  104. pm10, err := GetValue(obj.Data.Data, 3)
  105. if err == nil {
  106. env.PM10 = strconv.FormatFloat(pm10, 'f', 0, 64)
  107. }
  108. temp, err := GetValue(obj.Data.Data, 4)
  109. if err == nil {
  110. env.Wendu = strconv.FormatFloat(temp, 'f', 0, 64)
  111. }
  112. sd, err := GetValue(obj.Data.Data, 5)
  113. if err == nil {
  114. env.Shidu = strconv.FormatFloat(sd, 'f', 0, 64)
  115. }
  116. aqi, degree := util.GetAqiAndDegree(pm25, pm10)
  117. env.Aqi = strconv.FormatFloat(aqi, 'f', 0, 64)
  118. env.Zhishu = degree
  119. //风速,风力
  120. direction, err := GetValue(obj.Data.Data, 8)
  121. if err == nil {
  122. env.Fengxiang = util.GetWindDirection(direction).Namezh
  123. }
  124. speed, err := GetValue(obj.Data.Data, 7)
  125. if err == nil {
  126. x, _ := util.GetWindGrade(speed)
  127. env.Fengji = x
  128. }
  129. o.muEnvdata.Lock()
  130. o.Envdata = &env
  131. o.muEnvdata.Unlock()
  132. }
  133. func (o *WebSvr) Handle(args ...interface{}) interface{} {
  134. defer func() {
  135. if err := recover(); err != nil {
  136. logrus.Errorf("WebSvr.StartSvr发生异常:%v", err)
  137. logrus.Errorf("WebSvr.StartSvr发生异常,堆栈信息:%s", string(debug.Stack()))
  138. gopool.Add(o.StartSvr, args)
  139. }
  140. }()
  141. exit := false
  142. for {
  143. select {
  144. case <-o.ctx.Done():
  145. logrus.Errorf("WebSvr.HandleData退出,原因:%v", o.ctx.Err())
  146. exit = true
  147. default:
  148. //从队列钟获取指令执行
  149. if m, ok, _ := o.queue.Get(); ok {
  150. if mm, ok := m.(mqtt.Message); ok {
  151. if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
  152. fn(mm)
  153. }
  154. }
  155. } else {
  156. if exit { //退出前全部恢复时控模式
  157. return 0
  158. }
  159. time.Sleep(300 * time.Millisecond)
  160. }
  161. }
  162. }
  163. }
  164. func (o *WebSvr) Update(data *tagVehicleInfo) {
  165. key := data.CamID
  166. //不用车牌时间,防止抓拍单元时间和网关时间相差太大
  167. value := &protocol.VehicleSpeed{Plate: data.VehiclePlate, Time: protocol.BJTime(util.MlNow()), Type: data.VehicleType, Speed: data.VehicleSpeed}
  168. o.muVSpeed.Lock()
  169. o.VSpeed[key] = value
  170. o.muVSpeed.Unlock()
  171. }
  172. func (o *WebSvr) StartSvr(args ...interface{}) interface{} {
  173. defer func() {
  174. if err := recover(); err != nil {
  175. logrus.Errorf("WebSvr.StartSvr发生异常:%v", err)
  176. logrus.Errorf("WebSvr.StartSvr发生异常,堆栈信息:%s", string(debug.Stack()))
  177. gopool.Add(o.StartSvr, args)
  178. }
  179. }()
  180. return o.echo.Start(o.IPAddr)
  181. }
  182. func (o *WebSvr) HTTPErrorHandler(err error, c echo.Context) {
  183. c.JSON(http.StatusInternalServerError, nil)
  184. }
  185. func (o *WebSvr) GetData(c echo.Context) error {
  186. obj := GetDataPool().GetData()
  187. obj.Reset()
  188. defer GetDataPool().Release(obj)
  189. key := c.QueryParam("code")
  190. o.muVSpeed.Lock()
  191. if v, ok := o.VSpeed[key]; ok {
  192. diffSec := util.MlNow().Sub(time.Time(v.Time)).Seconds()
  193. if diffSec > float64(itsConfig.Duration) {
  194. obj.Chesu = 0
  195. delete(o.VSpeed, key)
  196. } else {
  197. obj.Chesu = v.Speed
  198. }
  199. }
  200. o.muVSpeed.Unlock()
  201. o.muEnvdata.Lock()
  202. if o.Envdata != nil {
  203. //if util.MlNow().Sub(o.Envdata.Time).Seconds() < 24*3600 {
  204. obj.Fengji = o.Envdata.Fengji
  205. obj.Tianqi = o.Envdata.Tianqi
  206. obj.Aqi = o.Envdata.Aqi
  207. obj.PM25 = o.Envdata.PM25
  208. obj.PM10 = o.Envdata.PM10
  209. obj.Wendu = o.Envdata.Wendu
  210. obj.Zhishu = o.Envdata.Zhishu
  211. obj.Shidu = o.Envdata.Shidu
  212. obj.Fengxiang = o.Envdata.Fengxiang
  213. obj.Fengji = o.Envdata.Fengji
  214. //} else {
  215. // o.Envdata = nil
  216. //}
  217. }
  218. o.muEnvdata.Unlock()
  219. obj.Chesufazhi = int(itsConfig.SuggestSpeed)
  220. return c.JSON(http.StatusOK, H{"data": 0})
  221. }
  222. func GetValue(m map[uint16]float64, key uint16) (float64, error) {
  223. if v, ok := m[key]; ok {
  224. return v, nil
  225. }
  226. return 0.0, errors.New("未找到")
  227. }