| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- package main
- import (
- "context"
- "errors"
- "fmt"
- "net/http"
- "runtime/debug"
- "strconv"
- "sync"
- "time"
- "github.com/labstack/echo/v4"
- "github.com/labstack/echo/v4/middleware"
- "github.com/sirupsen/logrus"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _onceWebSvr sync.Once
- var _singleWebSvr *WebSvr
- func GetWebSvr() *WebSvr {
- _onceWebSvr.Do(func() {
- _singleWebSvr = NewWebSvr(itsConfig.IPAddr)
- })
- return _singleWebSvr
- }
- type H map[string]interface{}
- type WebSvr struct {
- echo *echo.Echo
- IPAddr string
- queue *util.MlQueue
- ctx context.Context
- cancel context.CancelFunc
- muEnvdata sync.Mutex
- Envdata *EnvData //环境数据
- muVSpeed sync.Mutex
- VSpeed map[string]*protocol.VehicleSpeed //CamID+Direction->速度
- mapTopicHandle map[string]func(m mqtt.Message)
- }
- func NewWebSvr(addr string) *WebSvr {
- ctx, cancel := context.WithCancel(context.Background())
- obj := WebSvr{
- echo: echo.New(),
- IPAddr: addr,
- queue: util.NewQueue(200),
- ctx: ctx,
- cancel: cancel,
- VSpeed: make(map[string]*protocol.VehicleSpeed),
- mapTopicHandle: make(map[string]func(m mqtt.Message)),
- }
- //obj.echo.Use(middleware.Logger())
- obj.echo.Debug = true
- obj.echo.HideBanner = true
- obj.echo.Use(middleware.Recover())
- obj.echo.Use(middleware.CORS())
- obj.echo.HTTPErrorHandler = obj.HTTPErrorHandler
- obj.echo.Static("/", "public")
- obj.echo.GET("/getdata", obj.GetData)
- return &obj
- }
- func (o *WebSvr) MQTTSubscribe() {
- o.mapTopicHandle[GetTopic(protocol.DT_ENVIRONMENT, itsConfig.EnvID, protocol.TP_MODBUS_DATA)] = o.HandleTpModbusData
- o.mapTopicHandle[GetTopic(protocol.DT_ITS, itsConfig.RemoteSub, protocol.TP_ITS_VEHICLESPEED)] = o.HandleTpItsVehiclespeed
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_ENVIRONMENT, itsConfig.EnvID, protocol.TP_MODBUS_DATA), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
- if len(itsConfig.RemoteSub) > 0 {
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_ITS, itsConfig.RemoteSub, protocol.TP_ITS_VEHICLESPEED), mqtt.ExactlyOnce, o.HandleCache, ToCloud)
- }
- }
- func (o *WebSvr) HandleCache(m mqtt.Message) {
- o.queue.Put(m)
- }
- func (o *WebSvr) HandleTpItsVehiclespeed(m mqtt.Message) {
- var obj protocol.Pack_VehicleSpeed
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("解码错误:%s", err.Error())
- return
- }
- key := obj.Id
- value := &protocol.VehicleSpeed{
- Plate: obj.Data.Plate,
- Time: protocol.BJTime(util.MlNow()),
- Type: obj.Data.Type,
- Speed: obj.Data.Speed,
- }
- o.muVSpeed.Lock()
- o.VSpeed[key] = value
- o.muVSpeed.Unlock()
- }
- func (o *WebSvr) HandleTpModbusData(m mqtt.Message) {
- var obj protocol.Pack_UploadData
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("解码错误:%s", err.Error())
- return
- }
- if obj.Data.Tid != 1 {
- return
- }
- //key = 1,噪声;2,PM2.5;3,PM10;4,温度;5,湿度;6,大气压;7,风速;8,风向
- var env EnvData
- env.Time = util.MlNow()
- pm25, err := GetValue(obj.Data.Data, 2)
- if err == nil {
- env.PM25 = strconv.FormatFloat(pm25, 'f', 0, 64)
- }
- pm10, err := GetValue(obj.Data.Data, 3)
- if err == nil {
- env.PM10 = strconv.FormatFloat(pm10, 'f', 0, 64)
- }
- temp, err := GetValue(obj.Data.Data, 4)
- if err == nil {
- env.Wendu = strconv.FormatFloat(temp, 'f', 0, 64)
- }
- sd, err := GetValue(obj.Data.Data, 5)
- if err == nil {
- env.Shidu = strconv.FormatFloat(sd, 'f', 0, 64)
- }
- aqi, degree := util.GetAqiAndDegree(pm25, pm10)
- env.Aqi = strconv.FormatFloat(aqi, 'f', 0, 64)
- env.Zhishu = degree
- //风速,风力
- direction, err := GetValue(obj.Data.Data, 8)
- if err == nil {
- env.Fengxiang = util.GetWindDirection(direction).Namezh
- }
- speed, err := GetValue(obj.Data.Data, 7)
- if err == nil {
- x, _ := util.GetWindGrade(speed)
- env.Fengji = x
- }
- o.muEnvdata.Lock()
- o.Envdata = &env
- o.muEnvdata.Unlock()
- }
- func (o *WebSvr) Handle(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("WebSvr.StartSvr发生异常:%v", err)
- logrus.Errorf("WebSvr.StartSvr发生异常,堆栈信息:%s", string(debug.Stack()))
- gopool.Add(o.StartSvr, args)
- }
- }()
- exit := false
- for {
- select {
- case <-o.ctx.Done():
- logrus.Errorf("WebSvr.HandleData退出,原因:%v", o.ctx.Err())
- exit = true
- default:
- //从队列钟获取指令执行
- if m, ok, _ := o.queue.Get(); ok {
- if mm, ok := m.(mqtt.Message); ok {
- if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
- fn(mm)
- }
- }
- } else {
- if exit { //退出前全部恢复时控模式
- return 0
- }
- time.Sleep(300 * time.Millisecond)
- }
- }
- }
- }
- func (o *WebSvr) Update(data *tagVehicleInfo) {
- key := data.CamID
- value := &protocol.VehicleSpeed{Plate: data.VehiclePlate, Time: protocol.BJTime(data.PassTime), Type: data.VehicleType, Speed: data.VehicleSpeed}
- o.muVSpeed.Lock()
- o.VSpeed[key] = value
- o.muVSpeed.Unlock()
- }
- func (o *WebSvr) StartSvr(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("WebSvr.StartSvr发生异常:%v", err)
- logrus.Errorf("WebSvr.StartSvr发生异常,堆栈信息:%s", string(debug.Stack()))
- gopool.Add(o.StartSvr, args)
- }
- }()
- return o.echo.Start(o.IPAddr)
- }
- func (o *WebSvr) HTTPErrorHandler(err error, c echo.Context) {
- c.JSON(http.StatusInternalServerError, nil)
- }
- func (o *WebSvr) GetData(c echo.Context) error {
- obj := GetDataPool().GetData()
- obj.Reset()
- defer GetDataPool().Release(obj)
- key := c.QueryParam("code")
- o.muVSpeed.Lock()
- if v, ok := o.VSpeed[key]; ok {
- diffSec := util.MlNow().Sub(time.Time(v.Time)).Seconds()
- if diffSec > float64(itsConfig.Duration) {
- obj.Chesu = 0
- delete(o.VSpeed, key)
- } else {
- obj.Chesu = v.Speed
- }
- }
- o.muVSpeed.Unlock()
- o.muEnvdata.Lock()
- if o.Envdata != nil {
- //if util.MlNow().Sub(o.Envdata.Time).Seconds() < 24*3600 {
- obj.Fengji = o.Envdata.Fengji
- obj.Tianqi = o.Envdata.Tianqi
- obj.Aqi = o.Envdata.Aqi
- obj.PM25 = o.Envdata.PM25
- obj.PM10 = o.Envdata.PM10
- obj.Wendu = o.Envdata.Wendu
- obj.Zhishu = o.Envdata.Zhishu
- obj.Shidu = o.Envdata.Shidu
- obj.Fengxiang = o.Envdata.Fengxiang
- obj.Fengji = o.Envdata.Fengji
- //} else {
- // o.Envdata = nil
- //}
- }
- o.muEnvdata.Unlock()
- obj.Chesufazhi = int(itsConfig.SuggestSpeed)
- return c.JSON(http.StatusOK, H{"data": obj})
- }
// errValueNotFound is the error GetValue returns for an absent key. It is a
// single shared value so that a miss does not allocate and so that callers can
// compare against it.
var errValueNotFound = errors.New("未找到")

// GetValue looks up key in m.
//
// It returns the stored value and a nil error when the key is present
// (including when the stored value is 0). When the key is absent, or m is nil,
// it returns 0 and errValueNotFound.
func GetValue(m map[uint16]float64, key uint16) (float64, error) {
	v, ok := m[key]
	if !ok {
		return 0, errValueNotFound
	}
	return v, nil
}
|