package main import ( "context" "errors" "fmt" "net/http" "runtime/debug" "strconv" "sync" "time" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/sirupsen/logrus" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _onceWebSvr sync.Once var _singleWebSvr *WebSvr func GetWebSvr() *WebSvr { _onceWebSvr.Do(func() { _singleWebSvr = NewWebSvr(itsConfig.IPAddr) }) return _singleWebSvr } type H map[string]interface{} type WebSvr struct { echo *echo.Echo IPAddr string queue *util.MlQueue ctx context.Context cancel context.CancelFunc muEnvdata sync.Mutex Envdata *EnvData //环境数据 muVSpeed sync.Mutex VSpeed map[string]*protocol.VehicleSpeed //CamID+Direction->速度 mapTopicHandle map[string]func(m mqtt.Message) } func NewWebSvr(addr string) *WebSvr { ctx, cancel := context.WithCancel(context.Background()) obj := WebSvr{ echo: echo.New(), IPAddr: addr, queue: util.NewQueue(200), ctx: ctx, cancel: cancel, VSpeed: make(map[string]*protocol.VehicleSpeed), mapTopicHandle: make(map[string]func(m mqtt.Message)), } //obj.echo.Use(middleware.Logger()) obj.echo.Debug = true obj.echo.HideBanner = true obj.echo.Use(middleware.Recover()) obj.echo.Use(middleware.CORS()) obj.echo.HTTPErrorHandler = obj.HTTPErrorHandler obj.echo.Static("/", "public") obj.echo.GET("/getdata", obj.GetData) return &obj } func (o *WebSvr) MQTTSubscribe() { o.mapTopicHandle[GetTopic(protocol.DT_ENVIRONMENT, itsConfig.EnvID, protocol.TP_MODBUS_DATA)] = o.HandleTpModbusData o.mapTopicHandle[GetTopic(protocol.DT_ITS, itsConfig.RemoteSub, protocol.TP_ITS_VEHICLESPEED)] = o.HandleTpItsVehiclespeed GetMQTTMgr().Subscribe(GetTopic(protocol.DT_ENVIRONMENT, itsConfig.EnvID, protocol.TP_MODBUS_DATA), mqtt.ExactlyOnce, o.HandleCache, ToCloud) if len(itsConfig.RemoteSub) > 0 { GetMQTTMgr().Subscribe(GetTopic(protocol.DT_ITS, itsConfig.RemoteSub, protocol.TP_ITS_VEHICLESPEED), mqtt.ExactlyOnce, o.HandleCache, ToCloud) } } func (o *WebSvr) HandleCache(m mqtt.Message) { o.queue.Put(m) } func (o *WebSvr) HandleTpItsVehiclespeed(m mqtt.Message) { var obj protocol.Pack_VehicleSpeed if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("解码错误:%s", err.Error()) return } key := obj.Id value := &protocol.VehicleSpeed{ Plate: obj.Data.Plate, Time: protocol.BJTime(util.MlNow()), Type: obj.Data.Type, Speed: obj.Data.Speed, } o.muVSpeed.Lock() o.VSpeed[key] = value o.muVSpeed.Unlock() } func (o *WebSvr) HandleTpModbusData(m mqtt.Message) { var obj protocol.Pack_UploadData if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("解码错误:%s", err.Error()) return } if obj.Data.Tid != 1 { return } //key = 1,噪声;2,PM2.5;3,PM10;4,温度;5,湿度;6,大气压;7,风速;8,风向 var env EnvData env.Time = util.MlNow() pm25, err := GetValue(obj.Data.Data, 2) if err == nil { env.PM25 = strconv.FormatFloat(pm25, 'f', 0, 64) } pm10, err := GetValue(obj.Data.Data, 3) if err == nil { env.PM10 = strconv.FormatFloat(pm10, 'f', 0, 64) } temp, err := GetValue(obj.Data.Data, 4) if err == nil { env.Wendu = strconv.FormatFloat(temp, 'f', 0, 64) } sd, err := GetValue(obj.Data.Data, 5) if err == nil { env.Shidu = strconv.FormatFloat(sd, 'f', 0, 64) } aqi, degree := util.GetAqiAndDegree(pm25, pm10) env.Aqi = strconv.FormatFloat(aqi, 'f', 0, 64) env.Zhishu = degree //风速,风力 direction, err := GetValue(obj.Data.Data, 8) if err == nil { env.Fengxiang = util.GetWindDirection(direction).Namezh } speed, err := GetValue(obj.Data.Data, 7) if err == nil { x, _ := util.GetWindGrade(speed) env.Fengji = x } o.muEnvdata.Lock() o.Envdata = &env o.muEnvdata.Unlock() } func (o *WebSvr) Handle(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { logrus.Errorf("WebSvr.StartSvr发生异常:%v", err) logrus.Errorf("WebSvr.StartSvr发生异常,堆栈信息:%s", string(debug.Stack())) gopool.Add(o.StartSvr, args) } }() exit := false for { select { case <-o.ctx.Done(): logrus.Errorf("WebSvr.HandleData退出,原因:%v", o.ctx.Err()) exit = true default: //从队列钟获取指令执行 if m, ok, _ := o.queue.Get(); ok { if mm, ok := m.(mqtt.Message); ok { if fn, ok := o.mapTopicHandle[mm.Topic()]; ok { fn(mm) } } } else { if exit { //退出前全部恢复时控模式 return 0 } time.Sleep(300 * time.Millisecond) } } } } func (o *WebSvr) Update(data *tagVehicleInfo) { key := data.CamID value := &protocol.VehicleSpeed{Plate: data.VehiclePlate, Time: protocol.BJTime(data.PassTime), Type: data.VehicleType, Speed: data.VehicleSpeed} o.muVSpeed.Lock() o.VSpeed[key] = value o.muVSpeed.Unlock() } func (o *WebSvr) StartSvr(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { logrus.Errorf("WebSvr.StartSvr发生异常:%v", err) logrus.Errorf("WebSvr.StartSvr发生异常,堆栈信息:%s", string(debug.Stack())) gopool.Add(o.StartSvr, args) } }() return o.echo.Start(o.IPAddr) } func (o *WebSvr) HTTPErrorHandler(err error, c echo.Context) { c.JSON(http.StatusInternalServerError, nil) } func (o *WebSvr) GetData(c echo.Context) error { obj := GetDataPool().GetData() obj.Reset() defer GetDataPool().Release(obj) key := c.QueryParam("code") o.muVSpeed.Lock() if v, ok := o.VSpeed[key]; ok { diffSec := util.MlNow().Sub(time.Time(v.Time)).Seconds() if diffSec > float64(itsConfig.Duration) { obj.Chesu = 0 delete(o.VSpeed, key) } else { obj.Chesu = v.Speed } } o.muVSpeed.Unlock() o.muEnvdata.Lock() if o.Envdata != nil { //if util.MlNow().Sub(o.Envdata.Time).Seconds() < 24*3600 { obj.Fengji = o.Envdata.Fengji obj.Tianqi = o.Envdata.Tianqi obj.Aqi = o.Envdata.Aqi obj.PM25 = o.Envdata.PM25 obj.PM10 = o.Envdata.PM10 obj.Wendu = o.Envdata.Wendu obj.Zhishu = o.Envdata.Zhishu obj.Shidu = o.Envdata.Shidu obj.Fengxiang = o.Envdata.Fengxiang obj.Fengji = o.Envdata.Fengji //} else { // o.Envdata = nil //} } o.muEnvdata.Unlock() obj.Chesufazhi = int(itsConfig.SuggestSpeed) return c.JSON(http.StatusOK, H{"data": obj}) } func GetValue(m map[uint16]float64, key uint16) (float64, error) { if v, ok := m[key]; ok { return v, nil } return 0.0, errors.New("未找到") }