package mqtt import ( "errors" "fmt" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" "runtime" "runtime/debug" Dev "server/dao/devices" "server/utils/logger" "strconv" "strings" "sync" "time" ) func InitMqtt() { MqttService = GetHandler() MqttService.SubscribeTopics() go MqttService.Handler() } var MqttService *MqttHandler var jsoni = jsoniter.ConfigFastest type MqttHandler struct { queue *MlQueue } var _handlerOnce sync.Once var _handlerSingle *MqttHandler func GetHandler() *MqttHandler { _handlerOnce.Do(func() { _handlerSingle = &MqttHandler{ queue: NewQueue(10000), } }) return _handlerSingle } func (o *MqttHandler) SubscribeTopics() { GetMQTTMgr().Subscribe("smart_intersection/#", AtLeastOnce, o.HandlerData) } func (o *MqttHandler) HandlerData(m Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *MqttHandler) Handler() interface{} { defer func() { if err := recover(); err != nil { go GetHandler().Handler() logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*Message) if !ok { continue } deviceCode, topic, err := parseTopic(m.Topic()) if err != nil { logger.Logger.Errorf("parseTopic err") continue } switch topic { case TopicDeviceGateway: time, _ := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", m.PayloadString()) err := Dev.UpdateGatewayRecentOnline(deviceCode, time) if err != nil { logger.Logger.Error("UpdateGatewayRecentOnline err", zap.Error(err)) } case TopicDeviceCamera: //根据编码修改摄像头状态 status, _ := strconv.Atoi(m.PayloadString()) err := Dev.UpdateCameraStatus(deviceCode, status) if err != nil { logger.Logger.Error("UpdateCameraStatus err", zap.Error(err)) } //case TopicDeviceScreens: //var stat request.DeviceStatus //jsoni.Unmarshal([]byte(m.PayloadString()), &stat) //err = Dev.UpdateScreensStatus(deviceCode, stat) //if err != nil { // logger.Logger.Error("UpdateScreensStatus err", zap.Error(err)) //} default: fmt.Println("我是主题:::", topic) } } } func parseTopic(topic string) (string, string, error) { strList := strings.Split(topic, "/") if len(strList) < 4 { return "", "", errors.New("不支持的topic") } topic = strings.Join(strList[2:], "/") return strList[1], topic, nil } const ( TopicDeviceGateway = "device/gateway" TopicDeviceCamera = "device/camera" TopicDeviceScreens = "device/screens" )