chengqian 1 month ago
commit
103870e6bc
20 changed files with 1308 additions and 0 deletions
  1. 8 0
      .idea/.gitignore
  2. 8 0
      .idea/modules.xml
  3. 9 0
      .idea/smart_tunnel_edge.iml
  4. 17 0
      config/config.yaml
  5. 26 0
      config/serial.json
  6. 15 0
      dev/environmentalSensor.json
  7. 15 0
      dev/opticalSensor.json
  8. 53 0
      go.mod
  9. 179 0
      go.sum
  10. 9 0
      main.go
  11. 164 0
      mqtt/mqtt.go
  12. 125 0
      mqtt/mqttclient.go
  13. 51 0
      mqtt/mqttmgr.go
  14. 46 0
      mqtt/publish.go
  15. 156 0
      mqtt/queue.go
  16. 124 0
      mqtt/router.go
  17. 99 0
      mqtt/subscribe.go
  18. 104 0
      service/mqtt_handle.go
  19. 58 0
      util/config/config.go
  20. 42 0
      util/logger/lclog.go

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/smart_tunnel_edge.iml" filepath="$PROJECT_DIR$/.idea/smart_tunnel_edge.iml" />
+    </modules>
+  </component>
+</project>

+ 9 - 0
.idea/smart_tunnel_edge.iml

@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="WEB_MODULE" version="4">
+  <component name="Go" enabled="true" />
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 17 - 0
config/config.yaml

@@ -0,0 +1,17 @@
+
+# Logger configurations.
+logger:
+  path: "./log"
+  level: "info"
+  name: "info"
+
+mqtt:
+  server: "tcp://106.52.134.22:1883"
+  id: "local_test"
+  user: "admin"
+  password: "admin"
+  
+device:
+  informant: "5b6d335d"
+  speech_speed: "5b73355d"
+  speech_volume: "5b7631305d"

+ 26 - 0
config/serial.json

@@ -0,0 +1,26 @@
+{
+  "serial":{
+    "1":{
+      "Code":1,
+      "Interface":"RS485_1",
+      "Address":"/dev/ttymxc4",
+      "BaudRate":9600,
+      "DataBits":8,
+      "StopBits":1,
+      "Parity":"N",
+      "Timeout":3000,
+      "ProtocolType":0
+    },
+    "2":{
+      "Code":2,
+      "Interface":"RS485_2",
+      "Address":"/dev/ttymxc6",
+      "BaudRate":9600,
+      "DataBits":8,
+      "StopBits":1,
+      "Parity":"N",
+      "Timeout":3000,
+      "ProtocolType":1
+    }
+  }
+}

+ 15 - 0
dev/environmentalSensor.json

@@ -0,0 +1,15 @@
+{
+  "dev": [
+    {
+      "Code": 1,
+      "DevID": 1,
+      "DevCode": "B542100015",
+      "TID": 4,
+      "Name": "环境传感器",
+      "SendCloud":5000,
+      "WaitTime": 1000,
+      "ProtocolType": 0,
+      "DevType": 5
+    }
+  ]
+}

+ 15 - 0
dev/opticalSensor.json

@@ -0,0 +1,15 @@
+{
+  "dev": [
+    {
+      "Code": 1,
+      "DevID": 1,
+      "DevCode": "B542100015",
+      "TID": 4,
+      "Name": "光传感器",
+      "SendCloud":5000,
+      "WaitTime": 1000,
+      "ProtocolType": 0,
+      "DevType": 5
+    }
+  ]
+}

+ 53 - 0
go.mod

