chengqian 5 hónapja
commit
148b445ce2
20 módosított fájl, 1424 hozzáadás és 0 törlés
  1. 2 0
      build.bat
  2. 12 0
      config/config.yaml
  3. 8 0
      dev/devs.json
  4. 35 0
      go.mod
  5. 65 0
      go.sum
  6. 26 0
      main.go
  7. 164 0
      mqtt/mqtt.go
  8. 125 0
      mqtt/mqttclient.go
  9. 51 0
      mqtt/mqttmgr.go
  10. 46 0
      mqtt/publish.go
  11. 156 0
      mqtt/queue.go
  12. 124 0
      mqtt/router.go
  13. 99 0
      mqtt/subscribe.go
  14. 137 0
      service/deviceMgr.go
  15. 1 0
      service/model.go
  16. 114 0
      service/mqtt_handle.go
  17. 98 0
      service/tcp.go
  18. 29 0
      timer/task.go
  19. 90 0
      util/config/config.go
  20. 42 0
      util/logger/lclog.go

+ 2 - 0
build.bat

@@ -0,0 +1,2 @@
+%此脚本需要在命令窗口执行,不能直接运行%
+go env -w CGO_ENABLED=0 GOOS=linux GOARCH=arm && go build -o build/smart_intersection_edge ./

+ 12 - 0
config/config.yaml

@@ -0,0 +1,12 @@
+projectName: "四川高速公路"
+logger:
+    level: info
+    name: info
+    path: ./log
+mqtt:
+    id: smart_intersectionV2.0_edge
+    password: admin
+    server: tcp://118.253.180.18:1883
+    user: admin
+
+

+ 8 - 0
dev/devs.json

@@ -0,0 +1,8 @@
+{
+  "ledScreens": [
+    {
+      "Sn": "071995171560000000c40808",
+      "ScreensName": "测试屏幕三"
+    }
+  ]
+}

+ 35 - 0
go.mod

@@ -0,0 +1,35 @@
+module smartIntersection_edge
+
+go 1.24.2
+
+require (
+	github.com/druidcaesa/gotool v0.0.0-20220613023420-645c641d1304
+	github.com/eclipse/paho.mqtt.golang v1.5.0
+	github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f
+	github.com/robfig/cron v1.2.0
+	github.com/sirupsen/logrus v1.9.3
+)
+
+require (
+	github.com/fogleman/gg v1.3.0 // indirect
+	github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
+	github.com/google/uuid v1.2.0 // indirect
+	github.com/stretchr/testify v1.9.0 // indirect
+	golang.org/x/crypto v0.25.0 // indirect
+	golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect
+	golang.org/x/net v0.27.0 // indirect
+	golang.org/x/sys v0.22.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1
+)
+
+require (
+	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
+	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
+	github.com/jonboulle/clockwork v0.5.0 // indirect
+	github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
+	github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/tebeka/strftime v0.1.5 // indirect
+	golang.org/x/sync v0.7.0 // indirect
+)

+ 65 - 0
go.sum

@@ -0,0 +1,65 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/druidcaesa/gotool v0.0.0-20220613023420-645c641d1304 h1:YSmbj/dqe9cV3vQRLIVfXIW22qMA18oyG/C54B19TFM=
+github.com/druidcaesa/gotool v0.0.0-20220613023420-645c641d1304/go.mod h1:dYDc/fkM/uhP6dEdKhhLvpw3fgzZB7lexG1w+ZlVfyk=
+github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
+github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
+github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
+github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
+github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
+github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
+github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
+github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
+github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
+github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
+github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo=
+github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
+github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4=
+github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8=
+github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc=
+github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
+github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
+github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/tebeka/strftime v0.1.5 h1:1NQKN1NiQgkqd/2moD6ySP/5CoZQsKa1d3ZhJ44Jpmg=
+github.com/tebeka/strftime v0.1.5/go.mod h1:29/OidkoWHdEKZqzyDLUyC+LmgDgdHo4WAFCDT7D/Ig=
+golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
+golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
+golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d h1:RNPAfi2nHY7C2srAV8A49jpsYr0ADedCk1wq6fTMTvs=
+golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
+golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
+golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 26 - 0
main.go

@@ -0,0 +1,26 @@
+package main
+
+import (
+	"smartIntersection_edge/service"
+	"smartIntersection_edge/timer"
+	"smartIntersection_edge/util/config"
+	"smartIntersection_edge/util/logger"
+)
+
+func main() {
+	logger.InitLog()
+
+	service.InitMqtt() //初始化mqtt
+
+	//初始化设备
+	err := config.LoadDevConfig()
+	if err != nil {
+		logger.Logger.Errorf("加载设备配置失败: %v", err)
+		return
+	}
+
+	go service.ListenTcp() //监听tcp
+	go timer.IsOnline()    //检测在线
+
+	select {}
+}

+ 164 - 0
mqtt/mqtt.go

