package initialize import ( "encoding/hex" "fmt" "log" "net" "runtime" "runtime/debug" "server/dao" "server/modbus" "server/model" "server/service" "server/utils" "strconv" "sync" "time" ) type ModbusHandler struct { queue *modbus.MlQueue } var _handlerOnce sync.Once var _handlerSingle *ModbusHandler func GetHandler() *ModbusHandler { _handlerOnce.Do(func() { _handlerSingle = &ModbusHandler{ queue: modbus.NewQueue(10000), } }) return _handlerSingle } func InitInductanceTCP() { lis, err := net.Listen("tcp", ":60001") if err != nil { fmt.Println(err) } else { fmt.Println(lis) model.InductanceTCP = lis fmt.Println("inductanceTCP启动成功") } go StartInductanceTCP() } func StartInductanceTCP() { fmt.Println("进入StartInductanceTCP") handler := GetHandler() for { log.Println("进入InductanceTCP循环") conn, err := model.InductanceTCP.Accept() if err != nil { log.Println(err) } remoteAddr := conn.RemoteAddr().String() log.Printf("lis Accept conn = %s\n", remoteAddr) model.Mutex.Lock() if _, exists := model.ConnectionMap[remoteAddr]; exists { log.Printf("Connection from %s already exists, closing new connection\n", remoteAddr) conn.Close() model.Mutex.Unlock() continue } deviceId, err := GetDeviceId(conn) if err != nil { log.Println("Error getting device ID:", err) conn.Close() model.Mutex.Unlock() continue } model.ConnectionMap[deviceId] = conn model.Mutex.Unlock() //用新的协程处理新的连接 go handler.ReadAndHandle(conn, deviceId) go handler.Handler() } } func GetDeviceId(conn net.Conn) (id string, err error) { // 发送 Modbus RTU 帧 //FE 04 03 EE 00 08 85 B2 data, err := utils.WriteAndReadDevice(modbus.ReadDeviceId, conn, 9, 2) if err != nil { return } return string(data), err } func (o *ModbusHandler) ReadAndHandle(conn net.Conn, deviceId string) { for { buffer := make([]byte, 1024) n, err := conn.Read(buffer) if err != nil { log.Println("Error reading from connection:", err) return } queueData := model.QueueData{ Id: deviceId, Value: buffer[:n], } ok, cnt := o.queue.Put(&queueData) if ok { continue } else { fmt.Printf("HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *ModbusHandler) Handler() interface{} { defer func() { if err := recover(); err != nil { go GetHandler().Handler() fmt.Printf("MqttHandler.Handler:发生异常:%s", string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { fmt.Printf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } queueData, ok := msg.(*model.QueueData) if !ok { fmt.Println("Type assertion failed: msg is not of type model.QueueDat") return nil } // 信息处理返回 parseData(queueData) // 对数据进行修改 } } func parseData(data *model.QueueData) { reg, dev, err := utils.GetDataByDeviceId(data.Id) if err != nil { fmt.Println("Error getting register and device:", err) return } toString := hex.EncodeToString(data.Value) switch toString[0:2] { case "fe": switch toString[4:8] { // 开关灯 case "0000", "0001", "0002", "0003", "0004", "0005", "0006", "0007": if toString[8:12] == "0000" { for i, loop := range dev.DeviceLoops { loop.State = 0 dev.DeviceLoops[i] = loop } } else if toString[8:12] == "ff00" { for i, loop := range dev.DeviceLoops { loop.State = 1 dev.DeviceLoops[i] = loop } } } case "11": switch toString[2:6] { case "0336": batteryVoltage, _ := strconv.ParseInt(toString[6:10], 16, 64) batteryCurrent, _ := strconv.ParseInt(toString[10:14], 16, 64) batteryPlateVoltage, _ := strconv.ParseInt(toString[38:42], 16, 64) sun := dao.Sun{ DeviceId: dev.Sn, BatteryVoltage: float64(batteryVoltage) / 100, BatteryCurrent: int(batteryCurrent), BatteryPlateVoltage: float64(batteryPlateVoltage) / 100, } for i, device := range reg.Devices { if device.Sn == dev.Sn { reg.Devices[i].Sun = sun } } regions, err := utils.SaveRegionOnData(reg) err = service.SaveData(regions) if err != nil { fmt.Println(err) return } //电池 if float64(batteryVoltage)/100 < 5 { data1 := modbus.DeviceSwitch(8, 1) utils.WriteDevice(data1, model.ConnectionMap[data.Id]) } else { data1 := modbus.DeviceSwitch(8, 0) utils.WriteDevice(data1, model.ConnectionMap[data.Id]) } } } switch toString[0:4] { case "4c43": bytes, err := hex.DecodeString(toString[4:]) if err != nil { fmt.Println("Error decoding bytes:", err) return } fmt.Println(string(bytes) + time.Now().String()) for i, device := range reg.Devices { if device.Sn == string(bytes) { reg.Devices[i].State = 1 reg.Devices[i].OnlineTime = time.Now() } } regions, err := utils.SaveRegionOnData(reg) err = service.SaveData(regions) if err != nil { fmt.Println(err) return } } }