terry 7 hónapja
szülő
commit
9534d42cc9
23 módosított fájl, 898 hozzáadás és 708 törlés
  1. 1 1
      build.bat
  2. 10 22
      config.yaml
  3. 4 1
      go.mod
  4. 8 2
      go.sum
  5. 2 6
      lc/IDevice.go
  6. BIN
      lc/bitmap/T000.bcm
  7. BIN
      lc/bitmap/huang.bcm
  8. 2 4
      lc/camera_event.go
  9. 0 7
      lc/model/radar.go
  10. 0 30
      lc/model/speaker.go
  11. 125 0
      lc/mqttclient.go
  12. 124 0
      lc/mqttmgr.go
  13. 0 80
      lc/radar_event.go
  14. 5 387
      lc/screen.go
  15. 10 74
      lc/server.go
  16. 0 84
      lc/speaker.go
  17. 3 0
      main.go
  18. 14 10
      util/config.go
  19. 164 0
      util/mqtt/mqtt.go
  20. 46 0
      util/mqtt/publish.go
  21. 125 0
      util/mqtt/router.go
  22. 99 0
      util/mqtt/subscribe.go
  23. 156 0
      util/queue.go

+ 1 - 1
build.bat

@@ -2,4 +2,4 @@ set GOARCH=arm
 set GOARM=6
 set GOOS=linux
 set CGO_ENABLED=0
-go build -o smartX_radar ./
+go build -o smartX_demo ./

+ 10 - 22
config.yaml

@@ -1,31 +1,19 @@
 service:
-  support_radar: false
   support_camera: true
-  support_speaker: true
-  support_led: false
+  support_led: true
 hikServer:
-  addr: ":8850"
+  addr: ":8860"
   path: "/event"
 cameras:
   - name: "主路1"
     ip: "192.168.1.64"
     branch: 1
-radars:
-  - port: "/dev/ttymxc4"
-    name: "对向"
-    branch: 1
-  - port: "/dev/ttymxc6"
-    name: "同向"
-    branch: 1
-#screens:
-#  - name: "主路1输出设备"
-#    ip: "192.168.1.200"
-#    port: "5005"
-#    branch: 0
-speakers:
-  - name: "主路1 IP音柱"
-    ip: "192.168.1.160"
-    speed: 50
-    volume: 80
+screens:
+  - name: "主路1输出设备"
+    ip: "192.168.1.200"
     branch: 0
-    audio: "支路来车"
+mqtt:
+  server: "tcp://127.0.0.1:1883"
+  user: "demo"
+  password: "demo"
+  Timeout: 3000

+ 4 - 1
go.mod

