package initialize

import (
	"encoding/hex"
	"errors"
	"net"
	"os"
	"runtime"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"server/dao"
	"server/logger"
	"server/modbus"
	"server/model"
	"server/service"
	"server/utils"
)

// ModbusHandler buffers the raw frames read from device connections in a
// queue (ReadAndHandle is the producer) and decodes them on a separate
// consumer goroutine (Handler).
type ModbusHandler struct {
	queue *modbus.MlQueue
}

var (
	_handlerOnce   sync.Once
	_handlerSingle *ModbusHandler
)

// GetHandler returns the process-wide ModbusHandler, creating it with a
// queue built by modbus.NewQueue(10000) on first use.
func GetHandler() *ModbusHandler {
	_handlerOnce.Do(func() {
		_handlerSingle = &ModbusHandler{
			queue: modbus.NewQueue(10000),
		}
	})
	return _handlerSingle
}

// InitInductanceTCP opens the TCP listener on port 60001, publishes it as
// model.InductanceTCP and starts the accept loop in a new goroutine.
func InitInductanceTCP() {
	lis, err := net.Listen("tcp", ":60001")
	if err != nil {
		logger.Get().Fatalln("Failed to listen on port 60001:", err)
		// Never enter the accept loop with a nil listener, even if the
		// configured logger's Fatalln does not terminate the process.
		return
	}
	logger.Get().Println("inductanceTCP启动成功")
	model.InductanceTCP = lis

	go StartInductanceTCP()
}

// StartInductanceTCP accepts device connections forever. For every
// connection it reads the device ID, registers the connection in
// model.ConnectionMap (closing and replacing any earlier connection stored
// under the same ID) and starts a reader goroutine for it.
func StartInductanceTCP() {
	// Pause between retries after a failed Accept so that a persistent
	// failure does not turn into a busy loop.
	const acceptRetryDelay = 100 * time.Millisecond

	logger.Get().Println("进入StartInductanceTCP")
	handler := GetHandler()

	// Every reader feeds the same queue, so the consumer is started once
	// here. Starting one per accepted connection would add a goroutine that
	// never exits on every reconnect.
	go handler.Handler()

	for {
		logger.Get().Println("进入InductanceTCP循环")
		conn, err := model.InductanceTCP.Accept()
		if err != nil {
			logger.Get().Errorf("Accept error: %v", err)
			time.Sleep(acceptRetryDelay)
			continue
		}

		remoteAddr := conn.RemoteAddr().String()
		logger.Get().Printf("lis Accept conn = %s\n", remoteAddr)

		deviceId, err := GetDeviceId(conn)
		if err != nil {
			logger.Get().Errorf("Error getting device ID from %s: %v", remoteAddr, err)
			conn.Close()
			continue
		}

		model.Mutex.Lock()
		if existingConn, exists := model.ConnectionMap[deviceId]; exists {
			logger.Get().Printf("Connection from %s already exists for deviceId %s, closing old connection\n", remoteAddr, deviceId)
			// Drop the stale connection; the assignment below replaces
			// its map entry.
			existingConn.Close()
		}
		model.ConnectionMap[deviceId] = conn
		model.Mutex.Unlock()

		// Each connection gets its own reader goroutine.
		go handler.ReadAndHandle(conn, deviceId)
	}
}

