123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package service
- import (
- "encoding/json"
- "errors"
- "fmt"
- "runtime"
- "runtime/debug"
- "smart_tunnel_edge/mqtt"
- "smart_tunnel_edge/util/config"
- "smart_tunnel_edge/util/logger"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- func InitMqtt() {
- MqttService = GetHandler()
- MqttService.SubscribeTopics()
- go MqttService.Handler()
- }
- var MqttService *MqttHandler
- type MqttHandler struct {
- queue *mqtt.MlQueue
- }
- var _handlerOnce sync.Once
- var _handlerSingle *MqttHandler
- func GetHandler() *MqttHandler {
- _handlerOnce.Do(func() {
- _handlerSingle = &MqttHandler{
- queue: mqtt.NewQueue(10000),
- }
- })
- return _handlerSingle
- }
- func (o *MqttHandler) SubscribeTopics() {
- mqtt.GetMQTTMgr().Subscribe("smart_tunnel/#", mqtt.AtLeastOnce, o.HandlerData)
- }
- func (o *MqttHandler) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *MqttHandler) Handler() interface{} {
- defer func() {
- if err := recover(); err != nil {
- go GetHandler().Handler()
- logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
- }
- }()
- for {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- tunnelId, topic, err := ParseTopic(m.Topic())
- if err != nil {
- logger.Logger.Errorf("parseTopic err")
- continue
- }
- if tunnelId != config.Instance().TunnelId {
- continue
- }
- switch topic {
- case TopicLampControl:
- data := m.PayloadString()
- data1 := data[0:1]
- data2 := data[1:]
- radarId, _ := strconv.Atoi(data1)
- brightNess, _ := strconv.Atoi(data2)
- SetLampBright(int8(radarId), brightNess)
- case TopicSwitchControl:
- data := m.PayloadString()
- data1 := data[0:1]
- data2 := data[1:2]
- data3 := data[2:3]
- radarId, _ := strconv.Atoi(data1)
- way, _ := strconv.Atoi(data2)
- turnOf, _ := strconv.Atoi(data3)
- SetSwitchRelay(int8(radarId), way, turnOf)
- case TopicTunnelTactics:
- // 修改策略
- num, _ := strconv.Atoi(m.PayloadString())
- updates := map[string]interface{}{
- "id": num,
- }
- if !updateConfigAndReload(updates) {
- continue
- }
- case TopicTunnelTiming:
- // 修改时控
- var data map[string]interface{}
- if err := json.Unmarshal([]byte(m.PayloadString()), &data); err != nil {
- logger.Logger.Errorf("Error unmarshalling JSON: %v", err)
- continue
- }
- updates := map[string]interface{}{
- "startTime": data["startTime"],
- "endTime": data["endTime"],
- }
- if !updateConfigAndReload(updates) {
- continue
- }
- }
- }
- }
- func (o *MqttHandler) Publish(topic string, data interface{}) error {
- return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
- }
- func (o *MqttHandler) GetTopic(operation string) string {
- tunnelId := config.Instance().TunnelId
- return fmt.Sprintf("smart_tunnel/%s/%s", tunnelId, operation)
- }
// ParseTopic splits an MQTT topic of the form "<prefix>/<sn>/<operate>" into
// its second segment (sn) and third segment (operate).
//
// The topic must consist of exactly three "/"-separated segments; the first
// segment is not inspected. Any other shape yields empty results and a
// non-nil error. The segment count is checked before any segment is read, so
// short topics such as "" or "smart_tunnel" are rejected instead of causing
// an index-out-of-range panic.
func ParseTopic(topic string) (sn string, operate string, err error) {
	segments := strings.Split(topic, "/")
	if len(segments) != 3 {
		return "", "", errors.New("不支持的topic")
	}
	return segments[1], segments[2], nil
}
- func updateConfigAndReload(updates map[string]interface{}) bool {
- if err := config.UpdateYAMLConfig(updates); err != nil {
- logger.Logger.Errorf("修改yaml失败 err: %v", err)
- return false
- }
- config.ReloadConfig()
- return true
- }
// Operation names: the final segment of a "smart_tunnel/<tunnel id>/<operation>" topic.
const (
	// TopicLampControl carries "<radar digit><brightness>" payloads.
	TopicLampControl = "lampControl"
	// TopicSwitchControl carries three single-digit fields: radar id, way, on/off.
	TopicSwitchControl = "switchControl"
	// TopicTunnelTactics carries a decimal integer stored under the config key "id".
	TopicTunnelTactics = "tunnelTactics"
	// TopicTunnelTiming carries a JSON object with "startTime" and "endTime".
	TopicTunnelTiming = "tunnelTiming"

	// The two names below are not referenced in this part of the file;
	// presumably they are used when publishing gathered data — confirm.
	TopicGatherDataEnv = "gatherDataEnv"
	TopicGatherDataOpt = "gatherDataOpt"
)
|