@@ -4,7 +4,8 @@ go 1.19
 
 require (
 	github.com/chzyer/readline v1.5.1
-	github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4
+	github.com/eclipse/paho.mqtt.golang v1.4.3
+	github.com/google/uuid v1.6.0
 	github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f
 	github.com/sirupsen/logrus v1.9.3
 	golang.org/x/text v0.14.0
@@ -13,6 +14,7 @@ require (
 
 require (
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
+	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
 	github.com/jonboulle/clockwork v0.4.0 // indirect
 	github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
@@ -20,5 +22,6 @@ require (
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/tebeka/strftime v0.1.5 // indirect
 	golang.org/x/net v0.19.0 // indirect
+	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.15.0 // indirect
 )

+ 8 - 2
go.sum

@@ -7,10 +7,14 @@ github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
+github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
-github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 h1:G2ztCwXov8mRvP0ZfjE6nAlaCX2XbykaeHdbT6KwDz0=
-github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
+github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
 github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
 github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
@@ -34,6 +38,8 @@ github.com/tebeka/strftime v0.1.5 h1:1NQKN1NiQgkqd/2moD6ySP/5CoZQsKa1d3ZhJ44Jpmg
 github.com/tebeka/strftime v0.1.5/go.mod h1:29/OidkoWHdEKZqzyDLUyC+LmgDgdHo4WAFCDT7D/Ig=
 golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
 golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=

+ 2 - 6
lc/IDevice.go

@@ -45,18 +45,14 @@ type OutputDeviceInfo struct {
 }
 
 type IntersectionDevice struct {
-	Info    OutputDeviceInfo
-	Screen  Screener
-	Speaker Loudspeaker
+	Info   OutputDeviceInfo
+	Screen Screener
 }
 
 func (id *IntersectionDevice) Call() {
 	if id.Screen != nil {
 		id.Screen.Display(1)
 	}
-	if id.Speaker != nil {
-		id.Speaker.Speak("支路来车")
-	}
 }
 func (id *IntersectionDevice) Rollback() {
 	if id.Screen != nil {

BIN
lc/bitmap/T000.bcm


BIN
lc/bitmap/huang.bcm


+ 2 - 4
lc/camera_event.go

@@ -32,10 +32,7 @@ func (s *CameraServer) Start() {
 
 func (s *CameraServer) RegisterCallback(branch byte, notifier Notifier) {
 	for _, camera := range s.Cameras {
-		//关联主路led屏和支路摄像头;关联支路led屏和主路摄像头
-		if branch == 0 && camera.Branch == 1 || branch == 1 && camera.Branch == 0 {
-			s.Notifiers[camera.IP] = notifier
-		}
+		s.Notifiers[camera.IP] = notifier
 	}
 }
 
@@ -68,6 +65,7 @@ func (s *CameraServer) Handler(w http.ResponseWriter, r *http.Request) {
 		if err != nil {
 			return
 		}
+		logrus.Errorf("EventNotificationAlert = %+v", event)
 		//发送事件通知
 		s.Callback(event.IpAddress)
 	} else if strings.Contains(contentType, "multipart/form-data") {

+ 0 - 7
lc/model/radar.go

@@ -1,7 +0,0 @@
-package model
-
-type RadarInfo struct {
-	Port   string `yaml:"port"` //对应哪个485口
-	Name   string `yaml:"name"`
-	Branch byte   `yaml:"branch"`
-}

+ 0 - 30
lc/model/speaker.go

@@ -1,30 +0,0 @@
-package model
-
-type SpeakerInfo struct {
-	Name   string `yaml:"name"`
-	Ip     string `yaml:"ip"`
-	Branch byte   `yaml:"branch"`
-	Speed  byte   `yaml:"speed"`
-	Volume byte   `yaml:"volume"`
-	Audio  string `yaml:"audio"`
-}
-
-type PlayReq struct {
-	Url    string   `json:"url"`    //要播放的网络音频 http/https/rtsp等⾳频地址
-	Text   string   `json:"text"`   //要播放的文本内容
-	Vcn    string   `json:"vcn"`    //发音人 xiaofeng xiaoyan
-	Speed  byte     `json:"speed"`  //发音速度 0-100 默认50
-	Volume byte     `json:"volume"` //音量 0-100 默认50
-	Rdn    string   `json:"rdn"`    //数字发音 0 数值优先, 1 完全数值, 2 完全字符串, 3 字符串优先 默认2
-	Rcn    string   `json:"rcn"`    //数字1 的中文发音 0:表示发⾳为yao 1:表示发音为yi 默认0
-	Reg    byte     `json:"reg"`    //英文发音 0:⾃动识别英语单词; 1:逐个字母发音
-	Sync   bool     `json:"sync"`   //true: 同步模式,语音播放完毕后再响应; false:即时响应(不等待播放完成)
-	Queue  bool     `json:"queue"`  //true: 队列模式,如果当前有语⾳在播放,则加到队列排队播放
-	Loop   LoopInfo `json:"loop"`
-}
-
-type LoopInfo struct {
-	Duration int `json:"duration"` // 循环(重复)播放时⻓(秒)选填
-	Times    int `json:"times"`    // 循环(重复)播放次数(次)选填
-	Gap      int `json:"gap"`      // 循环(重复)播放中的间歇时间(秒)
-}

+ 125 - 0
lc/mqttclient.go

@@ -0,0 +1,125 @@
+package lc
+
+import (
+	"context"
+	"fmt"
+	"lc-smartX/util/mqtt"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+)
+
+type BaseMqttOnline interface {
+	GetOnlineMsg() (string, string)
+	GetWillMsg() (string, string)
+}
+
+type EmptyMqttOnline struct {
+}
+
+func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
+	return "", ""
+}
+func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
+	return "", ""
+}
+
+type MqttClient struct {
+	mqtt       *mqtt.Client        //
+	mu         sync.Mutex          //保护mapTopics
+	mapTopics  map[string]mqtt.QOS //订阅的主题
+	timeout    uint                //超时时间,毫秒为单位
+	MqttOnline BaseMqttOnline      //是否发布上线消息&遗嘱消息
+}
+
+func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient {
+	o := MqttClient{
+		mapTopics:  make(map[string]mqtt.QOS),
+		timeout:    timeout,
+		MqttOnline: mqttOnline,
+	}
+	client, err := mqtt.NewClient(mqtt.ClientOptions{
+		Servers:       []string{server},
+		ClientID:      clientid,
+		Username:      user,
+		Password:      password,
+		AutoReconnect: true,
+	}, &o)
+	if err != nil {
+		panic(fmt.Sprintf("MQTT错误:", err.Error()))
+		return nil
+	}
+	o.mqtt = client
+	err = client.Connect(o.Ctx())
+	return &o
+}
+
+func (o *MqttClient) ConnectionLostHandler(err error) {
+	logrus.Errorln("MqttClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
+}
+
+func (o *MqttClient) OnConnectHandler() {
+	logrus.Infoln("MqttClient.OnConnectHandler:MQTT连接成功")
+	//连接成功则订阅主题
+	for k, v := range o.mapTopics {
+		o.Subscribe(k, v)
+	}
+	topic, str := o.MqttOnline.GetOnlineMsg()
+	if topic != "" {
+		o.PublishString(topic, str, 0)
+	}
+	o.Publish("000000/cltled/LED20230408/down/switch", []byte("路口new-54"), mqtt.AtMostOnce)
+}
+
+func (o *MqttClient) GetWill() (topic string, payload string) {
+	return o.MqttOnline.GetWillMsg()
+}
+
+func (o *MqttClient) Connect() error {
+	if !o.mqtt.IsConnected() {
+		return o.mqtt.Connect(o.Ctx())
+	}
+	return nil
+}
+
+func (o *MqttClient) IsConnected() bool {
+	return o.mqtt.IsConnected()
+}
+
+func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error {
+	return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
+}
+func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error {
+	return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
+}
+func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
+	return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
+}
+
+func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if _, ok := o.mapTopics[topic]; !ok {
+		o.mapTopics[topic] = qos
+	}
+	return o.mqtt.Subscribe(o.Ctx(), topic, qos)
+}
+
+func (o *MqttClient) Unsubscribe(topic string) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if _, ok := o.mapTopics[topic]; ok {
+		delete(o.mapTopics, topic)
+	}
+	return o.mqtt.Unsubscribe(o.Ctx(), topic)
+}
+
+func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
+	return o.mqtt.Handle(topic, handler)
+}
+
+func (o *MqttClient) Ctx() context.Context {
+	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
+	return ctx
+}

+ 124 - 0
lc/mqttmgr.go

@@ -0,0 +1,124 @@
+package lc
+
+import (
+	"runtime/debug"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"lc-smartX/util"
+	"lc-smartX/util/mqtt"
+)
+
+type OptType uint8
+
+const (
+	ToCloud OptType = 1 //发布和订阅云端的消息
+)
+
+var _mqttMgronce sync.Once
+var _mqttMgrsingle *MQTTMgr
+
+// GetMQTTMgr 单态
+func GetMQTTMgr() *MQTTMgr {
+	_mqttMgronce.Do(func() {
+		_mqttMgrsingle = _newMQTTMgr()
+	})
+	return _mqttMgrsingle
+}
+
+type MQTTMgr struct {
+	Cloud *MqttClient
+	Queue *util.MlQueue
+}
+
+// 建两个client
+func _newMQTTMgr() *MQTTMgr {
+	mgr := &MQTTMgr{
+		Queue: util.NewQueue(2000),
+	}
+
+	mgr.Cloud = NewMqttClient(util.Config.Mqtt.Server,
+		"demo_client",
+		util.Config.Mqtt.User,
+		util.Config.Mqtt.Password,
+		util.Config.Mqtt.Timeout,
+		&EmptyMqttOnline{})
+
+	return mgr
+}
+
+// Subscribe 定阅
+func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
+	if o.Cloud != nil {
+		o.Cloud.Handle(topic, handler)
+		o.Cloud.Subscribe(topic, qos)
+	}
+}
+
+// UnSubscribe 退定
+func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
+	if o.Cloud != nil {
+		o.Cloud.Unsubscribe(topic)
+	}
+}
+
+// Publish 发布进队列
+func (o *MQTTMgr) Publish(topic string, payload []byte, qos mqtt.QOS, tp OptType) {
+	msg := MQTTMessage{
+		topic:   topic,
+		payload: payload,
+		qos:     qos,
+		tp:      tp,
+	}
+	o.Queue.Put(&msg)
+}
+
+// 发布低
+func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
+	var err error
+	if o.Cloud != nil {
+		err = o.Cloud.Publish(msg.topic, msg.payload, msg.qos)
+	}
+	return err
+}
+
+// MQTTConnectMgr 连接保持
+func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
+	for {
+		time.Sleep(10 * time.Second)
+		if o.Cloud != nil {
+			o.Cloud.Connect()
+		}
+	}
+}
+
+func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
+	defer func() {
+		if err := recover(); err != nil {
+			logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常:%v", err)
+			logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常,堆栈信息:%s", string(debug.Stack()))
+			go o.MQTTMessageHandle(args)
+		}
+	}()
+	var err error
+	for { //队列中所有发布
+		if m, ok, _ := o.Queue.Get(); ok {
+			if msg, ok := m.(*MQTTMessage); ok {
+				err = o._publish(msg)
+				if err != nil {
+					logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error())
+				}
+			}
+		} else {
+			time.Sleep(200 * time.Millisecond)
+		}
+	}
+}
+
+type MQTTMessage struct {
+	topic   string
+	payload []byte
+	qos     mqtt.QOS
+	tp      OptType
+}