@@ -0,0 +1,53 @@
+module smart_tunnel_edge
+
+go 1.17
+
+require (
+	github.com/druidcaesa/gotool v0.0.0-20220613023420-645c641d1304
+	github.com/eclipse/paho.mqtt.golang v1.5.0
+	github.com/google/uuid v1.2.0
+	github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f
+	github.com/sirupsen/logrus v1.9.3
+	gopkg.in/yaml.v3 v3.0.1
+)
+
+require (
+	github.com/bytedance/sonic v1.11.6 // indirect
+	github.com/bytedance/sonic/loader v0.1.1 // indirect
+	github.com/cloudwego/base64x v0.1.4 // indirect
+	github.com/cloudwego/iasm v0.2.0 // indirect
+	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
+	github.com/fogleman/gg v1.3.0 // indirect
+	github.com/gabriel-vasile/mimetype v1.4.3 // indirect
+	github.com/gin-contrib/sse v0.1.0 // indirect
+	github.com/gin-gonic/gin v1.10.0 // indirect
+	github.com/go-playground/locales v0.14.1 // indirect
+	github.com/go-playground/universal-translator v0.18.1 // indirect
+	github.com/go-playground/validator/v10 v10.20.0 // indirect
+	github.com/goccy/go-json v0.10.2 // indirect
+	github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
+	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
+	github.com/jonboulle/clockwork v0.5.0 // indirect
+	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/klauspost/cpuid/v2 v2.2.7 // indirect
+	github.com/leodido/go-urn v1.4.0 // indirect
+	github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
+	github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/tebeka/strftime v0.1.5 // indirect
+	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+	github.com/ugorji/go/codec v1.2.12 // indirect
+	golang.org/x/arch v0.8.0 // indirect
+	golang.org/x/crypto v0.25.0 // indirect
+	golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect
+	golang.org/x/net v0.27.0 // indirect
+	golang.org/x/sync v0.7.0 // indirect
+	golang.org/x/sys v0.22.0 // indirect
+	golang.org/x/text v0.16.0 // indirect
+	google.golang.org/protobuf v1.34.1 // indirect
+)

+ 179 - 0
go.sum

@@ -0,0 +1,179 @@
+github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
+github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
+github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
+github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
+github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
+github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
+github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
+github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/druidcaesa/gotool v0.0.0-20220613023420-645c641d1304 h1:YSmbj/dqe9cV3vQRLIVfXIW22qMA18oyG/C54B19TFM=
+github.com/druidcaesa/gotool v0.0.0-20220613023420-645c641d1304/go.mod h1:dYDc/fkM/uhP6dEdKhhLvpw3fgzZB7lexG1w+ZlVfyk=
+github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
+github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
+github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
+github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
+github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
+github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
+github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
+github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
+github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
+github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
+github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
+github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
+github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
+github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
+github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
+github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
+github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
+github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
+github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
+github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
+github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
+github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
+github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
+github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
+github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
+github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
+github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo=
+github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
+github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4=
+github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8=
+github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc=
+github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
+github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
+github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/tebeka/strftime v0.1.5 h1:1NQKN1NiQgkqd/2moD6ySP/5CoZQsKa1d3ZhJ44Jpmg=
+github.com/tebeka/strftime v0.1.5/go.mod h1:29/OidkoWHdEKZqzyDLUyC+LmgDgdHo4WAFCDT7D/Ig=
+github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
+github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
+github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
+golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
+golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
+golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
+golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
+golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
+golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
+golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d h1:RNPAfi2nHY7C2srAV8A49jpsYr0ADedCk1wq6fTMTvs=
+golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
+golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
+golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
+golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
+golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
+golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
+golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
+golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
+golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
+golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
+golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
+golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
+google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
+rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

+ 9 - 0
main.go

@@ -0,0 +1,9 @@
+package main
+
+import "smart_tunnel_edge/util/logger"
+
+func main() {
+
+	logger.InitLog() //初始化日志
+
+}

+ 164 - 0
mqtt/mqtt.go

