package service

import (
	"encoding/json"
	"errors"
	"fmt"
	"runtime"
	"runtime/debug"
	"smart_tunnel_edge/mqtt"
	"smart_tunnel_edge/util/config"
	"smart_tunnel_edge/util/logger"
	"strconv"
	"strings"
	"sync"
	"time"
)

// InitMqtt stores the singleton handler in MqttService, subscribes it to the
// broker topics and starts the queue-consuming loop in a new goroutine.
func InitMqtt() {
	MqttService = GetHandler()
	MqttService.SubscribeTopics()
	go MqttService.Handler()
}

// MqttService is the package-wide handler; it is assigned by InitMqtt.
var MqttService *MqttHandler

// MqttHandler buffers incoming MQTT messages in a queue and processes them
// in the Handler loop.
type MqttHandler struct {
	queue *mqtt.MlQueue
}

var _handlerOnce sync.Once
var _handlerSingle *MqttHandler

// GetHandler returns the singleton MqttHandler, creating it (with a queue
// built by mqtt.NewQueue(10000)) on the first call.
func GetHandler() *MqttHandler {
	_handlerOnce.Do(func() {
		_handlerSingle = &MqttHandler{
			queue: mqtt.NewQueue(10000),
		}
	})
	return _handlerSingle
}

// SubscribeTopics subscribes to every topic under "smart_tunnel/" and routes
// the received messages to HandlerData.
func (o *MqttHandler) SubscribeTopics() {
	mqtt.GetMQTTMgr().Subscribe("smart_tunnel/#", mqtt.AtLeastOnce, o.HandlerData)
}

// HandlerData is the subscription callback. It puts the message on the queue,
// retrying (and yielding the processor between attempts) until Put succeeds.
func (o *MqttHandler) HandlerData(m mqtt.Message) {
	for {
		ok, cnt := o.queue.Put(&m)
		if ok {
			return
		}
		logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
		runtime.Gosched()
	}
}

// Handler runs the consuming loop: it takes messages off the queue and
// processes them one by one. The loop never returns normally. If processing
// panics, the panic is recovered, logged together with the stack trace, and a
// fresh loop is started on the singleton handler; Handler then returns nil.
func (o *MqttHandler) Handler() interface{} {
	defer func() {
		if r := recover(); r != nil {
			go GetHandler().Handler()
			// Log the panic value as well as the stack so the cause is visible.
			logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", fmt.Sprintf("%v\n%s", r, debug.Stack()))
		}
	}()

	for {
		msg, ok, quantity := o.queue.Get()
		if !ok {
			// Nothing was obtained from the queue; back off briefly before polling again.
			time.Sleep(10 * time.Millisecond)
			continue
		}
		if quantity > 1000 {
			logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
		}

		m, ok := msg.(*mqtt.Message)
		if !ok {
			continue
		}
		o.handleMessage(m)
	}
}

// handleMessage parses the topic of one message and, when it is addressed to
// the configured tunnel, executes the operation named by the topic.
// Malformed topics and payloads are logged and dropped.
func (o *MqttHandler) handleMessage(m *mqtt.Message) {
	tunnelID, operation, err := ParseTopic(m.Topic())
	if err != nil {
		logger.Logger.Errorf("parseTopic err")
		return
	}
	if tunnelID != config.Instance().TunnelId {
		return
	}

	switch operation {
	case TopicLampControl:
		radarID, brightness, err := parseLampControlPayload(m.PayloadString())
		if err != nil {
			logger.Logger.Errorf("lampControl: invalid payload: %v", err)
			return
		}
		SetLampBright(int8(radarID), brightness)

	case TopicSwitchControl:
		radarID, way, turnOf, err := parseSwitchControlPayload(m.PayloadString())
		if err != nil {
			logger.Logger.Errorf("switchControl: invalid payload: %v", err)
			return
		}
		SetSwitchRelay(int8(radarID), way, turnOf)

	case TopicTunnelTactics:
		// Update the strategy: the payload is the numeric strategy id.
		num, err := strconv.Atoi(m.PayloadString())
		if err != nil {
			logger.Logger.Errorf("tunnelTactics: invalid payload: %v", err)
			return
		}
		updateConfigAndReload(map[string]interface{}{
			"id": num,
		})

	case TopicTunnelTiming:
		// Update the timing control: the payload is a JSON object whose
		// "startTime" and "endTime" values are written to the config.
		var data map[string]interface{}
		if err := json.Unmarshal([]byte(m.PayloadString()), &data); err != nil {
			logger.Logger.Errorf("Error unmarshalling JSON: %v", err)
			return
		}
		// NOTE(review): a key missing from the payload is forwarded as nil;
		// confirm how config.UpdateYAMLConfig treats nil values.
		updateConfigAndReload(map[string]interface{}{
			"startTime": data["startTime"],
			"endTime":   data["endTime"],
		})
	}
}