// GetDeviceId sends the modbus.ReadDeviceId request over conn and returns
// the raw reply converted to a string. On failure id is empty and err is the
// error reported by utils.WriteAndReadDevice.
func GetDeviceId(conn net.Conn) (id string, err error) {
	// Modbus RTU request frame: FE 04 03 EE 00 08 85 B2.
	// NOTE(review): 9 and 2 are presumably the expected reply length and a
	// timeout; confirm against utils.WriteAndReadDevice.
	data, err := utils.WriteAndReadDevice(modbus.ReadDeviceId, conn, 9, 2)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// ReadAndHandle reads from conn until a read fails and enqueues every chunk
// it receives, tagged with deviceId, for the consumer goroutine.
//
// The connection is closed when the read fails for any reason other than a
// timeout. Its model.ConnectionMap entry is left in place; it is replaced
// when the device reconnects.
func (o *ModbusHandler) ReadAndHandle(conn net.Conn, deviceId string) {
	for {
		// A fresh buffer per read: the queued item keeps a slice of it, so
		// it must not be overwritten by the next read.
		buffer := make([]byte, 1024)
		n, err := conn.Read(buffer)
		if err != nil {
			switch {
			case isConnReset(err):
				logger.Get().Error("连接被远程主机强制关闭")
			case os.IsTimeout(err):
				logger.Get().Error("读取操作超时")
				// NOTE(review): a timeout typically means a read deadline
				// was set on this shared connection elsewhere, so the
				// socket may still be usable; it is deliberately left open.
				return
			default:
				// Any other read error (EOF, closed connection, ...).
				logger.Get().Errorf("读取错误: %s\n", err)
			}
			// The connection is dead: release the socket now instead of
			// waiting for the device to reconnect.
			conn.Close()
			return
		}

		queueData := model.QueueData{
			Id:    deviceId,
			Value: buffer[:n],
		}
		if ok, cnt := o.queue.Put(&queueData); !ok {
			// The chunk is dropped; yield so the consumer can catch up.
			logger.Get().Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
			runtime.Gosched()
		}
	}
}

// isConnReset reports whether err means the peer reset the connection.
//
// The whole error chain is inspected: a read error normally arrives as
// *net.OpError wrapping *os.SyscallError wrapping the errno, so comparing
// only the first wrapped value against syscall.ECONNRESET never matches.
func isConnReset(err error) bool {
	// Winsock error number for WSAECONNRESET.
	const wsaConnReset = syscall.Errno(10054)

	if err == nil {
		return false
	}
	if errors.Is(err, syscall.ECONNRESET) {
		return true
	}
	if runtime.GOOS != "windows" {
		return false
	}

	var errno syscall.Errno
	if errors.As(err, &errno) {
		return errno == wsaConnReset
	}
	// No errno in the chain: fall back to the error text.
	return strings.Contains(err.Error(), "WSAECONNRESET")
}

// Handler is the queue consumer: it polls the queue, sleeping 10ms whenever
// it is empty, and passes every *model.QueueData it receives to parseData.
// It does not return normally. If processing panics, the panic is recovered,
// a replacement consumer goroutine is started and the stack is logged.
func (o *ModbusHandler) Handler() interface{} {
	defer func() {
		if err := recover(); err != nil {
			go GetHandler().Handler()
			logger.Get().Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
		}
	}()

	for {
		msg, ok, quantity := o.queue.Get()
		if !ok {
			time.Sleep(10 * time.Millisecond)
			continue
		}
		if quantity > 1000 {
			logger.Get().Errorf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
		}

		queueData, ok := msg.(*model.QueueData)
		if !ok {
			// Skip the unexpected item instead of returning, which would
			// leave the queue without a consumer.
			logger.Get().Errorln("Type assertion failed: msg is not of type model.QueueDat")
			continue
		}

		parseData(queueData)
	}
}

// parseData decodes one chunk received from a device and updates the stored
// region/device data accordingly. The chunk is matched on its hex encoding:
//
//   - "fe", with bytes 2-3 in 0000..0007: set every loop of dev to off
//     (bytes 4-5 == 0000) or on (bytes 4-5 == ff00).
//   - "fe" followed by "0101": mark the sending device online, set its loop
//     states from byte 3 and save the region.
//   - "01" followed by "0336": store the solar/battery readings and save.
//   - "4c43" followed by a serial number: heartbeat; mark that device online
//     and save.
//
// Chunks that are too short for the fields of a branch simply do not match
// that branch instead of panicking.
func parseData(data *model.QueueData) {
	reg, dev, err := utils.GetDataByDeviceId(data.Id)
	if err != nil {
		logger.Get().Errorln("Error getting register and device:", err)
		return
	}

	frame := hex.EncodeToString(data.Value)
	// field returns frame[from:to], or "" when the frame is too short, so
	// a truncated frame can never match a case below.
	field := func(from, to int) string {
		if from < 0 || from > to || to > len(frame) {
			return ""
		}
		return frame[from:to]
	}

	switch field(0, 2) {
	case "fe":
		// Switch lights on/off.
		// NOTE(review): this branch only changes dev in memory; nothing in
		// it saves the result — confirm that is intended.
		switch field(4, 8) {
		case "0000", "0001", "0002", "0003", "0004", "0005", "0006", "0007":
			switch field(8, 12) {
			case "0000":
				for i, loop := range dev.DeviceLoops {
					loop.State = 0
					dev.DeviceLoops[i] = loop
				}
			case "ff00":
				for i, loop := range dev.DeviceLoops {
					loop.State = 1
					dev.DeviceLoops[i] = loop
				}
			}
		}

		if field(2, 6) == "0101" {
			status := field(6, 8)
			for i := range reg.Devices {
				if reg.Devices[i].Sn != data.Id {
					continue
				}
				if reg.Devices[i].State != 1 {
					reg.Devices[i].State = 1
					service.Cron{}.RelayOnOffTimeTaskSn(data.Id)
				}
				reg.Devices[i].OnlineTime = time.Now()
				// Write through reg.Devices[i] rather than a copy of the
				// element so the change always reaches the data saved below.
				for j := range reg.Devices[i].DeviceLoops {
					switch status {
					case "ff", "0f":
						reg.Devices[i].DeviceLoops[j].State = 1
					case "00":
						reg.Devices[i].DeviceLoops[j].State = 0
					}
				}
			}

			regions, err := utils.SaveRegionOnData(reg)
			if err == nil {
				// Only persist when the conversion succeeded.
				err = service.SaveData(regions)
			}
			if err != nil {
				logger.Get().Errorln("设备回路状态" + err.Error())
				return
			}
		}

	case "01":
		if field(2, 6) == "0336" {
			logger.Get().Println(data.Id + "----" + frame)

			// The last field read below ends at hex offset 42.
			if len(frame) < 42 {
				logger.Get().Errorln("solar data frame too short:", len(data.Value), "bytes")
				return
			}
			// ParseInt cannot fail here: every field is exactly four hex
			// digits produced by hex.EncodeToString.
			batteryVoltage, _ := strconv.ParseInt(frame[6:10], 16, 64)
			batteryCurrent, _ := strconv.ParseInt(frame[10:14], 16, 64)
			batteryPlateVoltage, _ := strconv.ParseInt(frame[38:42], 16, 64)

			sun := dao.Sun{
				DeviceId:            dev.Sn,
				BatteryVoltage:      float64(batteryVoltage) / 100,
				BatteryCurrent:      int(batteryCurrent),
				BatteryPlateVoltage: float64(batteryPlateVoltage) / 100,
			}
			for i := range reg.Devices {
				if reg.Devices[i].Sn == dev.Sn {
					reg.Devices[i].Sun = sun
				}
			}

			regions, err := utils.SaveRegionOnData(reg)
			if err == nil {
				// Only persist when the conversion succeeded.
				err = service.SaveData(regions)
			}
			if err != nil {
				logger.Get().Println("太阳能数据保存" + err.Error())
				return
			}

			// Battery
			//if float64(batteryVoltage)/100 < 5 {
			//	data1 := modbus.DeviceSwitch(8, 1)
			//	utils.WriteDevice(data1, model.ConnectionMap[data.Id])
			//} else {
			//	data1 := modbus.DeviceSwitch(8, 0)
			//	utils.WriteDevice(data1, model.ConnectionMap[data.Id])
			//}
		}
	}

	if field(0, 4) == "4c43" {
		// Heartbeat: everything after the two-byte prefix is the serial
		// number. The match above guarantees at least two bytes.
		sn := string(data.Value[2:])
		for i := range reg.Devices {
			if reg.Devices[i].Sn == sn {
				reg.Devices[i].State = 1
				reg.Devices[i].OnlineTime = time.Now()
			}
		}

		regions, err := utils.SaveRegionOnData(reg)
		if err == nil {
			// Only persist when the conversion succeeded.
			err = service.SaveData(regions)
		}
		if err != nil {
			logger.Get().Errorln("心跳" + err.Error())
			return
		}
	}
}