@@ -0,0 +1,164 @@
+package mqtt
+
+import (
+	"context"
+	"crypto/tls"
+	"errors"
+
+	paho "github.com/eclipse/paho.mqtt.golang"
+	"github.com/google/uuid"
+)
+
+type ConnHandler interface {
+	ConnectionLostHandler(err error)
+	OnConnectHandler()
+	GetWill() (topic string, payload string)
+}
+
+// Client for talking using mqtt
+type Client struct {
+	Options     ClientOptions // The options that were used to create this client
+	client      paho.Client
+	router      *router
+	connHandler ConnHandler
+}
+
+// ClientOptions is the list of options used to create a client
+type ClientOptions struct {
+	Servers  []string // The list of broker hostnames to connect to
+	ClientID string   // If left empty a uuid will automatically be generated
+	Username string   // If not set then authentication will not be used
+	Password string   // Will only be used if the username is set
+
+	AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost
+}
+
+// QOS describes the quality of service of an mqtt publish
+type QOS byte
+
+const (
+	// AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
+	AtMostOnce QOS = iota
+	// AtLeastOnce means the broker will deliver a message at least once to every subscriber
+	AtLeastOnce
+	// ExactlyOnce means the broker will deliver a message exactly once to every subscriber
+	ExactlyOnce
+)
+
+var (
+	// ErrMinimumOneServer means that at least one server should be specified in the client options
+	ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
+)
+
+func handle(callback MessageHandler) paho.MessageHandler {
+	return func(client paho.Client, message paho.Message) {
+		if callback != nil {
+			callback(Message{message: message})
+		}
+	}
+}
+
+// NewClient creates a new client with the specified options
+func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) {
+	pahoOptions := paho.NewClientOptions()
+
+	// brokers
+	if options.Servers != nil && len(options.Servers) > 0 {
+		for _, server := range options.Servers {
+			pahoOptions.AddBroker(server)
+		}
+	} else {
+		return nil, ErrMinimumOneServer
+	}
+
+	// client id
+	if options.ClientID == "" {
+		options.ClientID = uuid.New().String()
+	}
+	pahoOptions.SetClientID(options.ClientID)
+
+	t := &tls.Config{
+		InsecureSkipVerify: true,
+	}
+	pahoOptions.SetTLSConfig(t)
+
+	// auth
+	if options.Username != "" {
+		pahoOptions.SetUsername(options.Username)
+		pahoOptions.SetPassword(options.Password)
+	}
+
+	// auto reconnect
+	pahoOptions.SetAutoReconnect(options.AutoReconnect)
+
+	pahoOptions.SetCleanSession(false)
+
+	var client Client
+	pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler) //断连
+	pahoOptions.SetOnConnectHandler(client.OnConnectHandler)           //连接
+	if t, m := connhandler.GetWill(); t != "" {
+		pahoOptions.SetWill(t, m, 0, false) //遗嘱消息
+	}
+
+	pahoClient := paho.NewClient(pahoOptions)
+	router := newRouter()
+	pahoClient.AddRoute("#", handle(func(message Message) {
+		routes := router.match(&message)
+		for _, route := range routes {
+			m := message
+			m.vars = route.vars(&message)
+			route.handler(m)
+		}
+	}))
+
+	client.client = pahoClient
+	client.Options = options
+	client.router = router
+	client.connHandler = connhandler
+
+	return &client, nil
+}
+
+// Connect tries to establish a connection with the mqtt servers
+func (c *Client) Connect(ctx context.Context) error {
+	// try to connect to the client
+	token := c.client.Connect()
+	return tokenWithContext(ctx, token)
+}
+
+// Connect tries to establish a connection with the mqtt servers
+func (c *Client) IsConnected() bool {
+	// try to connect to the client
+	return c.client.IsConnected()
+}
+
+// DisconnectImmediately will immediately close the connection with the mqtt servers
+func (c *Client) DisconnectImmediately() {
+	c.client.Disconnect(0)
+}
+
+func tokenWithContext(ctx context.Context, token paho.Token) error {
+	completer := make(chan error)
+
+	// TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes
+	go func() {
+		token.Wait()
+		completer <- token.Error()
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case err := <-completer:
+			return err
+		}
+	}
+}
+func (c *Client) ConnectionLostHandler(client paho.Client, err error) {
+	c.connHandler.ConnectionLostHandler(err)
+}
+
+func (c *Client) OnConnectHandler(client paho.Client) {
+	c.connHandler.OnConnectHandler()
+}

+ 125 - 0
mqtt/mqttclient.go