@@ -0,0 +1,164 @@
+package mqtt
+
+import (
+	"context"
+	"crypto/tls"
+	"errors"
+
+	paho "github.com/eclipse/paho.mqtt.golang"
+	"github.com/google/uuid"
+)
+
+type ConnHandler interface {
+	ConnectionLostHandler(err error)
+	OnConnectHandler()
+	GetWill() (topic string, payload string)
+}
+
+// Client for talking using mqtt
+type Client struct {
+	Options     ClientOptions // The options that were used to create this client
+	client      paho.Client
+	router      *router
+	connHandler ConnHandler
+}
+
+// ClientOptions is the list of options used to create a client
+type ClientOptions struct {
+	Servers  []string // The list of broker hostnames to connect to
+	ClientID string   // If left empty a uuid will automatically be generated
+	Username string   // If not set then authentication will not be used
+	Password string   // Will only be used if the username is set
+
+	AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost
+}
+
+// QOS describes the quality of service of an mqtt publish
+type QOS byte
+
+const (
+	// AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
+	AtMostOnce QOS = iota
+	// AtLeastOnce means the broker will deliver a message at least once to every subscriber
+	AtLeastOnce
+	// ExactlyOnce means the broker will deliver a message exactly once to every subscriber
+	ExactlyOnce
+)
+
+var (
+	// ErrMinimumOneServer means that at least one server should be specified in the client options
+	ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
+)
+
+func handle(callback MessageHandler) paho.MessageHandler {
+	return func(client paho.Client, message paho.Message) {
+		if callback != nil {
+			callback(Message{message: message})
+		}
+	}
+}
+
+// NewClient creates a new client with the specified options
+func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) {
+	pahoOptions := paho.NewClientOptions()
+
+	// brokers
+	if options.Servers != nil && len(options.Servers) > 0 {
+		for _, server := range options.Servers {
+			pahoOptions.AddBroker(server)
+		}
+	} else {
+		return nil, ErrMinimumOneServer
+	}
+
+	// client id
+	if options.ClientID == "" {
+		options.ClientID = uuid.New().String()
+	}
+	pahoOptions.SetClientID(options.ClientID)
+
+	t := &tls.Config{
+		InsecureSkipVerify: true,
+	}
+	pahoOptions.SetTLSConfig(t)
+
+	// auth
+	if options.Username != "" {
+		pahoOptions.SetUsername(options.Username)
+		pahoOptions.SetPassword(options.Password)
+	}
+
+	// auto reconnect
+	pahoOptions.SetAutoReconnect(options.AutoReconnect)
+
+	pahoOptions.SetCleanSession(false)
+
+	var client Client
+	pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler) //断连
+	pahoOptions.SetOnConnectHandler(client.OnConnectHandler)           //连接
+	if t, m := connhandler.GetWill(); t != "" {
+		pahoOptions.SetWill(t, m, 0, false) //遗嘱消息
+	}
+
+	pahoClient := paho.NewClient(pahoOptions)
+	router := newRouter()
+	pahoClient.AddRoute("#", handle(func(message Message) {
+		routes := router.match(&message)
+		for _, route := range routes {
+			m := message
+			m.vars = route.vars(&message)
+			route.handler(m)
+		}
+	}))
+
+	client.client = pahoClient
+	client.Options = options
+	client.router = router
+	client.connHandler = connhandler
+
+	return &client, nil
+}
+
+// Connect tries to establish a connection with the mqtt servers
+func (c *Client) Connect(ctx context.Context) error {
+	// try to connect to the client
+	token := c.client.Connect()
+	return tokenWithContext(ctx, token)
+}
+
+// Connect tries to establish a connection with the mqtt servers
+func (c *Client) IsConnected() bool {
+	// try to connect to the client
+	return c.client.IsConnected()
+}
+
+// DisconnectImmediately will immediately close the connection with the mqtt servers
+func (c *Client) DisconnectImmediately() {
+	c.client.Disconnect(0)
+}
+
+func tokenWithContext(ctx context.Context, token paho.Token) error {
+	completer := make(chan error)
+
+	// TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes
+	go func() {
+		token.Wait()
+		completer <- token.Error()
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case err := <-completer:
+			return err
+		}
+	}
+}
+func (c *Client) ConnectionLostHandler(client paho.Client, err error) {
+	c.connHandler.ConnectionLostHandler(err)
+}
+
+func (c *Client) OnConnectHandler(client paho.Client) {
+	c.connHandler.OnConnectHandler()
+}

+ 125 - 0
mqtt/mqttclient.go