+ 0 - 80
lc/radar_event.go

@@ -1,80 +0,0 @@
-package lc
-
-import (
-	"github.com/jacobsa/go-serial/serial"
-	"github.com/sirupsen/logrus"
-	"lc-smartX/lc/model"
-	"lc-smartX/util"
-)
-
-func NewRadarEventServer() *RadarServer {
-	s := &RadarServer{Radars: util.Config.Radars, Notifiers: make(map[string]Notifier, 4)}
-	return s
-}
-
-type RadarServer struct {
-	Radars    []model.RadarInfo
-	Notifiers map[string]Notifier //485通道名
-}
-
-func (s *RadarServer) Start() {
-	for _, radar := range s.Radars {
-		go s.OpenSerial(radar.Port)
-	}
-}
-
-func (s *RadarServer) RegisterCallback(branch byte, notifier Notifier) {
-	for _, radar := range s.Radars {
-		//关联主路led屏和支路雷达;关联支路led屏和主路雷达
-		if branch == 0 && radar.Branch == 1 || branch == 1 && radar.Branch == 0 {
-			s.Notifiers[radar.Port] = notifier
-		}
-	}
-}
-
-func (s *RadarServer) Callback(port string) {
-	notifier, ok := s.Notifiers[port]
-	if !ok {
-		logrus.Errorf("回调函数注册表没有该ip:%s", port)
-		return
-	}
-	notifier.Notify()
-}
-
-func (s *RadarServer) OpenSerial(portName string) {
-	// 配置串口参数
-	options := serial.OpenOptions{
-		PortName:        portName, // /dev/ttymxc4 6 3
-		BaudRate:        9600,
-		DataBits:        8,
-		StopBits:        1,
-		MinimumReadSize: 4,
-	}
-
-	// 打开串口
-	port, err := serial.Open(options)
-	if err != nil {
-		panic(err.Error())
-	}
-
-	// 关闭串口
-	defer port.Close()
-	for {
-		// 读取数据
-		buf := make([]byte, 128)
-		n, err := port.Read(buf)
-		if err != nil {
-			break
-		}
-		if n < 8 {
-			continue
-		}
-		result := false
-		if buf[0] == 'x' && (buf[1] > '0' || buf[2] > '0' || buf[3] > '0') {
-			result = true
-		}
-		if result {
-			s.Callback(portName)
-		}
-	}
-}

+ 5 - 387
lc/screen.go

@@ -2,12 +2,9 @@ package lc
 
 import (
 	"fmt"
-	"github.com/sirupsen/logrus"
-	"golang.org/x/text/encoding/simplifiedchinese"
 	"lc-smartX/bx"
+	"lc-smartX/util/mqtt"
 	"net"
-	"strconv"
-	"time"
 )
 
 // Screener 屏接口
