myData.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package initialize
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "log"
  6. "net"
  7. "os"
  8. "runtime"
  9. "runtime/debug"
  10. "server/dao"
  11. "server/modbus"
  12. "server/model"
  13. "server/service"
  14. "server/utils"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "syscall"
  19. "time"
  20. )
  21. type ModbusHandler struct {
  22. queue *modbus.MlQueue
  23. }
  24. var _handlerOnce sync.Once
  25. var _handlerSingle *ModbusHandler
  26. func GetHandler() *ModbusHandler {
  27. _handlerOnce.Do(func() {
  28. _handlerSingle = &ModbusHandler{
  29. queue: modbus.NewQueue(10000),
  30. }
  31. })
  32. return _handlerSingle
  33. }
  34. func InitInductanceTCP() {
  35. lis, err := net.Listen("tcp", ":60001")
  36. if err != nil {
  37. fmt.Println(err)
  38. } else {
  39. fmt.Println(lis)
  40. model.InductanceTCP = lis
  41. fmt.Println("inductanceTCP启动成功")
  42. }
  43. go StartInductanceTCP()
  44. }
  45. func StartInductanceTCP() {
  46. fmt.Println("进入StartInductanceTCP")
  47. handler := GetHandler()
  48. for {
  49. log.Println("进入InductanceTCP循环")
  50. conn, err := model.InductanceTCP.Accept()
  51. if err != nil {
  52. log.Println(err)
  53. }
  54. remoteAddr := conn.RemoteAddr().String()
  55. log.Printf("lis Accept conn = %s\n", remoteAddr)
  56. model.Mutex.Lock()
  57. if _, exists := model.ConnectionMap[remoteAddr]; exists {
  58. log.Printf("Connection from %s already exists, closing new connection\n", remoteAddr)
  59. conn.Close()
  60. model.Mutex.Unlock()
  61. continue
  62. }
  63. deviceId, err := GetDeviceId(conn)
  64. if err != nil {
  65. log.Println("Error getting device ID:", err)
  66. conn.Close()
  67. model.Mutex.Unlock()
  68. continue
  69. }
  70. model.ConnectionMap[deviceId] = conn
  71. model.Mutex.Unlock()
  72. //用新的协程处理新的连接
  73. go handler.ReadAndHandle(conn, deviceId)
  74. go handler.Handler()
  75. }
  76. }
  77. func GetDeviceId(conn net.Conn) (id string, err error) {
  78. // 发送 Modbus RTU 帧
  79. //FE 04 03 EE 00 08 85 B2
  80. data, err := utils.WriteAndReadDevice(modbus.ReadDeviceId, conn, 9, 2)
  81. if err != nil {
  82. return
  83. }
  84. return string(data), err
  85. }
  86. func (o *ModbusHandler) ReadAndHandle(conn net.Conn, deviceId string) {
  87. for {
  88. buffer := make([]byte, 1024)
  89. n, err := conn.Read(buffer)
  90. //if err != nil {
  91. // log.Println("Error reading from connection:", err)
  92. // return
  93. //}
  94. if err != nil {
  95. if isConnReset(err) {
  96. fmt.Println("连接被远程主机强制关闭")
  97. } else if os.IsTimeout(err) {
  98. fmt.Println("读取操作超时")
  99. } else {
  100. // 处理其他类型的错误
  101. fmt.Printf("读取错误: %s\n", err)
  102. }
  103. return
  104. }
  105. queueData := model.QueueData{
  106. Id: deviceId,
  107. Value: buffer[:n],
  108. }
  109. ok, cnt := o.queue.Put(&queueData)
  110. if ok {
  111. continue
  112. } else {
  113. fmt.Printf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  114. runtime.Gosched()
  115. }
  116. }
  117. }
  118. func isConnReset(err error) bool {
  119. if opErr, ok := err.(*net.OpError); ok {
  120. if opErr.Err == syscall.ECONNRESET {
  121. return true // Unix-like 系统上的 ECONNRESET
  122. } else if runtime.GOOS == "windows" {
  123. // Windows 上的 WSAECONNRESET 通常是通过错误消息识别的
  124. if se, ok := opErr.Err.(*os.SyscallError); ok {
  125. if errno, ok := se.Err.(syscall.Errno); ok {
  126. if errno == 10054 { // 10054 对应 WSAECONNRESET
  127. return true
  128. }
  129. }
  130. } else if strings.Contains(opErr.Err.Error(), "WSAECONNRESET") {
  131. // 如果错误消息包含 WSAECONNRESET,也认为是连接被重置
  132. return true
  133. }
  134. }
  135. }
  136. return false
  137. }
  138. func (o *ModbusHandler) Handler() interface{} {
  139. defer func() {
  140. if err := recover(); err != nil {
  141. go GetHandler().Handler()
  142. fmt.Printf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  143. }
  144. }()
  145. for {
  146. msg, ok, quantity := o.queue.Get()
  147. if !ok {
  148. time.Sleep(10 * time.Millisecond)
  149. continue
  150. } else if quantity > 1000 {
  151. fmt.Printf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  152. }
  153. queueData, ok := msg.(*model.QueueData)
  154. if !ok {
  155. fmt.Println("Type assertion failed: msg is not of type model.QueueDat")
  156. return nil
  157. }
  158. // 信息处理返回
  159. parseData(queueData)
  160. // 对数据进行修改
  161. }
  162. }
  163. func parseData(data *model.QueueData) {
  164. reg, dev, err := utils.GetDataByDeviceId(data.Id)
  165. if err != nil {
  166. fmt.Println("Error getting register and device:", err)
  167. return
  168. }
  169. toString := hex.EncodeToString(data.Value)
  170. switch toString[0:2] {
  171. case "fe":
  172. switch toString[4:8] { // 开关灯
  173. case "0000", "0001", "0002", "0003", "0004", "0005", "0006", "0007":
  174. if toString[8:12] == "0000" {
  175. for i, loop := range dev.DeviceLoops {
  176. loop.State = 0
  177. dev.DeviceLoops[i] = loop
  178. }
  179. } else if toString[8:12] == "ff00" {
  180. for i, loop := range dev.DeviceLoops {
  181. loop.State = 1
  182. dev.DeviceLoops[i] = loop
  183. }
  184. }
  185. }
  186. case "11":
  187. switch toString[2:6] {
  188. case "0336":
  189. batteryVoltage, _ := strconv.ParseInt(toString[6:10], 16, 64)
  190. batteryCurrent, _ := strconv.ParseInt(toString[10:14], 16, 64)
  191. batteryPlateVoltage, _ := strconv.ParseInt(toString[38:42], 16, 64)
  192. sun := dao.Sun{
  193. DeviceId: dev.Sn,
  194. BatteryVoltage: float64(batteryVoltage) / 100,
  195. BatteryCurrent: int(batteryCurrent),
  196. BatteryPlateVoltage: float64(batteryPlateVoltage) / 100,
  197. }
  198. for i, device := range reg.Devices {
  199. if device.Sn == dev.Sn {
  200. reg.Devices[i].Sun = sun
  201. }
  202. }
  203. regions, err := utils.SaveRegionOnData(reg)
  204. err = service.SaveData(regions)
  205. if err != nil {
  206. fmt.Println(err)
  207. return
  208. }
  209. //电池
  210. if float64(batteryVoltage)/100 < 5 {
  211. data1 := modbus.DeviceSwitch(8, 1)
  212. utils.WriteDevice(data1, model.ConnectionMap[data.Id])
  213. } else {
  214. data1 := modbus.DeviceSwitch(8, 0)
  215. utils.WriteDevice(data1, model.ConnectionMap[data.Id])
  216. }
  217. }
  218. }
  219. switch toString[0:4] {
  220. case "4c43":
  221. bytes, err := hex.DecodeString(toString[4:])
  222. if err != nil {
  223. fmt.Println("Error decoding bytes:", err)
  224. return
  225. }
  226. fmt.Println(string(bytes) + time.Now().String())
  227. for i, device := range reg.Devices {
  228. if device.Sn == string(bytes) {
  229. reg.Devices[i].State = 1
  230. reg.Devices[i].OnlineTime = time.Now()
  231. }
  232. }
  233. regions, err := utils.SaveRegionOnData(reg)
  234. err = service.SaveData(regions)
  235. if err != nil {
  236. fmt.Println(err)
  237. return
  238. }
  239. }
  240. }