123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508 |
- package utils
- import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net"
- "os"
- "path/filepath"
- "server/dao"
- "server/logger"
- "server/model"
- "strings"
- "sync"
- "time"
- )
- const dataFilePath = "static/data.json"
- var InitRegionData []dao.Region
- // LoadData1 从文件中加载 Region 数据。
- func LoadData1() {
- var regions []dao.Region
- // 尝试从主数据文件读取数据
- data, err := os.ReadFile(dataFilePath)
- if err != nil {
- if !os.IsNotExist(err) {
- logger.Get().Errorf("读取文件失败: %v", err)
- return
- }
- // 如果主数据文件不存在,则尝试从备份文件读取
- } else {
- // 检查文件是否为空
- if len(data) > 0 {
- if err := json.Unmarshal(data, ®ions); err != nil {
- logger.Get().Errorf("解析 JSON 失败 (主文件): %v, 原始数据: %s", err, string(data))
- return
- }
- }
- }
- InitRegionData = regions
- }
- // LoadData 从文件中加载 Region 数据。
- func LoadData() ([]dao.Region, error) {
- return InitRegionData, nil
- }
- func GetOnlineDevices() (devices []dao.Device, err error) {
- regions, err := LoadData()
- if err != nil {
- return nil, err
- }
- for _, region := range regions {
- for _, device := range region.Devices {
- if device.State == 0 {
- continue
- }
- devices = append(devices, device)
- }
- }
- return devices, err
- }
- func GetAllDevices() (devices []dao.Device, err error) {
- regions, err := LoadData()
- if err != nil {
- return nil, err
- }
- for _, region := range regions {
- for _, device := range region.Devices {
- devices = append(devices, device)
- }
- }
- return devices, err
- }
- // GetDataByDeviceIP 按设备ip获取信息 地区+设备
- func GetDataByDeviceIP(remoteAddr string) (reg dao.Region, dev dao.Device, err error) {
- regions, err := LoadData()
- if err != nil {
- return reg, dev, err
- }
- for _, region := range regions {
- for _, device := range region.Devices {
- if device.Ip == remoteAddr {
- return region, device, nil
- }
- }
- }
- return reg, dev, fmt.Errorf("设备%s未找到", remoteAddr)
- }
- func GetDataByDeviceId(id string) (reg dao.Region, dev dao.Device, err error) {
- regions, err := LoadData()
- if err != nil {
- return reg, dev, err
- }
- for _, region := range regions {
- for _, device := range region.Devices {
- if device.Sn == id {
- return region, device, nil
- }
- }
- }
- return reg, dev, fmt.Errorf("设备id%s未找到", id)
- }
- func SaveRegionOnData(data dao.Region) ([]dao.Region, error) {
- regions, err := LoadData()
- if err != nil {
- return regions, err
- }
- for i, region := range regions {
- if region.Name == data.Name {
- regions[i] = data
- break
- }
- }
- return regions, nil
- }
- var s sync.Mutex
- func SaveRegions(regions []dao.Region) {
- s.Lock()
- // 创建一个映射以便快速查找
- regionMap := make(map[string]dao.Region)
- orderSlice := make([]string, 0, len(InitRegionData)) // 用于保存顺序的切片
- for _, region := range InitRegionData {
- regionMap[region.Name] = region
- orderSlice = append(orderSlice, region.Name)
- }
- // 遍历传入的新数据,进行版本控制检查
- for _, newRegion := range regions {
- existingRegion, exists := regionMap[newRegion.Name]
- if exists {
- if existingRegion.Version != newRegion.Version {
- logger.Get().Errorf("版本冲突: 区域 '%s' 的当前版本=%d, 提交版本=%d", newRegion.Name, existingRegion.Version, newRegion.Version)
- }
- // 更新版本号
- newRegion.Version++
- regionMap[newRegion.Name] = newRegion
- } else {
- // 如果区域不存在,则直接添加(假设这是新增的区域)
- newRegion.Version = 1 // 新增区域的初始版本号设为1
- regionMap[newRegion.Name] = newRegion
- orderSlice = append(orderSlice, newRegion.Name) // 添加新区域到顺序切片
- }
- }
- // 将 map 按照 orderSlice 的顺序转换回 slice
- updatedRegions := make([]dao.Region, 0, len(regionMap))
- for _, name := range orderSlice {
- updatedRegions = append(updatedRegions, regionMap[name])
- }
- InitRegionData = updatedRegions
- s.Unlock()
- }
- // SaveData 保存数据到文件
- func SaveData(path string, newData []dao.Region) error {
- // 序列化数据
- data, err := json.MarshalIndent(newData, "", " ")
- if err != nil {
- return fmt.Errorf("JSON 序列化失败: %v", err)
- }
- // 创建临时文件用于原子写入
- tempFile, err := ioutil.TempFile(filepath.Dir(path), "tmp-")
- if err != nil {
- return fmt.Errorf("创建临时文件失败: %v", err)
- }
- defer os.Remove(tempFile.Name()) // 确保即使出错也能清理临时文件
- // 写入临时文件
- if _, err := tempFile.Write(data); err != nil {
- tempFile.Close()
- return fmt.Errorf("写入临时文件失败: %v", err)
- }
- // 确保所有数据都写入磁盘
- if err := tempFile.Sync(); err != nil {
- tempFile.Close()
- return fmt.Errorf("同步临时文件失败: %v", err)
- }
- // 关闭临时文件
- if err := tempFile.Close(); err != nil {
- return fmt.Errorf("关闭临时文件失败: %v", err)
- }
- // 检查目标文件是否存在以及是否有写权限
- if _, err := os.Stat(path); err == nil {
- // 文件存在,检查是否可写
- if writable, err := checkWritable(path); err != nil || !writable {
- return fmt.Errorf("目标文件不可写: %v", err)
- }
- }
- if err = os.Remove(path); err != nil {
- if !os.IsNotExist(err) {
- return fmt.Errorf("删除目标文件失败: %w", err)
- }
- }
- // 尝试重命名临时文件到目标位置
- if err := os.Rename(tempFile.Name(), path); err != nil {
- return fmt.Errorf("替换文件失败: %v", err)
- }
- return nil
- }
- // 辅助函数:检查文件是否可写
- func checkWritable(filename string) (bool, error) {
- file, err := os.OpenFile(filename, os.O_WRONLY, 0)
- if err != nil {
- return false, err
- }
- file.Close()
- return true, nil
- }
- const (
- maxRetries = 3 // 最大重试次数
- readTimeout = 5 * time.Second // 读取超时时间
- writeTimeout = 5 * time.Second // 写入超时时间
- reconnectWait = 2 * time.Second // 重连等待时间
- )
- // checkConnection 检查连接是否仍然有效
- func checkConnection(conn net.Conn) error {
- probe := []byte{}
- _, err := conn.Write(probe)
- return err
- }
- // checkAndReconnect 检查是否需要重连,并在必要时执行重连
- func checkAndReconnect(conn net.Conn) (net.Conn, error) {
- remoteAddr := conn.RemoteAddr().String()
- // 解析远程地址
- addr1, err := net.ResolveTCPAddr("tcp", remoteAddr)
- if err != nil {
- // 处理错误...
- logger.Get().Errorf("解析错误 conn = %s\n", addr1.IP.String())
- }
- // 关闭旧连接
- if err := conn.Close(); err != nil {
- logger.Get().Errorf("Failed to close connection: %v", err)
- return nil, err
- }
- // 解析原始连接的远程地址和网络接口
- addr, networkInterface, err := parseRemoteAddr(addr1.IP.String())
- if err != nil {
- logger.Get().Errorf("Failed to parse remote address: %v", err)
- return nil, err
- }
- // 尝试重新建立连接
- newConn, err := net.Dial("tcp", fmt.Sprintf("%s%%%s", addr, networkInterface))
- if err != nil {
- logger.Get().Errorf("Reconnect failed: %v", err)
- return nil, err
- }
- logger.Get().Warnf("Successfully reconnected to %s", remoteAddr)
- return newConn, nil
- }
- // isConnectionClosedError 检查错误是否表明连接被对端强制关闭或已关闭
- func isConnectionClosedError(err error) bool {
- if ne, ok := err.(*net.OpError); ok {
- return strings.Contains(ne.Err.Error(), "forcibly closed") ||
- strings.Contains(ne.Err.Error(), "broken pipe") ||
- strings.Contains(ne.Err.Error(), "connection reset") ||
- strings.Contains(ne.Err.Error(), "use of closed network connection")
- }
- return false
- }
- // setDeadlineWithRetry 使用重试机制设置读写截止时间
- func setDeadlineWithRetry(conn net.Conn, timeout time.Duration, operation string) error {
- for attempts := 0; attempts < maxRetries; attempts++ {
- var err error
- switch operation {
- case "read":
- err = conn.SetReadDeadline(time.Now().Add(timeout))
- case "write":
- err = conn.SetWriteDeadline(time.Now().Add(timeout))
- default:
- return fmt.Errorf("invalid operation: %s", operation)
- }
- if err == nil {
- return nil
- }
- if isConnectionClosedError(err) {
- logger.Get().Warnf("Set %s deadline failed due to closed connection, retrying (%d/%d)", operation, attempts+1, maxRetries)
- var newConn net.Conn
- var reconnErr error
- newConn, reconnErr = checkAndReconnect(conn)
- if reconnErr != nil {
- time.Sleep(reconnectWait) // 等待一段时间后重试
- continue // 继续下一次重试
- }
- conn = newConn
- continue // 重试设置截止时间
- }
- logger.Get().Errorf("Set %s deadline failed: %v", operation, err)
- return fmt.Errorf("set %s deadline failed after %d retries: %v", operation, attempts+1, err)
- }
- return fmt.Errorf("failed to set %s deadline after %d retries", operation, maxRetries)
- }
- // readWithRetry 使用重试机制进行读取操作
- func readWithRetry(buffer []byte, conn net.Conn) (int, error) {
- for attempts := 0; attempts < maxRetries; attempts++ {
- if err := setDeadlineWithRetry(conn, readTimeout, "read"); err != nil {
- return 0, err
- }
- // 检查连接是否仍然有效
- if err := checkConnection(conn); err != nil {
- if isConnectionClosedError(err) {
- logger.Get().Warnf("Connection check failed due to closed connection, retrying (%d/%d)", attempts+1, maxRetries)
- var newConn net.Conn
- var reconnErr error
- newConn, reconnErr = checkAndReconnect(conn)
- if reconnErr != nil {
- time.Sleep(reconnectWait) // 等待一段时间后重试
- continue // 继续下一次重试
- }
- conn = newConn
- continue // 重试读取
- }
- logger.Get().Errorf("Connection check failed: %v", err)
- return 0, fmt.Errorf("connection check failed after %d retries: %v", attempts+1, err)
- }
- n, err := conn.Read(buffer)
- if err == nil {
- return n, nil
- }
- if isConnectionClosedError(err) {
- logger.Get().Warnf("Read failed due to closed connection, retrying (%d/%d)", attempts+1, maxRetries)
- var newConn net.Conn
- var reconnErr error
- newConn, reconnErr = checkAndReconnect(conn)
- if reconnErr != nil {
- time.Sleep(reconnectWait) // 等待一段时间后重试
- continue // 继续下一次重试
- }
- conn = newConn
- continue // 重试读取
- }
- logger.Get().Errorf("Read failed: %v", err)
- return 0, fmt.Errorf("read failed after %d retries: %v", attempts+1, err)
- }
- return 0, fmt.Errorf("failed to read after %d retries", maxRetries)
- }
- // writeWithRetry 使用重试机制进行写入操作
- func writeWithRetry(frame []byte, conn net.Conn) error {
- for attempts := 0; attempts < maxRetries; attempts++ {
- if err := setDeadlineWithRetry(conn, writeTimeout, "write"); err != nil {
- return err
- }
- // 检查连接是否仍然有效
- if err := checkConnection(conn); err != nil {
- if isConnectionClosedError(err) {
- logger.Get().Warnf("Connection check failed due to closed connection, retrying (%d/%d)", attempts+1, maxRetries)
- var newConn net.Conn
- var reconnErr error
- newConn, reconnErr = checkAndReconnect(conn)
- if reconnErr != nil {
- time.Sleep(reconnectWait) // 等待一段时间后重试
- continue // 继续下一次重试
- }
- conn = newConn
- continue // 重试写入
- }
- logger.Get().Errorf("Connection check failed: %v", err)
- return fmt.Errorf("connection check failed after %d retries: %v", attempts+1, err)
- }
- _, err := conn.Write(frame)
- if err == nil {
- return nil
- }
- if isConnectionClosedError(err) {
- logger.Get().Warnf("Write failed due to closed connection, retrying (%d/%d)", attempts+1, maxRetries)
- var newConn net.Conn
- var reconnErr error
- newConn, reconnErr = checkAndReconnect(conn)
- if reconnErr != nil {
- time.Sleep(reconnectWait) // 等待一段时间后重试
- continue // 继续下一次重试
- }
- conn = newConn
- continue // 重试写入
- }
- logger.Get().Errorf("Write failed: %v", err)
- return fmt.Errorf("write failed after %d retries: %v", attempts+1, err)
- }
- return fmt.Errorf("failed to write after %d retries", maxRetries)
- }
- // ReadDevice 从设备读取数据
- func ReadDevice(buffer []byte, conn net.Conn) (int, error) {
- return readWithRetry(buffer, conn)
- }
- // WriteDevice1 向设备写入数据
- func WriteDevice1(frame []byte, conn net.Conn) error {
- return writeWithRetry(frame, conn)
- }
- func WriteDevice(frame []byte, conn net.Conn) error {
- _, err := conn.Write(frame)
- if err != nil {
- defer conn.Close()
- model.ConnectionMap1.Delete(conn.RemoteAddr().String())
- return err
- }
- return nil
- }
- // 解析远程地址,提取 IP 地址和网络接口名称
- func parseRemoteAddr(addr string) (string, string, error) {
- parts := strings.Split(addr, "%")
- if len(parts) != 2 {
- return "", "", fmt.Errorf("invalid remote address format: %s", addr)
- }
- return parts[0], parts[1], nil
- }
- func WriteAndReadDevice(frame []byte, conn net.Conn, former, after int) (data []byte, err error) {
- // 发送 Modbus RTU 帧
- n, err := conn.Write(frame)
- if err != nil {
- logger.Get().Errorln("Error writing to connection:", err)
- return
- }
- // 等待一段时间以接收响应
- time.Sleep(1000 * time.Millisecond)
- // 读取响应
- buffer := make([]byte, 1024)
- n, err = conn.Read(buffer)
- if err != nil {
- logger.Get().Errorln("Error reading from connection:", err)
- return
- }
- // 检查读取的字节数是否足够
- if n < former+after {
- err = fmt.Errorf("not enough bytes read to satisfy the slice range")
- return
- }
- // 返回子切片
- return buffer[former : n-after], err
- }
|