@@ -36,396 +33,17 @@ func NewScreen(name string, ip, port string) *Screen {
 }
 
 func (s *Screen) Display(id int) {
-	if !s.getLiveState() {
-		return
+	msg := "来车new-55"
+	if id == 0 {
+		msg = "路口new-54"
 	}
-	s.SendRam(id)
+	GetMQTTMgr().Publish("000000/cltled/LED20230408/down/switch", []byte(msg), mqtt.AtMostOnce, ToCloud)
 }
 
 // Correct 校正时间
 func (s *Screen) Correct() {
-	if !s.getLiveState() {
-		return
-	}
-	now := time.Now()
-	cmd := bx.NewBxCmdSystemClockCorrect(now)
-	data := bx.NewBxDataPackCmd(cmd)
-	s.Send(data.Pack())
 }
 
 // Reconnect 重连
 func (s *Screen) Reconnect() {
-	if s.getLiveState() {
-		return
-	}
-	conn, err := net.DialTimeout("tcp", s.Addr, 5*time.Second)
-	if err != nil {
-		logrus.Error(s.Name, "-", s.Addr, "[屏]重连接失败! error:", err)
-		return
-	}
-	logrus.Info(s.Name, "-", s.Addr, "[屏]连接成功!")
-	s.setConn(conn)
-	//读取屏信息
-	state := s.State()
-	s.StateInfo.Parse(state.Data)
-	params := s.Param()
-	s.Params.Parse(params.Data)
-}
-
-func (s *Screen) getLiveState() bool {
-	return s.liveState
-}
-
-func (s *Screen) setConn(conn net.Conn) {
-	s.conn = conn
-	s.liveState = true
-}
-
-// 给屏发送数据
-func (s *Screen) Send(data []byte) {
-	if !s.getLiveState() {
-		return
-	}
-	_, err := s.conn.Write(data)
-	if err != nil {
-		logrus.WithFields(map[string]interface{}{"设备名": s.Name}).Error("tcp write error:", err)
-		s.liveState = false
-	}
-}
-
-//以下对协议进行封装
-//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
-
-type Color byte
-
-const (
-	Default Color = iota
-	Red
-	Green
-	Yellow
-	Blue
-	LightBlue
-	LightPurple
-	White
-)
-
-const (
-	Normal = 0
-	Warn   = 1
-)
-
-// 发送动态区节目 0正常页面 1红色提醒页面
-func (s *Screen) SendRam(id int) {
-	file := FlashFile{}
-	if id == 0 {
-		file.SetMsg("减速慢行", Yellow)
-		file.SetMode(DefaultRunMode, DefaultDisplayMode)
-		file.SetOrigin(0, true, 0)
-		file.SetArea(64, true, 16)
-		s.TextRam(file, false)
-	} else {
-		file.SetMsg("支路来车", Red)
-		file.SetSoundData("支路来车,请减速")
-		file.SetMode(DefaultRunMode, DefaultDisplayMode)
-		file.SetOrigin(0, true, 0)
-		file.SetArea(64, true, 16)
-		s.TextRam(file, true)
-	}
-}
-
-// TextRam 发送动态区节目实现
-func (s *Screen) TextRam(ff FlashFile, needSpeak bool) {
-	if !s.getLiveState() {
-		return
-	}
-	var areas []bx.BxArea
-	encoder := simplifiedchinese.GB18030.NewEncoder()
-	var bytes []byte
-	if ff.color == Default {
-		bytes, _ = encoder.Bytes([]byte(ff.msg))
-	} else {
-		bytes, _ = encoder.Bytes([]byte("\\C" + strconv.Itoa(int(ff.color)) + ff.msg))
-	}
-	if needSpeak {
-		soundData, _ := encoder.Bytes([]byte(ff.soundData))
-		area := bx.NewBxAreaDynamic(0, 1, byte(ff.dispMode), ff.originX, ff.originY, ff.width,
-			ff.height, bytes, soundData, false)
-		areas = append(areas, area)
-	} else {
-		area := bx.NewBxAreaProgram(0, 1, byte(ff.dispMode), ff.originX, ff.originY, ff.width,
-			ff.height, bytes, false)
-		areas = append(areas, area)
-	}
-
-	//
-	cmd := bx.NewBxCmdSendDynamicArea(areas)
-	pack := bx.NewBxDataPackCmd(cmd)
-	pack.SetDisplayType(1) //动态显示模式
-	d := pack.Pack()
-	s.Send(d)
-	resp := s.ReadResp()
-	if !resp.IsAck() {
-		println("设备拒绝写文件! error:", resp.Error().Description)
-		return
-	}
-	//s.StateInfo.DynaAreaNum++
-}
-
-// DelRamText 删除动态区,不传删除所有
-func (s *Screen) DelRamText(numbers ...byte) {
-	if !s.getLiveState() {
-		return
-	}
-	cmd := bx.NewCmdDelDynamicArea(numbers)
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-	s.ReadResp()
-	if len(numbers) == 0 {
-		s.StateInfo.DynaAreaNum = 0
-	} else {
-		s.StateInfo.DynaAreaNum--
-	}
-}
-
-type RunMode byte
-
-const (
-	Loop              RunMode = iota //循环
-	LoopAndStayAtEnd                 //循环直到最后,停留在最后一个动态区
-	LoopAndTimeoutOff                //循环直到超时,超时后未更新不在显示
-	LoopAndStayAtLogo                //循环完后,停留显示LOGO
-	LoopAndOff                       //循环完后不在显示
-	LoopAndCountOff                  //循环设定次数后不在显示
-	DefaultRunMode    RunMode = Loop //默认
-)
-
-type DisplayMode byte
-
-const (
-	_                  DisplayMode = iota
-	Static                         //0x01——静止显示
-	QuickPunch                     //0x02——快速打出
-	MoveLeft                       //0x03——向左移动
-	MoveRight                      //0x04——向右移动
-	MoveUp                         //0x05——向上移动
-	MoveDown                       //0x06——向下移动
-	Flicker                        //0x07——闪烁
-	DefaultDisplayMode DisplayMode = Static
-)
-
-type FlashFile struct {
-	msg       string
-	soundData string
-	color     Color
-	runMode   RunMode
-	dispMode  DisplayMode
-	originX   uint16
-	originY   uint16
-	width     uint16
-	height    uint16
-}
-
-func (ft *FlashFile) SetMsg(msg string, color Color) {
-	ft.msg = msg
-	ft.color = color
-}
-func (ft *FlashFile) SetMode(runMode RunMode, displayMode DisplayMode) {
-	ft.runMode = runMode
-	ft.dispMode = displayMode
-}
-func (ft *FlashFile) SetSoundData(soundData string) {
-	ft.soundData = soundData
-}
-
-// SetOrigin xIsPixel=true表示x坐标为像素单位, =false表示以字节(8像素)为单位;y只有像素单位
-func (ft *FlashFile) SetOrigin(x uint16, xIsPixel bool, y uint16) {
-	ft.originY = y
-	if xIsPixel {
-		ft.originX = 0x8000 | x
-		return
-	}
-	ft.originX = x
-}
-
-// SetArea yIsPixel=true表示像素单位, =false表示以字节(8像素)为单位
-func (ft *FlashFile) SetArea(w uint16, yIsPixel bool, h uint16) {
-	ft.height = h
-	if yIsPixel {
-		ft.width = 0x8000 | w
-		return
-	}
-	ft.width = w
-}
-
-// TextFlash 发送静态文件节目,掉电保存,文件名格式"P000","P001"
-func (s *Screen) TextFlash(ft []FlashFile, name string, isLogo bool) {
-	if !s.getLiveState() {
-		return
-	}
-	encoder := simplifiedchinese.GB18030.NewEncoder()
-	if isLogo {
-		name = "LOGO"
-	}
-	var areas []bx.BxArea
-	for _, i := range ft {
-		var bytes []byte
-		if i.color == Default {
-			bytes, _ = encoder.Bytes([]byte(i.msg))
-		} else {
-			bytes, _ = encoder.Bytes([]byte("\\C" + strconv.Itoa(int(i.color)) + i.msg))
-		}
-		area := bx.NewBxAreaProgram(0xff, byte(i.runMode), byte(i.dispMode), i.originX, i.originY, i.width, i.height, bytes, false)
-		areas = append(areas, area)
-	}
-
-	file := bx.NewBxFile(name, "", areas)
-	cmd := file.NewCmdWriteFile()
-	pack := bx.NewBxDataPackCmd(cmd)
-	data := pack.Pack()
-	s.Send(data)
-	resp := s.ReadResp()
-	if !resp.IsAck() {
-		logrus.Error("设备拒绝写文件! error:", resp.Error().Description)
-		return
-	}
-	pack1 := bx.NewBxDataPackCmd(cmd)
-	data1 := pack1.Pack()
-	s.Send(data1)
-	resp1 := s.ReadResp()
-	if resp1.NoError() {
-		s.StateInfo.ProgramNum++
-	}
-}
-
-// Bitmap 发送自定义位图节目
-func (s *Screen) Bitmap(name string, bitmap []byte) {
-	file := bx.NewBitmapFile(name, bitmap)
-	cmd := file.NewCmd()
-	pack := bx.NewBxDataPackCmd(cmd)
-	data := pack.Pack()
-	s.Send(data)
-	resp := s.ReadResp()
-	if !resp.IsAck() {
-		logrus.Error("设备拒绝写文件! error:", resp.Error().Description)
-		return
-	}
-	pack1 := bx.NewBxDataPackCmd(cmd)
-	data1 := pack1.Pack()
-	s.Send(data1)
-	fmt.Printf("写图文件数据:% 02x\n", data1)
-	s.ReadResp()
-}
-
-// Lock 锁定状态:0x00——解锁状态,0x01——锁定状态
-func (s *Screen) Lock(flag byte, name string) {
-	cmd := bx.NewCmdLock(flag, name)
-	pack := bx.NewBxDataPackCmd(&cmd)
-	s.Send(pack.Pack())
-}
-
-// DelFile 删除静态文件节目
-func (s *Screen) DelFile(delFiles ...string) {
-	cmd := bx.NewCmdDeleteFile(delFiles)
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-	s.ReadResp()
-	if len(delFiles) == 0 {
-		s.StateInfo.ProgramNum = 0
-	} else {
-		s.StateInfo.ProgramNum--
-	}
-}
-
-// DelText 删除动态区域节目
-func (s *Screen) DelText(delIds []byte) {
-	cmd := bx.NewBxCmdSendDynamicArea(nil)
-	cmd.SetDelAreaIds(delIds)
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-}
-
-func (s *Screen) TurnOnOff(onOff bool) {
-	if !s.getLiveState() {
-		return
-	}
-	cmd := bx.NewBxCmdTurnOnOff(onOff)
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-}
-
-// TimingSwitch 定时开关屏
-// 13:49 开, 13:55 关,最多设置3组
-//
-//	onOffSet := [][2]uint64{
-//			{1349, 1355},
-//		}
-func (s *Screen) TimingSwitch(onOffSet [][2]uint64) {
-	if !s.getLiveState() {
-		return
-	}
-	cmd := bx.NewCmdTimingSwitch(onOffSet)
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-}
-
-func (s *Screen) CancelTimingSwitch() {
-	if !s.getLiveState() {
-		return
-	}
-	cmd := bx.NewCmdCancelTimingSwitch()
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-}
-
-func (s *Screen) State() *bx.BxResp {
-	if !s.getLiveState() {
-		return nil
-	}
-	cmd := bx.NewCmdState()
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-	r := s.ReadResp()
-	return &r
-}
-
-func (s *Screen) Param() *bx.BxResp {
-	if !s.getLiveState() {
-		return nil
-	}
-	cmd := bx.NewCmdReadParams()
-	pack := bx.NewBxDataPackCmd(cmd)
-	s.Send(pack.Pack())
-	r := s.ReadResp()
-	return &r
-}
-
-func (s *Screen) Info() {
-	s.StateInfo.Print(s.Name)
-}
-
-// ReadResp 读取响应
-func (s *Screen) ReadResp() bx.BxResp {
-	var resp = make([]byte, 1024)
-	read, err := s.conn.Read(resp)
-	if err != nil {
-		logrus.Error("读数据错误:", err)
-		s.liveState = false
-		return bx.BxResp{}
-	}
-	var bxResp = bx.BxResp{}
-	parse := bxResp.Parse(resp, read)
-	//if parse.IsAck() {
-	//	fmt.Println("response ACK")
-	//	fmt.Printf("原始响应数据:% 0x\n", resp[:read])
-	//	fmt.Println("解析响应数据:", parse)
-	//} else if parse.IsInfo() {
-	//	fmt.Println("state ACK")
-	//	fmt.Printf("原始响应数据:% 0x\n", resp[:read])
-	//	fmt.Println("解析响应数据:", parse)
-	//} else {
-	//	fmt.Println("response")
-	//	fmt.Printf("原始响应数据:% 0x\n", resp[:read])
-	//	fmt.Println("解析响应数据:", parse)
-	//}
-	return *parse
 }

+ 10 - 74
lc/server.go

@@ -11,31 +11,22 @@ type SmartXServer interface {
 }
 
 type IntersectionServer struct {
-	RadarEventServer  *RadarServer
 	CameraEventServer *CameraServer
-	MainState         byte
-	SubState          byte
-	MainDevices       []IDevice
-	SubDevices        []IDevice
+	State             byte
+	Devices           []IDevice
 	ReTicker          *time.Ticker
 	Main              *time.Ticker
-	Sub               *time.Ticker
 }
 
 func StartIntersectionServer() {
 	is := &IntersectionServer{
 		Main:     time.NewTicker(5 * time.Second),  //主路状态回滚
-		Sub:      time.NewTicker(5 * time.Second),  //支路状态回滚
 		ReTicker: time.NewTicker(30 * time.Second), //重连
 	}
 	if util.Config.Server.SupportCamera {
 		is.CameraEventServer = NewCameraEventServer()
 		gopool.Go(is.CameraEventServer.Start)
 	}
-	if util.Config.Server.SupportRadar {
-		is.RadarEventServer = NewRadarEventServer()
-		gopool.Go(is.RadarEventServer.Start)
-	}
 	//等事件服务先启动
 	time.Sleep(1 * time.Second)
 	is.Serve()
@@ -46,22 +37,9 @@ type MainNotifier struct{ s *IntersectionServer }
 // Notify 主路来车,通知支路设备
 func (m MainNotifier) Notify() {
 	m.s.Main.Reset(5 * time.Second)
-	if m.s.MainState != 1 {
-		m.s.MainState = 1
-		for _, v := range m.s.SubDevices {
-			gopool.Go(v.Call)
-		}
-	}
-}
-
-type SubNotifier struct{ s *IntersectionServer }
-
-// Notify 支路来车,通知主路设备
-func (sub SubNotifier) Notify() {
-	sub.s.Sub.Reset(5 * time.Second)
-	if sub.s.SubState != 1 {
-		sub.s.SubState = 1
-		for _, v := range sub.s.MainDevices {
+	if m.s.State != 1 {
+		m.s.State = 1
+		for _, v := range m.s.Devices {
 			gopool.Go(v.Call)
 		}
 	}
@@ -69,13 +47,8 @@ func (sub SubNotifier) Notify() {
 
 func (is *IntersectionServer) Serve() {
 	if util.Config.Server.SupportCamera {
-		is.CameraEventServer.RegisterCallback(1, &SubNotifier{is})
 		is.CameraEventServer.RegisterCallback(0, &MainNotifier{is})
 	}
-	if util.Config.Server.SupportRadar {
-		is.RadarEventServer.RegisterCallback(1, &SubNotifier{is})
-		is.RadarEventServer.RegisterCallback(0, &MainNotifier{is})
-	}
 
 	//先创建响应设备
 	for _, c := range util.Config.Screens {
@@ -88,56 +61,19 @@ func (is *IntersectionServer) Serve() {
 			},
 			Screen: NewScreen(c.Name, c.Ip, c.Port),
 		}
-		if c.Branch == 1 {
-			is.MainDevices = append(is.MainDevices, iDevice)
-		} else {
-			is.SubDevices = append(is.SubDevices, iDevice)
-		}
-	}
-	for _, c := range util.Config.Speakers {
-		iDevice := &IntersectionDevice{
-			Info: OutputDeviceInfo{
-				Name:   c.Name,
-				Ip:     c.Ip,
-				Branch: c.Branch,
-				Audio:  c.Audio,
-			},
-			Speaker: NewIpCast(c.Name, c.Ip, c.Audio, c.Speed, c.Volume),
-		}
-		if c.Branch == 1 {
-			is.MainDevices = append(is.MainDevices, iDevice)
-		} else {
-			is.SubDevices = append(is.SubDevices, iDevice)
-		}
+		is.Devices = append(is.Devices, iDevice)
+
 	}
 
 	for {
 		select {
 		case <-is.Main.C: //检查主路状态->支路输出设备回到初始状态
-			for _, v := range is.SubDevices {
-				if is.MainState == 1 {
-					gopool.Go(v.Rollback)
-				}
-			}
-			is.MainState = 0
-		case <-is.Sub.C: //检查支路状态->主路输出设备作出响应
-			for _, v := range is.MainDevices {
-				if is.SubState == 1 {
+			for _, v := range is.Devices {
+				if is.State == 1 {
 					gopool.Go(v.Rollback)
 				}
 			}
-			is.SubState = 0
-		case <-is.ReTicker.C: //每19s检查并尝试重连
-			gopool.Go(func() {
-				for _, v := range is.MainDevices {
-					gopool.Go(v.Reconnect)
-				}
-			})
-			gopool.Go(func() {
-				for _, v := range is.SubDevices {
-					gopool.Go(v.Reconnect)
-				}
-			})
+			is.State = 0
 		}
 	}
 }

+ 0 - 84
lc/speaker.go

@@ -1,84 +0,0 @@
-package lc
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"github.com/sirupsen/logrus"
-	"io"
-	"lc-smartX/lc/model"
-	"net/http"
-)
-
-// Loudspeaker 扬声器接口
-type Loudspeaker interface {
-	Speak(txt string)
-}
-
-type IpCast struct {
-	Name      string
-	Ip        string
-	Speed     byte
-	Volume    byte
-	Audio     string
-	liveState bool
-}
-
-func NewIpCast(name, ip, audio string, speed, volume byte) *IpCast {
-	s := &IpCast{
-		Name:      name,
-		Ip:        ip,
-		Speed:     speed,
-		Volume:    volume,
-		Audio:     audio,
-		liveState: false,
-	}
-	s.Reconnect()
-	return s
-}
-
-func (ip *IpCast) Speak(txt string) {
-	data := &model.PlayReq{
-		Text:   txt,
-		Vcn:    "xiaoyan",
-		Speed:  ip.Speed,
-		Volume: ip.Volume,
-		Rdn:    "2",
-		Rcn:    "1",
-		Reg:    0,
-		Sync:   false,
-		Queue:  false,
-	}
-	data.Loop.Times = 1
-	body, err := json.Marshal(data)
-	if err != nil {
-		logrus.Errorf("IpCast Marshal err : %s", err.Error())
-		return
-	}
-
-	req, _ := http.NewRequest("POST", fmt.Sprintf("http://%s/v1/speech", ip.Ip), bytes.NewReader(body))
-	req.Header.Set("Content-Type", "application/json")
-	rsp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		logrus.Errorf("IpCast Speak Do err : %s", err.Error())
-		return
-	}
-	rspData, err := io.ReadAll(rsp.Body)
-	logrus.Debugf("IpCast Speak rsp : %+v", string(rspData))
-
-}
-
-func (ip *IpCast) Reconnect() {
-	c := http.DefaultClient
-	req, _ := http.NewRequest("GET", fmt.Sprintf("http://%s/v1/check_alive", ip.Ip), nil)
-	rsp, err := c.Do(req)
-	if err != nil {
-		logrus.Errorf("IpCast Reconnect err : %s", err.Error())
-		return
-	}
-	if rsp.StatusCode == http.StatusOK {
-		ip.liveState = true
-	}
-}
-
-func (ip IpCast) CorrectTime() {}

+ 3 - 0
main.go

@@ -6,6 +6,9 @@ import (
 )
 
 func main() {
+	lc.GetMQTTMgr()
 	gopool.SetCap(64)
+	go lc.GetMQTTMgr().MQTTConnectMgr()
+	go lc.GetMQTTMgr().MQTTMessageHandle()
 	lc.StartIntersectionServer()
 }

+ 14 - 10
util/config.go

@@ -21,19 +21,23 @@ var Config = func() config {
 }()
 
 type config struct {
-	HikServer hikServer           `yaml:"hikServer"`
-	Cameras   []model.CameraInfo  `yaml:"cameras"`
-	Radars    []model.RadarInfo   `yaml:"radars"`
-	Screens   []model.ScreenInfo  `yaml:"screens"`
-	Speakers  []model.SpeakerInfo `yaml:"speakers"`
-	Server    service             `yaml:"service"`
+	HikServer hikServer          `yaml:"hikServer"`
+	Cameras   []model.CameraInfo `yaml:"cameras"`
+	Screens   []model.ScreenInfo `yaml:"screens"`
+	Server    service            `yaml:"service"`
+	Mqtt      MqttConfig         `yaml:"mqtt"`
+}
+
+type MqttConfig struct {
+	Server   string `json:"server"`
+	User     string `json:"user"`
+	Password string `json:"password"`
+	Timeout  uint   `json:"timeout"`
 }
 
 type service struct {
-	SupportRadar   bool `yaml:"support_radar"`
-	SupportCamera  bool `yaml:"support_camera"`
-	SupportSpeaker bool `yaml:"support_speaker"`
-	SupportLed     bool `yaml:"support_led"`
+	SupportCamera bool `yaml:"support_camera"`
+	SupportLed    bool `yaml:"support_led"`
 }
 
 type hikServer struct {

+ 164 - 0
util/mqtt/mqtt.go

@@ -0,0 +1,164 @@
+package mqtt
+
+import (
+	"context"
+	"crypto/tls"
+	"errors"
+
+	paho "github.com/eclipse/paho.mqtt.golang"
+	"github.com/google/uuid"
+)
+
+type ConnHandler interface {
+	ConnectionLostHandler(err error)
+	OnConnectHandler()
+	GetWill() (topic string, payload string)
+}
+
+// Client for talking using mqtt
+type Client struct {
+	Options     ClientOptions // The options that were used to create this client
+	client      paho.Client
+	router      *router
+	connhandler ConnHandler
+}
+
+// ClientOptions is the list of options used to create a client
+type ClientOptions struct {
+	Servers  []string // The list of broker hostnames to connect to
+	ClientID string   // If left empty a uuid will automatically be generated
+	Username string   // If not set then authentication will not be used
+	Password string   // Will only be used if the username is set
+
+	AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost
+}
+
+// QOS describes the quality of service of an mqtt publish
+type QOS byte
+
+const (
+	// AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
+	AtMostOnce QOS = iota
+	// AtLeastOnce means the broker will deliver a message at least once to every subscriber
+	AtLeastOnce
+	// ExactlyOnce means the broker will deliver a message exactly once to every subscriber
+	ExactlyOnce
+)
+
+var (
+	// ErrMinimumOneServer means that at least one server should be specified in the client options
+	ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
+)
+
+func handle(callback MessageHandler) paho.MessageHandler {
+	return func(client paho.Client, message paho.Message) {
+		if callback != nil {
+			callback(Message{message: message})
+		}
+	}
+}
+
+// NewClient creates a new client with the specified options
+func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) {
+	pahoOptions := paho.NewClientOptions()
+
+	// brokers
+	if options.Servers != nil && len(options.Servers) > 0 {
+		for _, server := range options.Servers {
+			pahoOptions.AddBroker(server)
+		}
+	} else {
+		return nil, ErrMinimumOneServer
+	}
+
+	// client id
+	if options.ClientID == "" {
+		options.ClientID = uuid.New().String()
+	}
+	pahoOptions.SetClientID(options.ClientID)
+
+	tls := &tls.Config{
+		InsecureSkipVerify: true,
+	}
+	pahoOptions.SetTLSConfig(tls)
+
+	// auth
+	if options.Username != "" {
+		pahoOptions.SetUsername(options.Username)
+		pahoOptions.SetPassword(options.Password)
+	}
+
+	// auto reconnect
+	pahoOptions.SetAutoReconnect(options.AutoReconnect)
+
+	pahoOptions.SetCleanSession(false)
+
+	var client Client
+	pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler) //断连
+	pahoOptions.SetOnConnectHandler(client.OnConnectHandler)           //连接
+	if t, m := connhandler.GetWill(); t != "" {
+		pahoOptions.SetWill(t, m, 0, false) //遗嘱消息
+	}
+
+	pahoClient := paho.NewClient(pahoOptions)
+	router := newRouter()
+	pahoClient.AddRoute("#", handle(func(message Message) {
+		routes := router.match(&message)
+		for _, route := range routes {
+			m := message
+			m.vars = route.vars(&message)
+			route.handler(m)
+		}
+	}))
+
+	client.client = pahoClient
+	client.Options = options
+	client.router = router
+	client.connhandler = connhandler
+
+	return &client, nil
+}
+
+// Connect tries to establish a connection with the mqtt servers
+func (c *Client) Connect(ctx context.Context) error {
+	// try to connect to the client
+	token := c.client.Connect()
+	return tokenWithContext(ctx, token)
+}
+
+// Connect tries to establish a connection with the mqtt servers
+func (c *Client) IsConnected() bool {
+	// try to connect to the client
+	return c.client.IsConnected()
+}
+
+// DisconnectImmediately will immediately close the connection with the mqtt servers
+func (c *Client) DisconnectImmediately() {
+	c.client.Disconnect(0)
+}
+
+func tokenWithContext(ctx context.Context, token paho.Token) error {
+	completer := make(chan error)
+
+	// TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes
+	go func() {
+		token.Wait()
+		completer <- token.Error()
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case err := <-completer:
+			return err
+		}
+	}
+}
+func (c *Client) ConnectionLostHandler(client paho.Client, err error) {
+	c.connhandler.ConnectionLostHandler(err)
+}
+
+func (c *Client) OnConnectHandler(client paho.Client) {
+	c.connhandler.OnConnectHandler()
+}

+ 46 - 0
util/mqtt/publish.go

@@ -0,0 +1,46 @@
+package mqtt
+
+import (
+	"context"
+	"encoding/json"
+)
+
+// PublishOption are extra options when publishing a message
+type PublishOption int
+
+const (
+	// Retain tells the broker to retain a message and send it as the first message to new subscribers.
+	Retain PublishOption = iota
+)
+
+// Publish a message with a byte array payload
+func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error {
+	return c.publish(ctx, topic, payload, qos, options)
+}
+
+// PublishString publishes a message with a string payload
+func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
+	return c.publish(ctx, topic, []byte(payload), qos, options)
+}
+
+// PublishJSON publishes a message with the payload encoded as JSON using encoding/json
+func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
+	data, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+	return c.publish(ctx, topic, data, qos, options)
+}
+
+func (c *Client) publish(ctx context.Context, topic string, payload []byte, qos QOS, options []PublishOption) error {
+	var retained = false
+	for _, option := range options {
+		switch option {
+		case Retain:
+			retained = true
+		}
+	}
+
+	token := c.client.Publish(topic, byte(qos), retained, payload)
+	return tokenWithContext(ctx, token)
+}

+ 125 - 0
util/mqtt/router.go

@@ -0,0 +1,125 @@
+package mqtt
+
+import (
+	"strings"
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+type router struct {
+	routes []Route
+	lock   sync.RWMutex
+}
+
+func newRouter() *router {
+	return &router{routes: []Route{}, lock: sync.RWMutex{}}
+}
+
+// Route is a receipt for listening or handling certain topic
+type Route struct {
+	router  *router
+	id      string
+	topic   string
+	handler MessageHandler
+}
+
+func newRoute(router *router, topic string, handler MessageHandler) Route {
+	return Route{router: router, id: uuid.New().String(), topic: topic, handler: handler}
+}
+
+func match(route []string, topic []string) bool {
+	if len(route) == 0 {
+		return len(topic) == 0
+	}
+
+	if len(topic) == 0 {
+		return route[0] == "#"
+	}
+
+	if route[0] == "#" {
+		return true
+	}
+
+	if (route[0] == "+") || (route[0] == topic[0]) {
+		return match(route[1:], topic[1:])
+	}
+	return false
+}
+
+func routeIncludesTopic(route, topic string) bool {
+	return match(routeSplit(route), strings.Split(topic, "/"))
+}
+
+func routeSplit(route string) []string {
+	var result []string
+	if strings.HasPrefix(route, "$share") {
+		result = strings.Split(route, "/")[2:]
+	} else {
+		result = strings.Split(route, "/")
+	}
+	return result
+}
+
+func (r *Route) match(message *Message) bool {
+	return r.topic == message.Topic() || routeIncludesTopic(r.topic, message.Topic())
+}
+
+func (r *Route) vars(message *Message) []string {
+	var vars []string
+	route := routeSplit(r.topic)
+	topic := strings.Split(message.Topic(), "/")
+
+	for i, section := range route {
+		if section == "+" {
+			if len(topic) > i {
+				vars = append(vars, topic[i])
+			}
+		} else if section == "#" {
+			if len(topic) > i {
+				vars = append(vars, topic[i:]...)
+			}
+		}
+	}
+
+	return vars
+}
+
+func (r *router) addRoute(topic string, handler MessageHandler) Route {
+	if handler != nil {
+		route := newRoute(r, topic, handler)
+		r.lock.Lock()
+		r.routes = append(r.routes, route)
+		r.lock.Unlock()
+		return route
+	}
+	return Route{router: r}
+}
+
+func (r *router) removeRoute(removeRoute *Route) {
+	r.lock.Lock()
+	for i, route := range r.routes {
+		if route.id == removeRoute.id {
+			r.routes[i] = r.routes[len(r.routes)-1]
+			r.routes = r.routes[:len(r.routes)-1]
+		}
+	}
+	r.lock.Unlock()
+}
+
+func (r *router) match(message *Message) []Route {
+	routes := []Route{}
+	r.lock.RLock()
+	for _, route := range r.routes {
+		if route.match(message) {
+			routes = append(routes, route)
+		}
+	}
+	r.lock.RUnlock()
+	return routes
+}
+
+// Stop removes this route from the router and stops matching it
+func (r *Route) Stop() {
+	r.router.removeRoute(r)
+}

+ 99 - 0
util/mqtt/subscribe.go

@@ -0,0 +1,99 @@
+package mqtt
+
+import (
+	"context"
+	"encoding/json"
+
+	paho "github.com/eclipse/paho.mqtt.golang"
+)
+
+// A Message from or to the broker
+type Message struct {
+	message paho.Message
+	vars    []string
+}
+
+// A MessageHandler to handle incoming messages
+type MessageHandler func(Message)
+
+// TopicVars is a list of all the message specific matches for a wildcard in a route topic.
+// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
+func (m *Message) TopicVars() []string {
+	return m.vars
+}
+
+// Topic is the topic the message was recieved on
+func (m *Message) Topic() string {
+	return m.message.Topic()
+}
+
+// QOS is the quality of service the message was recieved with
+func (m *Message) QOS() QOS {
+	return QOS(m.message.Qos())
+}
+
+// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
+func (m *Message) IsDuplicate() bool {
+	return m.message.Duplicate()
+}
+
+// Acknowledge explicitly acknowledges to a broker that the message has been recieved
+func (m *Message) Acknowledge() {
+	m.message.Ack()
+}
+
+// Payload returns the payload as a byte array
+func (m *Message) Payload() []byte {
+	return m.message.Payload()
+}
+
+// PayloadString returns the payload as a string
+func (m *Message) PayloadString() string {
+	return string(m.message.Payload())
+}
+
+// PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails
+func (m *Message) PayloadJSON(v interface{}) error {
+	return json.Unmarshal(m.message.Payload(), v)
+}
+
+// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
+// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
+func (c *Client) Handle(topic string, handler MessageHandler) Route {
+	return c.router.addRoute(topic, handler)
+}
+
+// Listen returns a stream of messages that match the topic.
+// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
+func (c *Client) Listen(topic string) (chan Message, Route) {
+	queue := make(chan Message)
+	route := c.router.addRoute(topic, func(message Message) {
+		queue <- message
+	})
+	return queue, route
+}
+
+// Subscribe subscribes to a certain topic and errors if this fails.
+func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
+	token := c.client.Subscribe(topic, byte(qos), nil)
+	err := tokenWithContext(ctx, token)
+	return err
+}
+
+// SubscribeMultiple subscribes to multiple topics and errors if this fails.
+func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
+	subs := make(map[string]byte, len(subscriptions))
+	for topic, qos := range subscriptions {
+		subs[topic] = byte(qos)
+	}
+	token := c.client.SubscribeMultiple(subs, nil)
+	err := tokenWithContext(ctx, token)
+	return err
+}
+
+// Unsubscribe unsubscribes from a certain topic and errors if this fails.
+func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
+	token := c.client.Unsubscribe(topic)
+	err := tokenWithContext(ctx, token)
+	return err
+}