// parseLampControlPayload decodes a lamp-control payload: the first character
// is the radar id and the remainder is the brightness, both decimal.
// It returns an error when the payload is shorter than two characters or
// either part is not a number.
func parseLampControlPayload(payload string) (radarID int, brightness int, err error) {
	if len(payload) < 2 {
		return 0, 0, fmt.Errorf("payload %q is too short, want at least 2 characters", payload)
	}
	radarID, err = strconv.Atoi(payload[:1])
	if err != nil {
		return 0, 0, fmt.Errorf("radar id in payload %q: %w", payload, err)
	}
	brightness, err = strconv.Atoi(payload[1:])
	if err != nil {
		return 0, 0, fmt.Errorf("brightness in payload %q: %w", payload, err)
	}
	return radarID, brightness, nil
}

// parseSwitchControlPayload decodes a switch-control payload made of three
// single decimal digits, in order: radar id, way and on/off value. Characters
// after the third are ignored. It returns an error when the payload is
// shorter than three characters or any of the three is not a digit.
func parseSwitchControlPayload(payload string) (radarID int, way int, turnOf int, err error) {
	if len(payload) < 3 {
		return 0, 0, 0, fmt.Errorf("payload %q is too short, want at least 3 characters", payload)
	}
	radarID, err = strconv.Atoi(payload[0:1])
	if err != nil {
		return 0, 0, 0, fmt.Errorf("radar id in payload %q: %w", payload, err)
	}
	way, err = strconv.Atoi(payload[1:2])
	if err != nil {
		return 0, 0, 0, fmt.Errorf("way in payload %q: %w", payload, err)
	}
	turnOf, err = strconv.Atoi(payload[2:3])
	if err != nil {
		return 0, 0, 0, fmt.Errorf("on/off value in payload %q: %w", payload, err)
	}
	return radarID, way, turnOf, nil
}

// Publish sends data on topic through the MQTT manager with at-least-once
// delivery and returns the manager's error.
func (o *MqttHandler) Publish(topic string, data interface{}) error {
	return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
}

// GetTopic builds the full topic "smart_tunnel/<tunnel id>/<operation>" using
// the tunnel id from the current configuration.
func (o *MqttHandler) GetTopic(operation string) string {
	tunnelId := config.Instance().TunnelId
	return fmt.Sprintf("smart_tunnel/%s/%s", tunnelId, operation)
}

// ParseTopic splits a topic of the form "<prefix>/<sn>/<operate>" and returns
// its second and third segments. Any topic that does not consist of exactly
// three "/"-separated segments yields an error and empty results.
func ParseTopic(topic string) (sn string, operate string, err error) {
	parts := strings.Split(topic, "/")
	// Validate the segment count before indexing; shorter topics would
	// otherwise cause an index-out-of-range panic.
	if len(parts) != 3 {
		return "", "", errors.New("不支持的topic")
	}
	return parts[1], parts[2], nil
}

// updateConfigAndReload writes updates to the YAML config and reloads the
// configuration. It logs the error and returns false when the write fails,
// and returns true otherwise.
func updateConfigAndReload(updates map[string]interface{}) bool {
	if err := config.UpdateYAMLConfig(updates); err != nil {
		logger.Logger.Errorf("修改yaml失败 err: %v", err)
		return false
	}
	config.ReloadConfig()
	return true
}

// Operation names used as the last segment of a topic.
const (
	TopicGatherDataEnv = "gatherDataEnv"
	TopicGatherDataOpt = "gatherDataOpt"
	TopicTunnelTactics = "tunnelTactics"
	TopicTunnelTiming  = "tunnelTiming"
	TopicLampControl   = "lampControl"
	TopicSwitchControl = "switchControl"
)