@@ -0,0 +1,125 @@
+package mqtt
+
+import (
+	"context"
+	"fmt"
+	"smartIntersection_edge/util/logger"
+	"sync"
+	"time"
+)
+
+type BaseMqttOnline interface {
+	GetOnlineMsg() (string, string)
+	GetWillMsg() (string, string)
+}
+
+type EmptyMqttOnline struct {
+}
+
+func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
+	return "", ""
+}
+func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
+	return "", ""
+}
+
+type MClient struct {
+	mqtt       *Client
+	mu         sync.Mutex     //保护mapTopics
+	mapTopics  map[string]QOS //订阅的主题
+	timeout    uint           //超时时间,毫秒为单位
+	MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
+}
+
+func NewMqttClient(server, clientId, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MClient {
+	o := MClient{
+		mapTopics:  make(map[string]QOS),
+		timeout:    timeout,
+		MqttOnline: mqttOnline,
+	}
+	client, err := NewClient(ClientOptions{
+		Servers:       []string{server},
+		ClientID:      clientId,
+		Username:      user,
+		Password:      password,
+		AutoReconnect: true,
+	}, &o)
+	if err != nil {
+		panic(fmt.Sprintf("MQTT错误: %s", err.Error()))
+		return nil
+	}
+	o.mqtt = client
+	err = client.Connect(o.Ctx())
+	return &o
+}
+
+func (o *MClient) ConnectionLostHandler(err error) {
+	logger.Logger.Errorln("MClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
+}
+
+func (o *MClient) OnConnectHandler() {
+	logger.Logger.Infoln("MClient.OnConnectHandler:MQTT连接成功")
+	//连接成功则订阅主题
+	for k, v := range o.mapTopics {
+		err := o.Subscribe(k, v)
+		if err != nil {
+			return
+		}
+	}
+	topic, str := o.MqttOnline.GetOnlineMsg()
+	if topic != "" {
+		err := o.PublishString(topic, str, 0)
+		if err != nil {
+			return
+		}
+	}
+}
+
+func (o *MClient) GetWill() (topic string, payload string) {
+	return o.MqttOnline.GetWillMsg()
+}
+
+func (o *MClient) Connect() error {
+	return o.mqtt.Connect(o.Ctx())
+}
+
+func (o *MClient) IsConnected() bool {
+	return o.mqtt.IsConnected()
+}
+
+func (o *MClient) Publish(topic string, payload interface{}, qos QOS) error {
+	return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
+}
+func (o *MClient) PublishString(topic string, payload string, qos QOS) error {
+	return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
+}
+func (o *MClient) PublishJSON(topic string, payload interface{}, qos QOS) error {
+	return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
+}
+
+func (o *MClient) Subscribe(topic string, qos QOS) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if _, ok := o.mapTopics[topic]; !ok {
+		o.mapTopics[topic] = qos
+	}
+	return o.mqtt.Subscribe(o.Ctx(), topic, qos)
+}
+
+func (o *MClient) Unsubscribe(topic string) error {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if _, ok := o.mapTopics[topic]; ok {
+		delete(o.mapTopics, topic)
+	}
+	return o.mqtt.Unsubscribe(o.Ctx(), topic)
+}
+
+func (o *MClient) Handle(topic string, handler MessageHandler) Route {
+	return o.mqtt.Handle(topic, handler)
+}
+
+func (o *MClient) Ctx() context.Context {
+	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
+	return ctx
+}

+ 51 - 0
mqtt/mqttmgr.go

@@ -0,0 +1,51 @@
+package mqtt
+
+import (
+	"smartIntersection_edge/util/config"
+	"sync"
+)
+
+var _once sync.Once
+var _mgr *Mgr
+
+func GetMQTTMgr() *Mgr {
+	_once.Do(func() {
+		_mgr = _newMQTTMgr()
+	})
+	return _mgr
+}
+
+type Mgr struct {
+	Cloud *MClient
+}
+
+func _newMQTTMgr() *Mgr {
+	cfg := config.Instance().Mqtt
+	return &Mgr{
+		Cloud: NewMqttClient(cfg.Server,
+			cfg.Id,
+			cfg.User,
+			cfg.Password,
+			3000, &EmptyMqttOnline{}),
+	}
+
+}
+
+func (o *Mgr) Subscribe(topic string, qos QOS, handler MessageHandler) {
+	o.Cloud.Handle(topic, handler)
+	err := o.Cloud.Subscribe(topic, qos)
+	if err != nil {
+		return
+	}
+}
+
+func (o *Mgr) UnSubscribe(topic string) {
+	err := o.Cloud.Unsubscribe(topic)
+	if err != nil {
+		return
+	}
+}
+
+func (o *Mgr) Publish(topic string, payload interface{}, qos QOS) error {
+	return o.Cloud.Publish(topic, payload, qos)
+}

+ 46 - 0
mqtt/publish.go

@@ -0,0 +1,46 @@
+package mqtt
+
+import (
+	"context"
+	"encoding/json"
+)
+
+// PublishOption are extra options when publishing a message
+type PublishOption int
+
+const (
+	// Retain tells the broker to retain a message and send it as the first message to new subscribers.
+	Retain PublishOption = iota
+)
+
+// Publish a message with a byte array payload
+func (c *Client) Publish(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
+	return c.publish(ctx, topic, payload, qos, options)
+}
+
+// PublishString publishes a message with a string payload
+func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
+	return c.publish(ctx, topic, []byte(payload), qos, options)
+}
+
+// PublishJSON publishes a message with the payload encoded as JSON using encoding/json
+func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
+	data, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+	return c.publish(ctx, topic, data, qos, options)
+}
+
+func (c *Client) publish(ctx context.Context, topic string, payload interface{}, qos QOS, options []PublishOption) error {
+	var retained = false
+	for _, option := range options {
+		switch option {
+		case Retain:
+			retained = true
+		}
+	}
+
+	token := c.client.Publish(topic, byte(qos), retained, payload)
+	return tokenWithContext(ctx, token)
+}