@@ -0,0 +1,125 @@
+package mqtt
+
+import (
+	"context"
+	"fmt"
+	"smart_tunnel_edge/util/logger"
+	"sync"
+	"time"
+)
+
+type BaseMqttOnline interface {
+	GetOnlineMsg() (string, string)
+	GetWillMsg() (string, string)
+}
+
+type EmptyMqttOnline struct {
+}
+
+func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
+	return "", ""
+}
+func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
+	return "", ""
+}
+
+type MClient struct {
+	mqtt       *Client
+	mu         sync.Mutex     //保护mapTopics
+	mapTopics  map[string]QOS //订阅的主题
+	timeout    uint           //超时时间,毫秒为单位
+	MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
+}
+
+func NewMqttClient(server, clientId, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MClient {
+	o := MClient{
+		mapTopics:  make(map[string]QOS),
+		timeout:    timeout,
+		MqttOnline: mqttOnline,
+	}
+	client, err := NewClient(ClientOptions{
+		Servers:       []string{server},
+		ClientID:      clientId,
+		Username:      user,
+		Password:      password,
+		AutoReconnect: true,
+	}, &o)
+	if err != nil {
+		panic(fmt.Sprintf("MQTT错误: %s", err.Error()))
+		return nil
+	}
+	o.mqtt = client
+	err = client.Connect(o.Ctx())
+	return &o
+}
+
+func (o *MClient) ConnectionLostHandler(err error) {
+	logger.Logger.Errorln("MClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
+}
+
+func (o *MClient) OnConnectHandler() {
+	logger.Logger.Infoln("MClient.OnConnectHandler:MQTT连接成功")
+	//连接成功则订阅主题
+	for k, v := range o.mapTopics {
+		err := o.Subscribe(k, v)
+		if err != nil {
+			return
+		}
+	}
+	topic, str := o.MqttOnline.GetOnlineMsg()
+	if topic != "" {
+		err := o.PublishString(topic, str, 0)
+		if err != nil {
+			return
+		}
+	}
+}
+
+func (o *MClient) GetWill() (topic string, payload string) {
+	return o.MqttOnline.GetWillMsg()
+}
+
+func (o *MClient) Connect() error {
+	return o.mqtt.Connect(o.Ctx())
+}
+
+func (o *MClient) IsConnected() bool {
+	return o.mqtt.IsConnected()
+}
+
+func (o *MClient) Publish(topic string, payload interface{}, qos QOS) error {
+	return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
+}
+func (o *MClient) PublishString(topic string, payload string, qos QOS) error {
+	return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
+}
+func (o *MClient) PublishJSON(topic string, payload interface{}, qos QOS) error {
+	return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
+}
+
+func (o *MClient) Subscribe(topic string, qos QOS) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if _, ok := o.mapTopics[topic]; !ok {
+		o.mapTopics[topic] = qos
+	}
+	return o.mqtt.Subscribe(o.Ctx(), topic, qos)
+}
+
+func (o *MClient) Unsubscribe(topic string) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if _, ok := o.mapTopics[topic]; ok {
+		delete(o.mapTopics, topic)
+	}
+	return o.mqtt.Unsubscribe(o.Ctx(), topic)
+}
+
+func (o *MClient) Handle(topic string, handler MessageHandler) Route {
+	return o.mqtt.Handle(topic, handler)
+}
+
+func (o *MClient) Ctx() context.Context {
+	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
+	return ctx
+}

+ 51 - 0
mqtt/mqttmgr.go

@@ -0,0 +1,51 @@
+package mqtt
+
+import (
+	"smart_tunnel_edge/util/config"
+	"sync"
+)
+
+var _once sync.Once
+var _mgr *Mgr
+
+func GetMQTTMgr() *Mgr {
+	_once.Do(func() {
+		_mgr = _newMQTTMgr()
+	})
+	return _mgr
+}
+
+type Mgr struct {
+	Cloud *MClient
+}
+
+func _newMQTTMgr() *Mgr {
+	cfg := config.Instance().Mqtt
+	return &Mgr{
+		Cloud: NewMqttClient(cfg.Server,
+			cfg.Id,
+			cfg.User,
+			cfg.Password,
+			3000, &EmptyMqttOnline{}),
+	}
+
+}
+
+func (o *Mgr) Subscribe(topic string, qos QOS, handler MessageHandler) {
+	o.Cloud.Handle(topic, handler)
+	err := o.Cloud.Subscribe(topic, qos)
+	if err != nil {
+		return
+	}
+}
+
+func (o *Mgr) UnSubscribe(topic string) {
+	err := o.Cloud.Unsubscribe(topic)
+	if err != nil {
+		return
+	}
+}
+
+func (o *Mgr) Publish(topic string, payload interface{}, qos QOS) error {
+	return o.Cloud.Publish(topic, payload, qos)
+}

