package initialize

import (
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"runtime"
	"runtime/debug"
	"server/dao"
	"server/logger"
	"server/modbus"
	"server/model"
	"server/service"
	"server/utils"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

const (
	// inductanceListenAddr is the TCP address the device listener binds to.
	inductanceListenAddr = ":60001"
	// readBufferSize is the size of the buffer handed to each conn.Read call.
	readBufferSize = 1024
	// acceptRetryDelay throttles the accept loop after a failed Accept so a
	// persistent error cannot turn into a busy loop.
	acceptRetryDelay = 100 * time.Millisecond
	// wsaECONNRESET is the Winsock error number for "connection reset by peer".
	wsaECONNRESET syscall.Errno = 10054
)

// ModbusHandler buffers raw frames read from device connections in a queue
// and processes them on a consumer goroutine.
type ModbusHandler struct {
	queue *modbus.MlQueue
	// consumerOnce guards the start of the queue consumer. Because of it a
	// ModbusHandler must not be copied; always use *ModbusHandler.
	consumerOnce sync.Once
}

var _handlerOnce sync.Once
var _handlerSingle *ModbusHandler

// GetHandler returns the process-wide ModbusHandler, creating it (with a
// queue of capacity 10000) on first use.
func GetHandler() *ModbusHandler {
	_handlerOnce.Do(func() {
		_handlerSingle = &ModbusHandler{
			queue: modbus.NewQueue(10000),
		}
	})
	return _handlerSingle
}

// startConsumer launches the queue consumer goroutine the first time it is
// called and is a no-op afterwards.
func (o *ModbusHandler) startConsumer() {
	o.consumerOnce.Do(func() {
		go o.Handler()
	})
}

// InitInductanceTCP opens the device listener on inductanceListenAddr,
// publishes it as model.InductanceTCP and starts the accept loop in a new
// goroutine. When the listener cannot be opened the error is logged and the
// accept loop is not started.
func InitInductanceTCP() {
	lis, err := net.Listen("tcp", inductanceListenAddr)
	if err != nil {
		logger.Get().Println(err)
		return
	}
	model.InductanceTCP = lis
	logger.Get().Printf("inductanceTCP启动成功 %v \n", lis.Addr())
	go StartInductanceTCP()
}

// StartInductanceTCP runs the accept loop on model.InductanceTCP. Every
// accepted connection is stored in model.ConnectionMap1 under the peer's IP
// (without port), service.DeviceAdjustment is invoked for that IP, and a
// reader goroutine is started for the connection. The function returns when
// the listener is nil or has been closed.
func StartInductanceTCP() {
	logger.Get().Println("进入StartInductanceTCP")
	if model.InductanceTCP == nil {
		logger.Get().Errorln("StartInductanceTCP: listener is not initialised")
		return
	}

	handler := GetHandler()
	// One consumer drains the shared queue; starting one per connection would
	// leak a goroutine on every reconnect.
	handler.startConsumer()

	for {
		logger.Get().Println("进入InductanceTCP循环")
		conn, err := model.InductanceTCP.Accept()
		if err != nil {
			logger.Get().Println(err)
			if errors.Is(err, net.ErrClosed) {
				// The listener is gone; no further Accept can succeed.
				return
			}
			time.Sleep(acceptRetryDelay)
			continue
		}

		remoteAddr := conn.RemoteAddr().String()
		logger.Get().Printf("lis Accept conn = %s\n", remoteAddr)

		// Parse the remote address to obtain the bare IP used as the map key.
		addr, err := net.ResolveTCPAddr("tcp", remoteAddr)
		if err != nil {
			logger.Get().Errorf("解析错误 conn = %s: %v\n", remoteAddr, err)
			_ = conn.Close()
			continue
		}
		ip := addr.IP.String()

		model.ConnectionMap1.Store(ip, conn)
		// Re-run the device adjustment on every (re)connect.
		service.DeviceAdjustment(ip)

		// Look the connection up again after the adjustment, as the original
		// flow did, and only start reading when it is still registered.
		netConn, ok := lookupDeviceConn(ip)
		if !ok {
			logger.Get().Printf("启动 Connection for key %s not found", ip)
			// Nobody will read from this socket, so release it.
			_ = conn.Close()
			continue
		}
		go handler.ReadAndHandle(netConn, ip)
	}
}

// lookupDeviceConn returns the connection registered in model.ConnectionMap1
// for ip. ok is false when there is no entry or the entry is not a net.Conn.
func lookupDeviceConn(ip string) (conn net.Conn, ok bool) {
	stored, found := model.ConnectionMap1.Load(ip)
	if !found {
		return nil, false
	}
	conn, ok = stored.(net.Conn)
	return conn, ok
}

// ReadAndHandle reads from conn until it fails or is closed by the peer and
// pushes every non-empty chunk onto the handler queue tagged with remoteAddr.
// remoteAddr must be the key under which conn was stored in
// model.ConnectionMap1; on exit conn is closed and that entry is removed,
// provided it still refers to conn.
func (o *ModbusHandler) ReadAndHandle(conn net.Conn, remoteAddr string) {
	defer conn.Close()
	defer func() {
		// Only drop the map entry if it is still ours: a reconnect from the
		// same IP may already have replaced it with a newer connection.
		if current, ok := model.ConnectionMap1.Load(remoteAddr); ok && current == conn {
			model.ConnectionMap1.Delete(remoteAddr)
		}
	}()

	for {
		// A fresh buffer per read: the slice is handed to the queue and must
		// not be overwritten by the next Read.
		buffer := make([]byte, readBufferSize)
		n, err := conn.Read(buffer)

		// Read may return data together with an error, so consume data first.
		if n > 0 {
			queueData := model.QueueData{
				Ip:    remoteAddr,
				Value: buffer[:n],
			}
			if ok, cnt := o.queue.Put(&queueData); !ok {
				logger.Get().Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
				runtime.Gosched()
			}
		}

		if err == nil {
			continue
		}

		peer := conn.RemoteAddr().String()
		switch {
		case err == io.EOF:
			// Peer closed the connection; without returning here the loop
			// would spin and flood the queue with empty payloads.
			logger.Get().Printf("连接已关闭conn: %s", peer)
		case isConnReset(err):
			logger.Get().Errorf("连接被远程主机强制关闭conn: %s", peer)
		case os.IsTimeout(err):
			logger.Get().Errorf("读取操作超时conn: %s", peer)
		default:
			// Any other read error.
			logger.Get().Errorf("读取错误: %s\n conn: %s", err, peer)
		}
		return
	}
}

// isConnReset reports whether err describes a connection reset by the peer.
// It unwraps err looking for a syscall.Errno equal to ECONNRESET (or Winsock
// error 10054 on Windows) and, on Windows only, falls back to looking for
// "WSAECONNRESET" in the error text.
func isConnReset(err error) bool {
	if err == nil {
		return false
	}
	var errno syscall.Errno
	if errors.As(err, &errno) {
		if errno == syscall.ECONNRESET {
			return true
		}
		if runtime.GOOS == "windows" && errno == wsaECONNRESET {
			return true
		}
	}
	return runtime.GOOS == "windows" && strings.Contains(err.Error(), "WSAECONNRESET")
}

// Handler is the queue consumer loop: it takes items off the handler queue
// and passes each *model.QueueData to parseData. It does not return under
// normal operation. If processing panics, the panic is logged with a stack
// trace and a replacement consumer goroutine is started.
func (o *ModbusHandler) Handler() interface{} {
	defer func() {
		if r := recover(); r != nil {
			go o.Handler()
			logger.Get().Errorf("ModbusHandler.Handler:发生异常:%v\n%s", r, debug.Stack())
		}
	}()

	for {
		msg, ok, quantity := o.queue.Get()
		if !ok {
			// Queue is empty; back off briefly before polling again.
			time.Sleep(10 * time.Millisecond)
			continue
		}
		if quantity > 1000 {
			logger.Get().Errorf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
		}

		queueData, ok := msg.(*model.QueueData)
		if !ok || queueData == nil {
			// Skip the bad item instead of stopping the only consumer.
			logger.Get().Errorln("Type assertion failed: msg is not of type model.QueueDat")
			continue
		}
		parseData(queueData)
	}
}

// beUint16At decodes the two bytes at b[off] and b[off+1] as a big-endian
// unsigned 16-bit value. The caller must ensure off+1 < len(b).
func beUint16At(b []byte, off int) int64 {
	return int64(b[off])<<8 | int64(b[off+1])
}

// parseData interprets one raw frame received from the device at data.Ip and
// updates the stored region/device data accordingly. Frames are dispatched on
// their leading bytes (shown here hex-encoded):
//
//	fe 05 00 0N ...  single loop N (0-7) switched; bytes 4-5 are 0000 (off) or ff00 (on)
//	fe 0f 00 00 ...  all loops switched; the state is taken from modbus.OperationCommand
//	fe 01 01 SS      loop status byte SS; bit k is the state of loop k
//	11 03 36 ...     solar data: battery voltage, current and panel voltage
//	4c 43 <sn>       heartbeat carrying the device serial number as text
//
// NOTE(review): the frame meanings above are inferred from how each branch
// uses the data — confirm against the device protocol documentation.
// Frames that are too short for the branch they select are logged and dropped.
func parseData(data *model.QueueData) {
	reg, dev, err := utils.GetDataByDeviceIP(data.Ip)
	if err != nil {
		logger.Get().Errorln("Error getting register and device:", err)
		return
	}

	frame := hex.EncodeToString(data.Value)
	logger.Get().Infof("deviceIP: %s, value: %s", data.Ip, frame)

	// field returns frame[lo:hi], or "" when the frame is too short, so that
	// dispatching on a prefix can never panic.
	field := func(lo, hi int) string {
		if hi > len(frame) {
			return ""
		}
		return frame[lo:hi]
	}

	// persist serialises reg and stores it; the store step is skipped when
	// serialisation fails so that no partial data is written.
	persist := func() error {
		regions, err := utils.SaveRegionOnData(reg)
		if err != nil {
			return fmt.Errorf("SaveRegionOnData: %w", err)
		}
		return service.SaveData(regions)
	}

	// replaceDevice writes dev back into the region entry for this IP.
	replaceDevice := func() {
		for i := range reg.Devices {
			if reg.Devices[i].Ip == data.Ip {
				reg.Devices[i] = dev
			}
		}
	}

	switch field(0, 2) {
	case "fe":
		switch field(2, 8) {
		// Single loop switched on/off.
		case "050000", "050001", "050002", "050003", "050004", "050005", "050006", "050007":
			if len(frame) < 12 {
				logger.Get().Errorf("frame too short for loop switch: %s", frame)
				return
			}
			loopIdx, convErr := strconv.Atoi(frame[7:8])
			if convErr != nil || loopIdx < 0 || loopIdx >= len(dev.DeviceLoops) {
				logger.Get().Errorf("loop index %q out of range for device %s", frame[7:8], data.Ip)
				return
			}
			logger.Get().Error(dev.DeviceLoops[loopIdx])
			switch frame[8:12] {
			case "0000":
				dev.DeviceLoops[loopIdx].State = 0
			case "ff00":
				dev.DeviceLoops[loopIdx].State = 1
			}
			replaceDevice()
			if err := persist(); err != nil {
				logger.Get().Errorln("设备回路状态" + err.Error())
				return
			}

		// All loops switched at once; the target state is read from the last
		// command kept in modbus.OperationCommand, not from the reply.
		case "0f0000":
			lastCmd := hex.EncodeToString(modbus.OperationCommand)
			logger.Get().Infof("deviceId: %s, value: %s", data.Ip, lastCmd)
			if len(lastCmd) < 16 {
				logger.Get().Errorf("OperationCommand too short: %s", lastCmd)
				return
			}
			switch lastCmd[14:16] {
			case "00":
				for i := range dev.DeviceLoops {
					dev.DeviceLoops[i].State = 0
				}
			case "ff":
				for i := range dev.DeviceLoops {
					dev.DeviceLoops[i].State = 1
				}
			}
			// Match on Ip like the single-loop branch; data.Ip is an IP
			// address, so comparing it with the serial number never matched.
			replaceDevice()
			if err := persist(); err != nil {
				logger.Get().Errorln("设备回路状态" + err.Error())
				return
			}
		}

		if field(2, 6) == "0101" {
			if len(data.Value) < 4 {
				logger.Get().Errorf("frame too short for loop status: %s", frame)
				return
			}
			// Bit k of the status byte is the state of loop k (LSB = loop 0).
			status := data.Value[3]
			for i := range reg.Devices {
				if reg.Devices[i].Ip != data.Ip {
					continue
				}
				reg.Devices[i].State = 1
				reg.Devices[i].OnlineTime = time.Now()
				loops := reg.Devices[i].DeviceLoops
				for k := 0; k < len(loops) && k < 8; k++ {
					loops[k].State = int(status >> uint(k) & 1)
				}
			}
			if err := persist(); err != nil {
				logger.Get().Errorln("设备回路状态" + err.Error())
				return
			}
		}

	case "11":
		if field(2, 6) == "0336" {
			// The last field read ends at byte 21 (hex offset 42).
			if len(data.Value) < 21 {
				logger.Get().Errorf("frame too short for solar data: %s", frame)
				return
			}
			batteryVoltage := float64(beUint16At(data.Value, 3)) / 100
			batteryCurrent := beUint16At(data.Value, 5)
			batteryPlateVoltage := float64(beUint16At(data.Value, 19)) / 100

			sun := dao.Sun{
				DeviceId:            dev.Sn,
				BatteryVoltage:      batteryVoltage,
				BatteryCurrent:      int(batteryCurrent),
				BatteryPlateVoltage: batteryPlateVoltage,
			}
			for i := range reg.Devices {
				if reg.Devices[i].Sn == dev.Sn {
					reg.Devices[i].Sun = sun
				}
			}
			if err := persist(); err != nil {
				logger.Get().Println("太阳能数据保存" + err.Error())
				return
			}

			// Battery control: send DeviceSwitch(8, 1) below 5 (in the units
			// of BatteryVoltage) and DeviceSwitch(8, 0) otherwise.
			// NOTE(review): presumably this toggles a load relay — confirm.
			// The command is built even without a connection, as before.
			netConn, found := lookupDeviceConn(data.Ip)
			var writeErr error
			if batteryVoltage < 5 {
				cmd := modbus.DeviceSwitch(8, 1)
				if found {
					writeErr = utils.WriteDevice(cmd, netConn)
				}
			} else {
				cmd := modbus.DeviceSwitch(8, 0)
				if found {
					writeErr = utils.WriteDevice(cmd, netConn)
				}
			}
			switch {
			case !found:
				logger.Get().Printf("电池符合控制Connection for key %s not found", data.Ip)
			case writeErr != nil:
				logger.Get().Errorf("电池符合控制 写命令错误: %s -- conn: %v", writeErr, netConn.RemoteAddr().String())
				return
			}
		}
	}

	// Heartbeat: the bytes after the two-byte "LC" prefix are the serial
	// number of the device that is alive.
	if field(0, 4) == "4c43" {
		sn := string(data.Value[2:])
		for i := range reg.Devices {
			if reg.Devices[i].Sn == sn {
				reg.Devices[i].State = 1
				reg.Devices[i].OnlineTime = time.Now()
			}
		}
		if err := persist(); err != nil {
			logger.Get().Errorln("心跳" + err.Error())
			return
		}
	}
}