+ 156 - 0
mqtt/queue.go

@@ -0,0 +1,156 @@
+package mqtt
+
+import (
+	"fmt"
+	"runtime"
+	"sync/atomic"
+)
+
+type mlCache struct {
+	putNo uint32
+	getNo uint32
+	value interface{}
+}
+
+type MlQueue struct {
+	capacity uint32
+	capMod   uint32
+	putPos   uint32
+	getPos   uint32
+	cache    []mlCache
+}
+
+func NewQueue(capacity uint32) *MlQueue {
+	q := new(MlQueue)
+	q.capacity = minQuantity(capacity)
+	q.capMod = q.capacity - 1
+	q.putPos = 0
+	q.getPos = 0
+	q.cache = make([]mlCache, q.capacity)
+	for i := range q.cache {
+		cache := &q.cache[i]
+		cache.getNo = uint32(i)
+		cache.putNo = uint32(i)
+	}
+	cache := &q.cache[0]
+	cache.getNo = q.capacity
+	cache.putNo = q.capacity
+	return q
+}
+
+func (q *MlQueue) String() string {
+	getPos := atomic.LoadUint32(&q.getPos)
+	putPos := atomic.LoadUint32(&q.putPos)
+	return fmt.Sprintf("Queue{capacity: %v, capMod: %v, putPos: %v, getPos: %v}",
+		q.capacity, q.capMod, putPos, getPos)
+}
+
+func (q *MlQueue) Capacity() uint32 {
+	return q.capacity
+}
+
+func (q *MlQueue) Quantity() uint32 {
+	var putPos, getPos uint32
+	var quantity uint32
+	getPos = atomic.LoadUint32(&q.getPos)
+	putPos = atomic.LoadUint32(&q.putPos)
+
+	if putPos >= getPos {
+		quantity = putPos - getPos
+	} else {
+		quantity = q.capMod + (putPos - getPos)
+	}
+
+	return quantity
+}
+
+func (q *MlQueue) Put(val interface{}) (ok bool, quantity uint32) {
+	var putPos, putPosNew, getPos, posCnt uint32
+	var cache *mlCache
+	capMod := q.capMod
+
+	getPos = atomic.LoadUint32(&q.getPos)
+	putPos = atomic.LoadUint32(&q.putPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt >= capMod-1 {
+		runtime.Gosched()
+		return false, posCnt
+	}
+
+	putPosNew = putPos + 1
+	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
+		runtime.Gosched()
+		return false, posCnt
+	}
+
+	cache = &q.cache[putPosNew&capMod]
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if putPosNew == putNo && getNo == putNo {
+			cache.value = val
+			atomic.AddUint32(&cache.putNo, q.capacity)
+			return true, posCnt + 1
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+func (q *MlQueue) Get() (val interface{}, ok bool, quantity uint32) {
+	var putPos, getPos, getPosNew, posCnt uint32
+	var cache *mlCache
+	capMod := q.capMod
+
+	putPos = atomic.LoadUint32(&q.putPos)
+	getPos = atomic.LoadUint32(&q.getPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt < 1 {
+		runtime.Gosched()
+		return nil, false, posCnt
+	}
+
+	getPosNew = getPos + 1
+	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
+		runtime.Gosched()
+		return nil, false, posCnt
+	}
+
+	cache = &q.cache[getPosNew&capMod]
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if getPosNew == getNo && getNo == putNo-q.capacity {
+			val = cache.value
+			atomic.AddUint32(&cache.getNo, q.capacity)
+			return val, true, posCnt - 1
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+func minQuantity(v uint32) uint32 {
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v++
+	return v
+}

+ 124 - 0
mqtt/router.go

@@ -0,0 +1,124 @@
+package mqtt
+
+import (
+	"github.com/google/uuid"
+	"strings"
+	"sync"
+)
+
+type router struct {
+	routes []Route
+	lock   sync.RWMutex
+}
+
+func newRouter() *router {
+	return &router{routes: []Route{}, lock: sync.RWMutex{}}
+}
+
+// Route is a receipt for listening or handling certain topic
+type Route struct {
+	router  *router
+	id      string
+	topic   string
+	handler MessageHandler
+}
+
+func newRoute(router *router, topic string, handler MessageHandler) Route {
+	return Route{router: router, id: uuid.New().String(), topic: topic, handler: handler}
+}
+
+func match(route []string, topic []string) bool {
+	if len(route) == 0 {
+		return len(topic) == 0
+	}
+
+	if len(topic) == 0 {
+		return route[0] == "#"
+	}
+
+	if route[0] == "#" {
+		return true
+	}
+
+	if (route[0] == "+") || (route[0] == topic[0]) {
+		return match(route[1:], topic[1:])
+	}
+	return false
+}
+
+func routeIncludesTopic(route, topic string) bool {
+	return match(routeSplit(route), strings.Split(topic, "/"))
+}
+
+func routeSplit(route string) []string {
+	var result []string
+	if strings.HasPrefix(route, "$share") {
+		result = strings.Split(route, "/")[2:]
+	} else {
+		result = strings.Split(route, "/")
+	}
+	return result
+}
+
+func (r *Route) match(message *Message) bool {
+	return r.topic == message.Topic() || routeIncludesTopic(r.topic, message.Topic())
+}
+
+func (r *Route) vars(message *Message) []string {
+	var vars []string
+	route := routeSplit(r.topic)
+	topic := strings.Split(message.Topic(), "/")
+
+	for i, section := range route {
+		if section == "+" {
+			if len(topic) > i {
+				vars = append(vars, topic[i])
+			}
+		} else if section == "#" {
+			if len(topic) > i {
+				vars = append(vars, topic[i:]...)
+			}
+		}
+	}
+
+	return vars
+}
+
+func (r *router) addRoute(topic string, handler MessageHandler) Route {
+	if handler != nil {
+		route := newRoute(r, topic, handler)
+		r.lock.Lock()
+		r.routes = append(r.routes, route)
+		r.lock.Unlock()
+		return route
+	}
+	return Route{router: r}
+}
+
+func (r *router) removeRoute(removeRoute *Route) {
+	r.lock.Lock()
+	for i, route := range r.routes {
+		if route.id == removeRoute.id {
+			r.routes[i] = r.routes[len(r.routes)-1]
+			r.routes = r.routes[:len(r.routes)-1]
+		}
+	}
+	r.lock.Unlock()
+}
+
+func (r *router) match(message *Message) []Route {
+	routes := []Route{}
+	r.lock.RLock()
+	for _, route := range r.routes {
+		if route.match(message) {
+			routes = append(routes, route)
+		}
+	}
+	r.lock.RUnlock()
+	return routes
+}
+
+// Stop removes this route from the router and stops matching it
+func (r *Route) Stop() {
+	r.router.removeRoute(r)
+}

+ 99 - 0
mqtt/subscribe.go

@@ -0,0 +1,99 @@
+package mqtt
+
+import (
+	"context"
+	"encoding/json"
+
+	paho "github.com/eclipse/paho.mqtt.golang"
+)
+
+// A Message from or to the broker
+type Message struct {
+	message paho.Message
+	vars    []string
+}
+
+// A MessageHandler to handle incoming messages
+type MessageHandler func(Message)
+
+// TopicVars is a list of all the message specific matches for a wildcard in a route topic.
+// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
+func (m *Message) TopicVars() []string {
+	return m.vars
+}
+
+// Topic is the topic the message was recieved on
+func (m *Message) Topic() string {
+	return m.message.Topic()
+}
+
+// QOS is the quality of service the message was recieved with
+func (m *Message) QOS() QOS {
+	return QOS(m.message.Qos())
+}
+
+// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
+func (m *Message) IsDuplicate() bool {
+	return m.message.Duplicate()
+}
+
+// Acknowledge explicitly acknowledges to a broker that the message has been recieved
+func (m *Message) Acknowledge() {
+	m.message.Ack()
+}
+
+// Payload returns the payload as a byte array
+func (m *Message) Payload() []byte {
+	return m.message.Payload()
+}
+
+// PayloadString returns the payload as a string
+func (m *Message) PayloadString() string {
+	return string(m.message.Payload())
+}
+
+// PayloadJSON unmarshal the payload into the provided interface using encoding/json and returns an error if anything fails
+func (m *Message) PayloadJSON(v interface{}) error {
+	return json.Unmarshal(m.message.Payload(), v)
+}
+
+// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
+// Also returns a route that can be used to unsubscribe. Does not automatically subscribe.
+func (c *Client) Handle(topic string, handler MessageHandler) Route {
+	return c.router.addRoute(topic, handler)
+}
+
+// Listen returns a stream of messages that match the topic.
+// Also returns a route that can be used to unsubscribe. Does not automatically subscribe.
+func (c *Client) Listen(topic string) (chan Message, Route) {
+	queue := make(chan Message)
+	route := c.router.addRoute(topic, func(message Message) {
+		queue <- message
+	})
+	return queue, route
+}
+
+// Subscribe subscribes to a certain topic and errors if this fails.
+func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
+	token := c.client.Subscribe(topic, byte(qos), nil)
+	err := tokenWithContext(ctx, token)
+	return err
+}
+
+// SubscribeMultiple subscribes to multiple topics and errors if this fails.
+func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
+	subs := make(map[string]byte, len(subscriptions))
+	for topic, qos := range subscriptions {
+		subs[topic] = byte(qos)
+	}
+	token := c.client.SubscribeMultiple(subs, nil)
+	err := tokenWithContext(ctx, token)
+	return err
+}
+
+// Unsubscribe unsubscribes from a certain topic and errors if this fails.
+func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
+	token := c.client.Unsubscribe(topic)
+	err := tokenWithContext(ctx, token)
+	return err
+}

+ 137 - 0
service/deviceMgr.go

@@ -0,0 +1,137 @@
+package service
+
+import (
+	"fmt"
+	"net"
+	"regexp"
+	"smartIntersection_edge/util/config"
+	"smartIntersection_edge/util/logger"
+	"strconv"
+	"strings"
+	"time"
+)
+
+var (
+	Devices = make(map[string]*Device)
+)
+
+type Device struct {
+	Info         *config.Screens
+	Conn         net.Conn
+	IsLogin      bool
+	LastTime     time.Time
+	SumHighSpeed int
+	SumLowSpeed  int
+}
+
+// 实例所有设备
+func InitDevices() {
+	//// 将数据库中的设备全部离线
+	//screens, _ := dao.QueryAllScreens()
+	//for _, screen := range screens {
+	//	Devices[screen.Sn] = &Device{Info: screen}
+	//}
+}
+
+func (s *Device) Start(conn net.Conn) {
+	s.Conn = conn
+	go s.Process()
+}
+
+func (s *Device) Process() {
+	// 函数执行完之后关闭连接
+	defer s.Conn.Close()
+	for {
+		buf := make([]byte, 256)
+		// 将tcp连接读取到的数据读取到byte数组中, 返回读取到的byte的数目
+		n, err := s.Conn.Read(buf)
+		if err != nil {
+			// 从客户端读取数据的过程中发生错误 这里如果没读到可以视为设备离线了
+			logger.Logger.Errorf("read err,dev offLine")
+			break
+		}
+		data := string(buf[:n])
+		fmt.Println("读取", string(buf[:n]), time.Now())
+		if data[2:7] == "login" {
+			s.Conn.Write([]byte("login:successful"))
+		}
+
+		if data[2:11] == "heartbeat" { //默认一分钟发一次心跳
+			s.LastTime = time.Now()
+			if !s.IsLogin {
+				s.Conn.Write([]byte("{'trans':'on'}")) //开启上传状态
+				screens := FindScreenBySN(data[20:44])
+				s.Info = screens
+				s.IsLogin = true
+				Devices[data[20:44]] = s
+				topic := MqttService.GetTopic(s.Info.Sn, TopicChanStatus)
+				err := MqttService.Publish(topic, 1)
+				if err != nil {
+					continue
+				}
+			}
+			Devices[data[20:44]].Conn = s.Conn
+			logger.Logger.Infof("%v 设备心跳", s.Info.ScreensName)
+		}
+
+		//判断超速
+		if strings.Contains(data, `"status":"highspeed"`) {
+			s.SumLowSpeed = 0
+			re := regexp.MustCompile(`"speed1":"(\d+)"`)
+			match := re.FindStringSubmatch(data)
+			if len(match) > 1 {
+				speed1, _ := strconv.Atoi(match[1])
+				if speed1 > 50 { //大于50认为超速
+					s.SumHighSpeed++
+					if s.SumHighSpeed >= 3 { //连续三次认为超速
+						topic := MqttService.GetTopic(s.Info.Sn, TopicHighSpeed)
+						MqttService.Publish(topic, time.Now()) //上报当前超速时间
+					}
+				} else {
+					s.SumHighSpeed = 0
+				}
+			} else {
+				fmt.Println("Speed1 not found")
+				continue
+			}
+		}
+
+		//判断低速
+		if strings.Contains(data, `"status":"normalspeed"`) {
+			s.SumHighSpeed = 0
+			re := regexp.MustCompile(`"speed1":"(\d+)"`)
+			match := re.FindStringSubmatch(data)
+			if len(match) > 1 {
+				speed1, _ := strconv.Atoi(match[1])
+				if speed1 < 10 { //小于10认为低速
+					s.SumLowSpeed++
+
+					if s.SumLowSpeed >= 6 { //连续8次认为低速
+						topic := MqttService.GetTopic(s.Info.Sn, TopicLowSpeed)
+						MqttService.Publish(topic, time.Now()) //上报当前低俗时间
+					}
+				} else {
+					s.SumLowSpeed = 0
+				}
+			} else {
+				fmt.Println("Speed1 not found")
+				continue
+			}
+		}
+
+		//if strings.Contains(data, `"status":"none"`) {
+		//	s.SumHighSpeed = 0
+		//	s.SumLowSpeed = 0
+		//}
+
+	}
+}
+
+func FindScreenBySN(sn string) *config.Screens {
+	for _, screen := range config.DevConfig.Screens {
+		if screen != nil && screen.Sn == sn {
+			return screen
+		}
+	}
+	return nil
+}

+ 1 - 0
service/model.go

@@ -0,0 +1 @@
+package service

+ 114 - 0
service/mqtt_handle.go

@@ -0,0 +1,114 @@
+package service
+
+import (
+	"errors"
+	"fmt"
+	"runtime"
+	"runtime/debug"
+	"smartIntersection_edge/mqtt"
+	"smartIntersection_edge/util/logger"
+	"strings"
+	"sync"
+	"time"
+)
+
+func InitMqtt() {
+	MqttService = GetHandler()
+	MqttService.SubscribeTopics()
+	go MqttService.Handler()
+}
+
+var MqttService *MqttHandler
+
+type MqttHandler struct {
+	queue *mqtt.MlQueue
+}
+
+var _handlerOnce sync.Once
+var _handlerSingle *MqttHandler
+
+func GetHandler() *MqttHandler {
+	_handlerOnce.Do(func() {
+		_handlerSingle = &MqttHandler{
+			queue: mqtt.NewQueue(10000),
+		}
+	})
+	return _handlerSingle
+}
+
+func (o *MqttHandler) SubscribeTopics() {
+	mqtt.GetMQTTMgr().Subscribe("smart_intersectionV2.0/#", mqtt.AtLeastOnce, o.HandlerData)
+}
+
+func (o *MqttHandler) HandlerData(m mqtt.Message) {
+	for {
+		ok, cnt := o.queue.Put(&m)
+		if ok {
+			break
+		} else {
+			logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
+			runtime.Gosched()
+		}
+	}
+}
+
+func (o *MqttHandler) Handler() interface{} {
+	defer func() {
+		if err := recover(); err != nil {
+			go GetHandler().Handler()
+			logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
+		}
+	}()
+	for {
+		msg, ok, quantity := o.queue.Get()
+		if !ok {
+			time.Sleep(10 * time.Millisecond)
+			continue
+		} else if quantity > 1000 {
+			logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
+		}
+		m, ok := msg.(*mqtt.Message)
+		if !ok {
+			continue
+		}
+		sn, topic, err := ParseTopic(m.Topic())
+		if err != nil {
+			logger.Logger.Errorf("parseTopic err")
+			continue
+		}
+
+		switch topic {
+		case TopicSetControl:
+			n, err := Devices[sn].Conn.Write([]byte(m.PayloadString()))
+			if n <= 0 && err != nil {
+				logger.Logger.Errorf("设备:%v操作失败,错误:%v", sn, err)
+			}
+		}
+	}
+}
+
+func (o *MqttHandler) Publish(topic string, data interface{}) error {
+	return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
+}
+
+func (o *MqttHandler) GetTopic(sn, operation string) string {
+	return fmt.Sprintf("smart_intersectionV2.0/%s/%s", sn, operation)
+}
+
+func ParseTopic(topic string) (sn string, operate string, err error) {
+	strList := strings.Split(topic, "/")
+	sn = strList[1]
+	operate = strList[2]
+	if len(strList) != 3 {
+		return "", "", errors.New("不支持的topic")
+	}
+	return
+}
+
+const (
+	TopicChanStatus = "chanStatus" //上报状态
+	TopicHighSpeed  = "highSpeed"  //超速时
+	TopicLowSpeed   = "lowSpeed"   //低速时
+
+	TopicSetControl = "setControl" //云台下发控制
+)

+ 98 - 0
service/tcp.go

@@ -0,0 +1,98 @@
+package service
+
+import (
+	"errors"
+	"net"
+	"strings"
+	"sync"
+	"time"
+)
+
+func ListenTcp() {
+	var listen net.Listener
+	var err error
+	// 监听当前的tcp连接
+	for {
+		listen, err = net.Listen("tcp", "0.0.0.0:9200")
+		if err != nil {
+			time.Sleep(5 * time.Second) // 休眠一段时间后重试
+			continue
+		}
+		break // 成功监听后退出循环
+	}
+
+	tracker := NewConnectionTracker() //创建连接检测器
+
+	for {
+		conn, err := listen.Accept()
+		if err != nil {
+			continue
+		}
+		err = CheckConn(conn, tracker)
+		if err != nil {
+			conn.Close() // 如果是恶意连接,则关闭连接
+			continue
+		}
+	}
+}
+
+func CheckConn(conn net.Conn, tracker *ConnectionTracker) error {
+	arr := strings.Split(conn.RemoteAddr().String(), ":")
+	ip := arr[0]
+	// 记录连接
+	tracker.recordConnection(ip)
+
+	// 检查是否为恶意连接
+	if tracker.isMalicious(ip) {
+		return errors.New("connection is Malicious")
+	}
+	device := Device{}
+	device.Start(conn)
+	return nil
+}
+
+type ConnectionTracker struct {
+	mu          sync.Mutex
+	connections map[string][]time.Time // 存储每个 IP 的连接时间戳
+}
+
+func NewConnectionTracker() *ConnectionTracker {
+	return &ConnectionTracker{
+		connections: make(map[string][]time.Time),
+	}
+}
+
+func (ct *ConnectionTracker) recordConnection(ip string) {
+	ct.mu.Lock()
+	defer ct.mu.Unlock()
+
+	now := time.Now()
+	ct.connections[ip] = append(ct.connections[ip], now)
+
+	// 清理过期的连接记录
+	ct.cleanUpExpired(ip, now)
+}
+
+func (ct *ConnectionTracker) cleanUpExpired(ip string, now time.Time) {
+	threshold := now.Add(-3 * time.Minute)
+	if timestamps, exists := ct.connections[ip]; exists {
+		var filtered []time.Time
+		for _, t := range timestamps {
+			if t.After(threshold) { // 检查时间戳是否在三分钟内
+				filtered = append(filtered, t) // 如果在范围内,保存到 filtered 列表
+			}
+		}
+		ct.connections[ip] = filtered
+	}
+}
+
+// 判断是否是恶意连接
+func (ct *ConnectionTracker) isMalicious(ip string) bool {
+	ct.mu.Lock()
+	defer ct.mu.Unlock()
+
+	if timestamps, exists := ct.connections[ip]; exists {
+		return len(timestamps) >= 10 // 定义恶意连接的阈值
+	}
+	return false
+}

+ 29 - 0
timer/task.go

@@ -0,0 +1,29 @@
+package timer
+
+import (
+	"smartIntersection_edge/service"
+	"smartIntersection_edge/util/logger"
+	"time"
+)
+
+func IsOnline() {
+	t := time.NewTicker(1 * time.Minute) //每分钟
+	for {
+		select {
+		case <-t.C:
+			for _, device := range service.Devices {
+				//符合条件
+				if time.Now().Add(-2*time.Minute).After(device.LastTime) || device.LastTime.IsZero() {
+					//TODO 向云端上报设备离线
+					topic := service.MqttService.GetTopic(device.Info.Sn, service.TopicChanStatus)
+					err := service.MqttService.Publish(topic, 0)
+					delete(service.Devices, device.Info.Sn)
+					if err != nil {
+						continue
+					}
+					logger.Logger.Infof("%v 设备离线了", device.Info.ScreensName)
+				}
+			}
+		}
+	}
+}

+ 90 - 0
util/config/config.go

@@ -0,0 +1,90 @@
+package config
+
+import (
+	"encoding/json"
+	"fmt"
+	"gopkg.in/yaml.v3"
+	"io/ioutil"
+	"os"
+	"sync"
+)
+
+var (
+	instance  *config
+	once      sync.Once
+	DevConfig Devs
+)
+
+// 初始化配置,确保只初始化一次
+func init() {
+	once.Do(func() {
+		loadConfig() // 加载配置文件
+	})
+}
+
+// 重新加载配置文件
+func ReloadConfig() {
+	loadConfig() // 加载配置文件
+}
+
+// 加载配置文件
+func loadConfig() {
+	var conf config
+	filePath := "./config/config.yaml"
+	if f, err := os.Open(filePath); err != nil {
+		panic(err)
+	} else {
+		err := yaml.NewDecoder(f).Decode(&conf)
+		if err != nil {
+			panic(err)
+		}
+	}
+	instance = &conf
+}
+
+// 获取配置文档实例
+func Instance() *config {
+	return instance
+}
+
+type config struct {
+	ProjectName string `yaml:"projectName"`
+	Logger      logger `yaml:"logger"`
+	Mqtt        mqtt   `yaml:"mqtt"`
+}
+
+type logger struct {
+	Path  string `yaml:"path"`
+	Name  string `yaml:"name"`
+	Level string `yaml:"level"`
+}
+
+type mqtt struct {
+	Server   string `yaml:"server"`
+	Id       string `yaml:"id"`
+	User     string `yaml:"user"`
+	Password string `yaml:"password"`
+}
+
+func LoadDevConfig() error {
+	// 读取配置文件
+	data, err := ioutil.ReadFile("./dev/devs.json")
+	if err != nil {
+		return fmt.Errorf("读取配置文件失败: %v", err)
+	}
+
+	// 解析 JSON 数据到结构体
+	if err := json.Unmarshal(data, &DevConfig); err != nil {
+		return fmt.Errorf("解析 JSON 配置失败: %v", err)
+	}
+	return nil
+}
+
+type Devs struct {
+	Screens []*Screens `json:"ledScreens"`
+}
+
+type Screens struct {
+	Sn          string `json:"sn"`
+	ScreensName string `json:"screensName"`
+}

+ 42 - 0
util/logger/lclog.go

@@ -0,0 +1,42 @@
+package logger
+
+import (
+	"github.com/druidcaesa/gotool"
+	rotatelogs "github.com/lestrrat/go-file-rotatelogs"
+	"github.com/sirupsen/logrus"
+	"os"
+	"path"
+	"smartIntersection_edge/util/config"
+	"time"
+)
+
+var Logger *logrus.Logger
+
+func InitLog() {
+	logFilePath := config.Instance().Logger.Path
+	logFileName := config.Instance().Logger.Name
+
+	err := os.MkdirAll(logFilePath, os.ModeDir)
+	if err != nil {
+		gotool.Logs.ErrorLog().Println(err)
+		panic(err)
+	}
+
+	// 日志文件
+	fileName := path.Join(logFilePath, logFileName)
+	writer, _ := rotatelogs.New(
+		fileName+".%Y%m%d.log",
+		rotatelogs.WithMaxAge(7*24*time.Hour),     // 文件最大保存时间 7天
+		rotatelogs.WithRotationTime(24*time.Hour), // 日志切割时间间隔
+	)
+	// 实例化
+	logger := logrus.New()
+
+	logger.SetFormatter(&logrus.JSONFormatter{
+		TimestampFormat: "2006-01-02 15:04:05.000",
+	})
+	// 设置日志级别
+	logger.SetLevel(logrus.DebugLevel)
+	logger.SetOutput(writer)
+	Logger = logger
+}