123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package mqtt
- import (
- "errors"
- "fmt"
- jsoniter "github.com/json-iterator/go"
- "go.uber.org/zap"
- "runtime"
- "runtime/debug"
- Dev "server/dao/devices"
- "server/utils/logger"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- func InitMqtt() {
- MqttService = GetHandler()
- MqttService.SubscribeTopics()
- go MqttService.Handler()
- }
- var MqttService *MqttHandler
- var jsoni = jsoniter.ConfigFastest
- type MqttHandler struct {
- queue *MlQueue
- }
- var _handlerOnce sync.Once
- var _handlerSingle *MqttHandler
- func GetHandler() *MqttHandler {
- _handlerOnce.Do(func() {
- _handlerSingle = &MqttHandler{
- queue: NewQueue(10000),
- }
- })
- return _handlerSingle
- }
- func (o *MqttHandler) SubscribeTopics() {
- GetMQTTMgr().Subscribe("smart_intersection/#", AtLeastOnce, o.HandlerData)
- }
- func (o *MqttHandler) HandlerData(m Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *MqttHandler) Handler() interface{} {
- defer func() {
- if err := recover(); err != nil {
- go GetHandler().Handler()
- logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
- }
- }()
- for {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*Message)
- if !ok {
- continue
- }
- deviceCode, topic, err := parseTopic(m.Topic())
- if err != nil {
- logger.Logger.Errorf("parseTopic err")
- continue
- }
- switch topic {
- case TopicDeviceGateway:
- time, _ := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", m.PayloadString())
- err := Dev.UpdateGatewayRecentOnline(deviceCode, time)
- if err != nil {
- logger.Logger.Error("UpdateGatewayRecentOnline err", zap.Error(err))
- }
- case TopicDeviceCamera:
- //根据编码修改摄像头状态
- status, _ := strconv.Atoi(m.PayloadString())
- err := Dev.UpdateCameraStatus(deviceCode, status)
- if err != nil {
- logger.Logger.Error("UpdateCameraStatus err", zap.Error(err))
- }
- //case TopicDeviceScreens:
- //var stat request.DeviceStatus
- //jsoni.Unmarshal([]byte(m.PayloadString()), &stat)
- //err = Dev.UpdateScreensStatus(deviceCode, stat)
- //if err != nil {
- // logger.Logger.Error("UpdateScreensStatus err", zap.Error(err))
- //}
- default:
- fmt.Println("我是主题:::", topic)
- }
- }
- }
// parseTopic breaks a full MQTT topic into its device code and topic suffix.
//
// The topic must consist of at least four "/"-separated segments. The first
// segment is discarded, the second is returned as the device code, and the
// remaining segments (still joined by "/") are returned as the suffix.
// Topics with fewer than four segments yield an error and empty strings.
func parseTopic(topic string) (string, string, error) {
	// Split off the first two segments and keep the remainder intact.
	parts := strings.SplitN(topic, "/", 3)

	// A valid remainder still contains a separator, i.e. holds two or more segments.
	if len(parts) != 3 || !strings.Contains(parts[2], "/") {
		return "", "", errors.New("不支持的topic")
	}
	return parts[1], parts[2], nil
}
// Topic suffixes (the part of an MQTT topic after the device code, as
// returned by parseTopic) that identify the kind of device report.
const (
	// TopicDeviceGateway marks gateway reports; the payload is parsed as a
	// timestamp and stored as the gateway's most recent online time.
	TopicDeviceGateway = "device/gateway"

	// TopicDeviceCamera marks camera reports; the payload is parsed as an
	// integer status and stored for the camera.
	TopicDeviceCamera = "device/camera"

	// TopicDeviceScreens marks screen reports; the consumer's case for this
	// suffix is currently commented out.
	TopicDeviceScreens = "device/screens"
)
|