+ 156 - 0
util/queue.go

@@ -0,0 +1,156 @@
+package util
+
+import (
+	"fmt"
+	"runtime"
+	"sync/atomic"
+)
+
+type mlCache struct {
+	putNo uint32
+	getNo uint32
+	value interface{}
+}
+
+type MlQueue struct {
+	capaciity uint32
+	capMod    uint32
+	putPos    uint32
+	getPos    uint32
+	cache     []mlCache
+}
+
+func NewQueue(capaciity uint32) *MlQueue {
+	q := new(MlQueue)
+	q.capaciity = minQuantity(capaciity)
+	q.capMod = q.capaciity - 1
+	q.putPos = 0
+	q.getPos = 0
+	q.cache = make([]mlCache, q.capaciity)
+	for i := range q.cache {
+		cache := &q.cache[i]
+		cache.getNo = uint32(i)
+		cache.putNo = uint32(i)
+	}
+	cache := &q.cache[0]
+	cache.getNo = q.capaciity
+	cache.putNo = q.capaciity
+	return q
+}
+
+func (q *MlQueue) String() string {
+	getPos := atomic.LoadUint32(&q.getPos)
+	putPos := atomic.LoadUint32(&q.putPos)
+	return fmt.Sprintf("Queue{capaciity: %v, capMod: %v, putPos: %v, getPos: %v}",
+		q.capaciity, q.capMod, putPos, getPos)
+}
+
+func (q *MlQueue) Capaciity() uint32 {
+	return q.capaciity
+}
+
+func (q *MlQueue) Quantity() uint32 {
+	var putPos, getPos uint32
+	var quantity uint32
+	getPos = atomic.LoadUint32(&q.getPos)
+	putPos = atomic.LoadUint32(&q.putPos)
+
+	if putPos >= getPos {
+		quantity = putPos - getPos
+	} else {
+		quantity = q.capMod + (putPos - getPos)
+	}
+
+	return quantity
+}
+
+func (q *MlQueue) Put(val interface{}) (ok bool, quantity uint32) {
+	var putPos, putPosNew, getPos, posCnt uint32
+	var cache *mlCache
+	capMod := q.capMod
+
+	getPos = atomic.LoadUint32(&q.getPos)
+	putPos = atomic.LoadUint32(&q.putPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt >= capMod-1 {
+		runtime.Gosched()
+		return false, posCnt
+	}
+
+	putPosNew = putPos + 1
+	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
+		runtime.Gosched()
+		return false, posCnt
+	}
+
+	cache = &q.cache[putPosNew&capMod]
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if putPosNew == putNo && getNo == putNo {
+			cache.value = val
+			atomic.AddUint32(&cache.putNo, q.capaciity)
+			return true, posCnt + 1
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+func (q *MlQueue) Get() (val interface{}, ok bool, quantity uint32) {
+	var putPos, getPos, getPosNew, posCnt uint32
+	var cache *mlCache
+	capMod := q.capMod
+
+	putPos = atomic.LoadUint32(&q.putPos)
+	getPos = atomic.LoadUint32(&q.getPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt < 1 {
+		runtime.Gosched()
+		return nil, false, posCnt
+	}
+
+	getPosNew = getPos + 1
+	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
+		runtime.Gosched()
+		return nil, false, posCnt
+	}
+
+	cache = &q.cache[getPosNew&capMod]
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if getPosNew == getNo && getNo == putNo-q.capaciity {
+			val = cache.value
+			atomic.AddUint32(&cache.getNo, q.capaciity)
+			return val, true, posCnt - 1
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+func minQuantity(v uint32) uint32 {
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v++
+	return v
+}