package devices import ( "errors" "fmt" "regexp" "runtime" "runtime/debug" "server/dao" "server/global" "server/utils/mqtt" "server/utils/protocol" "strconv" "strings" "sync" "time" ) func InitMqtt() { MqttService = GetHandler() MqttService.SubscribeTopics() go MqttService.Handler() } var MqttService *MqttHandler var timeoutReg = regexp.MustCompile("Client .* has exceeded timeout") var connectReg = regexp.MustCompile(`New client connected from .* as .*\(`) var disconnectReg = regexp.MustCompile("Client mqttx_893e4b7d disconnected") var DeviceOnlineTimes = make(map[string]*time.Time) type MqttHandler struct { queue *mqtt.MlQueue } var _handlerOnce sync.Once var _handlerSingle *MqttHandler func GetHandler() *MqttHandler { _handlerOnce.Do(func() { _handlerSingle = &MqttHandler{ queue: mqtt.NewQueue(10000), } }) return _handlerSingle } func (o *MqttHandler) SubscribeTopics() { mqtt.GetMQTTMgr().Subscribe("smart_intersectionV2.0/#", mqtt.AtLeastOnce, o.HandlerData) } func (o *MqttHandler) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { global.GVA_LOG.Error(fmt.Sprintf("HandlerData:查询队列失败,队列消息数量:%d", cnt)) runtime.Gosched() } } } func (o *MqttHandler) Handler() interface{} { defer func() { if err := recover(); err != nil { go GetHandler().Handler() global.GVA_LOG.Error(fmt.Sprintf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { global.GVA_LOG.Error(fmt.Sprintf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)) } m, ok := msg.(*mqtt.Message) if !ok { continue } sn, topic, err := parseTopic(m.Topic()) if err != nil { global.GVA_LOG.Error("parseTopic err") continue } global.GVA_LOG.Info(fmt.Sprintf("mqtt Handler sn :%s", sn)) global.GVA_LOG.Info(fmt.Sprintf("mqtt Handler topic :%s", topic)) switch topic { case protocol.TopicHighSpeed: //存储超速事件 event := dao.Event{Sn: sn, Type: "超速", Time: m.PayloadString()} err := event.AddEvent() if err != nil { global.GVA_LOG.Error(fmt.Sprintf("添加事件失败:%s", err.Error())) } case protocol.TopicLowSpeed: //存储低速时间 event := dao.Event{Sn: sn, Type: "低速", Time: m.PayloadString()} err := event.AddEvent() if err != nil { global.GVA_LOG.Error(fmt.Sprintf("添加事件失败:%s", err.Error())) } case protocol.TopicChanStatus: //修改屏幕状态 status, _ := strconv.Atoi(m.PayloadString()) err := dao.UpdateScreensStatusBySn(sn, status) if err != nil { global.GVA_LOG.Error(fmt.Sprintf("修改屏幕状态失败:%s", err.Error())) } case protocol.TopicReportTime: parts := strings.Split(m.PayloadString(), " m=") cleanTimeStr := parts[0] // 时间格式 layout := "2006-01-02 15:04:05.999999999 +0000 UTC" // 解析时间 parsedTime, _ := time.Parse(layout, cleanTimeStr) //fmt.Println("转换时间:", parsedTime) DeviceOnlineTimes[sn] = &parsedTime } } } func (o *MqttHandler) Publish(topic string, data interface{}) error { return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce) } func (o *MqttHandler) GetTopic(deviceSn, protocol string) string { return fmt.Sprintf("smart_intersectionV2.0/%s/%s", deviceSn, protocol) } // parseTopic 获取设备SN, topic // "mini/*****/switch_control/ack" func parseTopic(topic string) (string, string, error) { strList := strings.Split(topic, "/") if len(strList) < 3 { return "", "", errors.New("不支持的topic") } topic = strings.Join(strList[2:], "/") return strList[1], topic, nil } func Sending(sn, json string) error { err := MqttService.Publish(MqttService.GetTopic(sn, protocol.TopicSetControl), []byte(json)) if err != nil { return fmt.Errorf("error updating: %v", err) } return nil }