+ 46 - 0
mqtt/publish.go

@@ -0,0 +1,46 @@
+package mqtt
+
+import (
+	"context"
+	"encoding/json"
+)
+
+// PublishOption are extra options when publishing a message
+type PublishOption int
+
+const (
+	// Retain tells the broker to retain a message and send it as the first message to new subscribers.
+	Retain PublishOption = iota
+)
+
+// Publish a message with a byte array payload
+func (c *Client) Publish(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
+	return c.publish(ctx, topic, payload, qos, options)
+}
+
+// PublishString publishes a message with a string payload
+func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
+	return c.publish(ctx, topic, []byte(payload), qos, options)
+}
+
+// PublishJSON publishes a message with the payload encoded as JSON using encoding/json
+func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
+	data, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+	return c.publish(ctx, topic, data, qos, options)
+}
+
+func (c *Client) publish(ctx context.Context, topic string, payload interface{}, qos QOS, options []PublishOption) error {
+	var retained = false
+	for _, option := range options {
+		switch option {
+		case Retain:
+			retained = true
+		}
+	}
+
+	token := c.client.Publish(topic, byte(qos), retained, payload)
+	return tokenWithContext(ctx, token)
+}

+ 156 - 0
mqtt/queue.go

@@ -0,0 +1,156 @@
+package mqtt
+
+import (
+	"fmt"
+	"runtime"
+	"sync/atomic"
+)
+
+type mlCache struct {
+	putNo uint32
+	getNo uint32
+	value interface{}
+}
+
+type MlQueue struct {
+	capacity uint32
+	capMod   uint32
+	putPos   uint32
+	getPos   uint32
+	cache    []mlCache
+}
+
+func NewQueue(capacity uint32) *MlQueue {
+	q := new(MlQueue)
+	q.capacity = minQuantity(capacity)
+	q.capMod = q.capacity - 1
+	q.putPos = 0
+	q.getPos = 0
+	q.cache = make([]mlCache, q.capacity)
+	for i := range q.cache {
+		cache := &q.cache[i]
+		cache.getNo = uint32(i)
+		cache.putNo = uint32(i)
+	}
+	cache := &q.cache[0]
+	cache.getNo = q.capacity
+	cache.putNo = q.capacity
+	return q
+}
+
+func (q *MlQueue) String() string {
+	getPos := atomic.LoadUint32(&q.getPos)
+	putPos := atomic.LoadUint32(&q.putPos)
+	return fmt.Sprintf("Queue{capacity: %v, capMod: %v, putPos: %v, getPos: %v}",
+		q.capacity, q.capMod, putPos, getPos)
+}
+
+func (q *MlQueue) Capacity() uint32 {
+	return q.capacity
+}
+
+func (q *MlQueue) Quantity() uint32 {
+	var putPos, getPos uint32
+	var quantity uint32
+	getPos = atomic.LoadUint32(&q.getPos)
+	putPos = atomic.LoadUint32(&q.putPos)
+
+	if putPos >= getPos {
+		quantity = putPos - getPos
+	} else {
+		quantity = q.capMod + (putPos - getPos)
+	}
+
+	return quantity
+}
+
+func (q *MlQueue) Put(val interface{}) (ok bool, quantity uint32) {
+	var putPos, putPosNew, getPos, posCnt uint32
+	var cache *mlCache
+	capMod := q.capMod
+
+	getPos = atomic.LoadUint32(&q.getPos)
+	putPos = atomic.LoadUint32(&q.putPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt >= capMod-1 {
+		runtime.Gosched()
+		return false, posCnt
+	}
+
+	putPosNew = putPos + 1
+	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
+		runtime.Gosched()
+		return false, posCnt
+	}
+
+	cache = &q.cache[putPosNew&capMod]
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if putPosNew == putNo && getNo == putNo {
+			cache.value = val
+			atomic.AddUint32(&cache.putNo, q.capacity)
+			return true, posCnt + 1
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+func (q *MlQueue) Get() (val interface{}, ok bool, quantity uint32) {
+	var putPos, getPos, getPosNew, posCnt uint32
+	var cache *mlCache
+	capMod := q.capMod
+
+	putPos = atomic.LoadUint32(&q.putPos)
+	getPos = atomic.LoadUint32(&q.getPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt < 1 {
+		runtime.Gosched()
+		return nil, false, posCnt
+	}
+
+	getPosNew = getPos + 1
+	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
+		runtime.Gosched()
+		return nil, false, posCnt
+	}
+
+	cache = &q.cache[getPosNew&capMod]
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if getPosNew == getNo && getNo == putNo-q.capacity {
+			val = cache.value
+			atomic.AddUint32(&cache.getNo, q.capacity)
+			return val, true, posCnt - 1
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+func minQuantity(v uint32) uint32 {
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v++
+	return v
+}

+ 124 - 0
mqtt/router.go

@@ -0,0 +1,124 @@
+package mqtt
+
+import (
+	"github.com/google/uuid"
+	"strings"
+	"sync"
+)
+
+type router struct {
+	routes []Route
+	lock   sync.RWMutex
+}
+
+func newRouter() *router {
+	return &router{routes: []Route{}, lock: sync.RWMutex{}}
+}
+
+// Route is a receipt for listening or handling certain topic
+type Route struct {
+	router  *router
+	id      string
+	topic   string
+	handler MessageHandler
+}
+
+func newRoute(router *router, topic string, handler MessageHandler) Route {
+	return Route{router: router, id: uuid.New().String(), topic: topic, handler: handler}
+}
+
+func match(route []string, topic []string) bool {
+	if len(route) == 0 {
+		return len(topic) == 0
+	}
+
+	if len(topic) == 0 {
+		return route[0] == "#"
+	}
+
+	if route[0] == "#" {
+		return true
+	}
+
+	if (route[0] == "+") || (route[0] == topic[0]) {
+		return match(route[1:], topic[1:])
+	}
+	return false
+}
+
+func routeIncludesTopic(route, topic string) bool {
+	return match(routeSplit(route), strings.Split(topic, "/"))
+}
+
+func routeSplit(route string) []string {
+	var result []string
+	if strings.HasPrefix(route, "$share") {
+		result = strings.Split(route, "/")[2:]
+	} else {
+		result = strings.Split(route, "/")
+	}
+	return result
+}
+
+func (r *Route) match(message *Message) bool {
+	return r.topic == message.Topic() || routeIncludesTopic(r.topic, message.Topic())
+}
+
+func (r *Route) vars(message *Message) []string {
+	var vars []string
+	route := routeSplit(r.topic)
+	topic := strings.Split(message.Topic(), "/")
+
+	for i, section := range route {
+		if section == "+" {
+			if len(topic) > i {
+				vars = append(vars, topic[i])
+			}
+		} else if section == "#" {
+			if len(topic) > i {
+				vars = append(vars, topic[i:]...)
+			}
+		}
+	}
+
+	return vars
+}
+
+func (r *router) addRoute(topic string, handler MessageHandler) Route {
+	if handler != nil {
+		route := newRoute(r, topic, handler)
+		r.lock.Lock()
+		r.routes = append(r.routes, route)
+		r.lock.Unlock()
+		return route
+	}
+	return Route{router: r}
+}
+
+func (r *router) removeRoute(removeRoute *Route) {
+	r.lock.Lock()
+	for i, route := range r.routes {
+		if route.id == removeRoute.id {
+			r.routes[i] = r.routes[len(r.routes)-1]
+			r.routes = r.routes[:len(r.routes)-1]
+		}
+	}
+	r.lock.Unlock()
+}
+
+func (r *router) match(message *Message) []Route {
+	routes := []Route{}
+	r.lock.RLock()
+	for _, route := range r.routes {
+		if route.match(message) {
+			routes = append(routes, route)
+		}
+	}
+	r.lock.RUnlock()
+	return routes
+}
+
+// Stop removes this route from the router and stops matching it
+func (r *Route) Stop() {
+	r.router.removeRoute(r)
+}

+ 99 - 0
mqtt/subscribe.go

@@ -0,0 +1,99 @@
+package mqtt
+
+import (
+	"context"
+	"encoding/json"
+
+	paho "github.com/eclipse/paho.mqtt.golang"
+)
+
+// A Message from or to the broker
+type Message struct {
+	message paho.Message
+	vars    []string
+}
+
+// A MessageHandler to handle incoming messages
+type MessageHandler func(Message)
+
+// TopicVars is a list of all the message specific matches for a wildcard in a route topic.
+// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
+func (m *Message) TopicVars() []string {
+	return m.vars
+}
+
+// Topic is the topic the message was recieved on
+func (m *Message) Topic() string {
+	return m.message.Topic()
+}
+
+// QOS is the quality of service the message was recieved with
+func (m *Message) QOS() QOS {
+	return QOS(m.message.Qos())
+}
+
+// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
+func (m *Message) IsDuplicate() bool {
+	return m.message.Duplicate()
+}
+
+// Acknowledge explicitly acknowledges to a broker that the message has been recieved
+func (m *Message) Acknowledge() {
+	m.message.Ack()
+}
+
+// Payload returns the payload as a byte array
+func (m *Message) Payload() []byte {
+	return m.message.Payload()
+}
+
+// PayloadString returns the payload as a string
+func (m *Message) PayloadString() string {
+	return string(m.message.Payload())
+}
+
+// PayloadJSON unmarshal the payload into the provided interface using encoding/json and returns an error if anything fails
+func (m *Message) PayloadJSON(v interface{}) error {
+	return json.Unmarshal(m.message.Payload(), v)
+}
+
+// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
+// Also returns a route that can be used to unsubscribe. Does not automatically subscribe.
+func (c *Client) Handle(topic string, handler MessageHandler) Route {
+	return c.router.addRoute(topic, handler)
+}
+
+// Listen returns a stream of messages that match the topic.
+// Also returns a route that can be used to unsubscribe. Does not automatically subscribe.
+func (c *Client) Listen(topic string) (chan Message, Route) {
+	queue := make(chan Message)
+	route := c.router.addRoute(topic, func(message Message) {
+		queue <- message
+	})
+	return queue, route
+}
+
+// Subscribe subscribes to a certain topic and errors if this fails.
+func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
+	token := c.client.Subscribe(topic, byte(qos), nil)
+	err := tokenWithContext(ctx, token)
+	return err
+}
+
+// SubscribeMultiple subscribes to multiple topics and errors if this fails.
+func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
+	subs := make(map[string]byte, len(subscriptions))
+	for topic, qos := range subscriptions {
+		subs[topic] = byte(qos)
+	}
+	token := c.client.SubscribeMultiple(subs, nil)
+	err := tokenWithContext(ctx, token)
+	return err
+}
+
+// Unsubscribe unsubscribes from a certain topic and errors if this fails.
+func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
+	token := c.client.Unsubscribe(topic)
+	err := tokenWithContext(ctx, token)
+	return err
+}

+ 104 - 0
service/mqtt_handle.go

@@ -0,0 +1,104 @@
+package service
+
+import (
+	"errors"
+	"fmt"
+	"runtime"
+	"runtime/debug"
+	"smart_tunnel_edge/mqtt"
+	"smart_tunnel_edge/util/logger"
+	"strings"
+	"sync"
+	"time"
+)
+
+func InitMqtt() {
+	MqttService = GetHandler()
+	MqttService.SubscribeTopics()
+	go MqttService.Handler()
+}
+
+var MqttService *MqttHandler
+
+type MqttHandler struct {
+	queue *mqtt.MlQueue
+}
+
+var _handlerOnce sync.Once
+var _handlerSingle *MqttHandler
+
+func GetHandler() *MqttHandler {
+	_handlerOnce.Do(func() {
+		_handlerSingle = &MqttHandler{
+			queue: mqtt.NewQueue(10000),
+		}
+	})
+	return _handlerSingle
+}
+
+func (o *MqttHandler) SubscribeTopics() {
+	mqtt.GetMQTTMgr().Subscribe("smart_intersection/#", mqtt.AtLeastOnce, o.HandlerData)
+}
+
+func (o *MqttHandler) HandlerData(m mqtt.Message) {
+	for {
+		ok, cnt := o.queue.Put(&m)
+		if ok {
+			break
+		} else {
+			logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
+			runtime.Gosched()
+		}
+	}
+}
+
+func (o *MqttHandler) Handler() interface{} {
+	defer func() {
+		if err := recover(); err != nil {
+			go GetHandler().Handler()
+			logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
+		}
+	}()
+	for {
+		msg, ok, quantity := o.queue.Get()
+		if !ok {
+			time.Sleep(10 * time.Millisecond)
+			continue
+		} else if quantity > 1000 {
+			logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
+		}
+		m, ok := msg.(*mqtt.Message)
+		if !ok {
+			continue
+		}
+
+		_, topic, err := parseTopic(m.Topic())
+		if err != nil {
+			logger.Logger.Errorf("parseTopic err")
+			continue
+		}
+		switch topic {
+		case TopicDeviceGateway:
+
+		case TopicDeviceCamera:
+
+		default:
+			fmt.Println("我是主题:::", topic)
+		}
+	}
+}
+
+func parseTopic(topic string) (string, string, error) {
+	strList := strings.Split(topic, "/")
+	if len(strList) < 4 {
+		return "", "", errors.New("不支持的topic")
+	}
+	topic = strings.Join(strList[2:], "/")
+	return strList[1], topic, nil
+}
+
+const (
+	TopicDeviceGateway = "device/gateway"
+	TopicDeviceCamera  = "device/camera"
+	TopicDeviceScreens = "device/screens"
+)

+ 58 - 0
util/config/config.go

@@ -0,0 +1,58 @@
+package config
+
+import (
+	"gopkg.in/yaml.v3"
+	"os"
+	"sync"
+)
+
+var (
+	instance *config
+	once     sync.Once
+)
+
+func init() {
+	once.Do(func() {
+		var conf config
+		filePath := "./config/config.yaml"
+		if f, err := os.Open(filePath); err != nil {
+			panic(err)
+		} else {
+			err := yaml.NewDecoder(f).Decode(&conf)
+			if err != nil {
+				panic(err)
+			}
+		}
+		instance = &conf
+	})
+}
+
+// 获取配置文档实例
+func Instance() *config {
+	return instance
+}
+
+type config struct {
+	Logger logger `yaml:"logger"`
+	Mqtt   mqtt   `yaml:"mqtt"`
+	Device device `yaml:"device"`
+}
+
+type logger struct {
+	Path  string `yaml:"path"`
+	Name  string `yaml:"name"`
+	Level string `yaml:"level"`
+}
+
+type mqtt struct {
+	Server   string `yaml:"server"`
+	Id       string `yaml:"id"`
+	User     string `yaml:"user"`
+	Password string `yaml:"password"`
+}
+
+type device struct {
+	Informant    string `yaml:"informant"`
+	SpeechSpeed  string `yaml:"speech_speed"`
+	SpeechVolume string `yaml:"speech_volume"`
+}

+ 42 - 0
util/logger/lclog.go

@@ -0,0 +1,42 @@
+package logger
+
+import (
+	"github.com/druidcaesa/gotool"
+	rotatelogs "github.com/lestrrat/go-file-rotatelogs"
+	"github.com/sirupsen/logrus"
+	"os"
+	"path"
+	"smart_tunnel_edge/util/config"
+	"time"
+)
+
+var Logger *logrus.Logger
+
+func InitLog() {
+	logFilePath := config.Instance().Logger.Path
+	logFileName := config.Instance().Logger.Name
+
+	err := os.MkdirAll(logFilePath, os.ModeDir)
+	if err != nil {
+		gotool.Logs.ErrorLog().Println(err)
+		panic(err)
+	}
+
+	// 日志文件
+	fileName := path.Join(logFilePath, logFileName)
+	writer, _ := rotatelogs.New(
+		fileName+".%Y%m%d.log",
+		rotatelogs.WithMaxAge(7*24*time.Hour),     // 文件最大保存时间 7天
+		rotatelogs.WithRotationTime(24*time.Hour), // 日志切割时间间隔
+	)
+	// 实例化
+	logger := logrus.New()
+
+	logger.SetFormatter(&logrus.JSONFormatter{
+		TimestampFormat: "2006-01-02 15:04:05.000",
+	})
+	// 设置日志级别
+	logger.SetLevel(logrus.DebugLevel)
+	logger.SetOutput(writer)
+	Logger = logger
+}