myData.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package initialize
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "log"
  6. "net"
  7. "runtime"
  8. "runtime/debug"
  9. "server/modbus"
  10. "server/model"
  11. "server/service"
  12. "server/utils"
  13. "strconv"
  14. "sync"
  15. "time"
  16. )
  17. type ModbusHandler struct {
  18. queue *modbus.MlQueue
  19. }
  20. var _handlerOnce sync.Once
  21. var _handlerSingle *ModbusHandler
  22. func GetHandler() *ModbusHandler {
  23. _handlerOnce.Do(func() {
  24. _handlerSingle = &ModbusHandler{
  25. queue: modbus.NewQueue(10000),
  26. }
  27. })
  28. return _handlerSingle
  29. }
  30. func InitInductanceTCP() {
  31. lis, err := net.Listen("tcp", ":60001")
  32. if err != nil {
  33. fmt.Println(err)
  34. } else {
  35. fmt.Println(lis)
  36. model.InductanceTCP = lis
  37. fmt.Println("inductanceTCP启动成功")
  38. }
  39. go StartInductanceTCP()
  40. }
  41. func StartInductanceTCP() {
  42. fmt.Println("进入StartInductanceTCP")
  43. handler := GetHandler()
  44. for {
  45. log.Println("进入InductanceTCP循环")
  46. conn, err := model.InductanceTCP.Accept()
  47. if err != nil {
  48. log.Println(err)
  49. }
  50. remoteAddr := conn.RemoteAddr().String()
  51. log.Printf("lis Accept conn = %s\n", remoteAddr)
  52. model.Mutex.Lock()
  53. if _, exists := model.ConnectionMap[remoteAddr]; exists {
  54. log.Printf("Connection from %s already exists, closing new connection\n", remoteAddr)
  55. conn.Close()
  56. model.Mutex.Unlock()
  57. continue
  58. }
  59. deviceId, err := GetDeviceId(conn)
  60. if err != nil {
  61. log.Println("Error getting device ID:", err)
  62. conn.Close()
  63. model.Mutex.Unlock()
  64. continue
  65. }
  66. model.ConnectionMap[deviceId] = conn
  67. model.Mutex.Unlock()
  68. //用新的协程处理新的连接
  69. go handler.ReadAndHandle(conn, deviceId)
  70. go handler.Handler()
  71. }
  72. }
  73. func GetDeviceId(conn net.Conn) (id string, err error) {
  74. // 发送 Modbus RTU 帧
  75. //FE 04 03 EE 00 08 85 B2
  76. data, err := utils.WriteAndReadDevice(modbus.ReadDeviceId, conn, 9, 2)
  77. if err != nil {
  78. return
  79. }
  80. return string(data), err
  81. }
  82. func (o *ModbusHandler) ReadAndHandle(conn net.Conn, deviceId string) {
  83. for {
  84. buffer := make([]byte, 1024)
  85. n, err := conn.Read(buffer)
  86. if err != nil {
  87. log.Println("Error reading from connection:", err)
  88. return
  89. }
  90. queueData := model.QueueData{
  91. Id: deviceId,
  92. Value: buffer[:n],
  93. }
  94. ok, cnt := o.queue.Put(&queueData)
  95. if ok {
  96. continue
  97. } else {
  98. fmt.Printf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  99. runtime.Gosched()
  100. }
  101. }
  102. }
  103. func (o *ModbusHandler) Handler() interface{} {
  104. defer func() {
  105. if err := recover(); err != nil {
  106. go GetHandler().Handler()
  107. fmt.Printf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  108. }
  109. }()
  110. for {
  111. msg, ok, quantity := o.queue.Get()
  112. if !ok {
  113. time.Sleep(10 * time.Millisecond)
  114. continue
  115. } else if quantity > 1000 {
  116. fmt.Printf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  117. }
  118. queueData, ok := msg.(*model.QueueData)
  119. if !ok {
  120. fmt.Println("Type assertion failed: msg is not of type model.QueueDat")
  121. return nil
  122. }
  123. // 信息处理返回
  124. parseData(queueData)
  125. // 对数据进行修改
  126. }
  127. }
  128. func parseData(data *model.QueueData) {
  129. reg, dev, err := utils.GetDataByDeviceId(data.Id)
  130. if err != nil {
  131. fmt.Println("Error getting register and device:", err)
  132. return
  133. }
  134. toString := hex.EncodeToString(data.Value)
  135. switch toString[0:2] {
  136. case "fe":
  137. switch toString[4:8] { // 开关灯
  138. case "0000", "0001", "0002", "0003", "0004", "0005", "0006", "0007":
  139. if toString[8:12] == "0000" {
  140. for i, loop := range dev.DeviceLoops {
  141. loop.State = 0
  142. dev.DeviceLoops[i] = loop
  143. }
  144. } else if toString[8:12] == "ff00" {
  145. for i, loop := range dev.DeviceLoops {
  146. loop.State = 1
  147. dev.DeviceLoops[i] = loop
  148. }
  149. }
  150. }
  151. case "11":
  152. switch toString[2:6] {
  153. case "0336":
  154. batteryVoltage, _ := strconv.ParseInt(toString[6:10], 16, 64)
  155. //电池
  156. if float64(batteryVoltage)/100 < 5 {
  157. data1 := modbus.DeviceSwitch(8, 1)
  158. utils.WriteDevice(data1, model.ConnectionMap[data.Id])
  159. } else {
  160. data1 := modbus.DeviceSwitch(8, 0)
  161. utils.WriteDevice(data1, model.ConnectionMap[data.Id])
  162. }
  163. }
  164. }
  165. switch toString[0:4] {
  166. case "4c43":
  167. bytes, err := hex.DecodeString(toString[4:])
  168. if err != nil {
  169. fmt.Println("Error decoding bytes:", err)
  170. return
  171. }
  172. fmt.Println(string(bytes) + time.Now().String())
  173. for i, device := range reg.Devices {
  174. if device.Sn == string(bytes) {
  175. reg.Devices[i].State = 1
  176. reg.Devices[i].OnlineTime = time.Now()
  177. }
  178. }
  179. regions, err := utils.SaveRegionOnData(reg)
  180. err = service.SaveData(regions)
  181. if err != nil {
  182. fmt.Println(err)
  183. return
  184. }
  185. }
  186. }