myData.go 5.0 KB


  1. package initialize
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "log"
  6. "net"
  7. "runtime"
  8. "runtime/debug"
  9. "server/dao"
  10. "server/modbus"
  11. "server/model"
  12. "server/service"
  13. "server/utils"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. type ModbusHandler struct {
  19. queue *modbus.MlQueue
  20. }
  21. var _handlerOnce sync.Once
  22. var _handlerSingle *ModbusHandler
  23. func GetHandler() *ModbusHandler {
  24. _handlerOnce.Do(func() {
  25. _handlerSingle = &ModbusHandler{
  26. queue: modbus.NewQueue(10000),
  27. }
  28. })
  29. return _handlerSingle
  30. }
  31. func InitInductanceTCP() {
  32. lis, err := net.Listen("tcp", ":60001")
  33. if err != nil {
  34. fmt.Println(err)
  35. } else {
  36. fmt.Println(lis)
  37. model.InductanceTCP = lis
  38. fmt.Println("inductanceTCP启动成功")
  39. }
  40. go StartInductanceTCP()
  41. }
  42. func StartInductanceTCP() {
  43. fmt.Println("进入StartInductanceTCP")
  44. handler := GetHandler()
  45. for {
  46. log.Println("进入InductanceTCP循环")
  47. conn, err := model.InductanceTCP.Accept()
  48. if err != nil {
  49. log.Println(err)
  50. }
  51. remoteAddr := conn.RemoteAddr().String()
  52. log.Printf("lis Accept conn = %s\n", remoteAddr)
  53. model.Mutex.Lock()
  54. if _, exists := model.ConnectionMap[remoteAddr]; exists {
  55. log.Printf("Connection from %s already exists, closing new connection\n", remoteAddr)
  56. conn.Close()
  57. model.Mutex.Unlock()
  58. continue
  59. }
  60. deviceId, err := GetDeviceId(conn)
  61. if err != nil {
  62. log.Println("Error getting device ID:", err)
  63. conn.Close()
  64. model.Mutex.Unlock()
  65. continue
  66. }
  67. model.ConnectionMap[deviceId] = conn
  68. model.Mutex.Unlock()
  69. //用新的协程处理新的连接
  70. go handler.ReadAndHandle(conn, deviceId)
  71. go handler.Handler()
  72. }
  73. }
  74. func GetDeviceId(conn net.Conn) (id string, err error) {
  75. // 发送 Modbus RTU 帧
  76. //FE 04 03 EE 00 08 85 B2
  77. data, err := utils.WriteAndReadDevice(modbus.ReadDeviceId, conn, 9, 2)
  78. if err != nil {
  79. return
  80. }
  81. return string(data), err
  82. }
  83. func (o *ModbusHandler) ReadAndHandle(conn net.Conn, deviceId string) {
  84. for {
  85. buffer := make([]byte, 1024)
  86. n, err := conn.Read(buffer)
  87. if err != nil {
  88. log.Println("Error reading from connection:", err)
  89. return
  90. }
  91. queueData := model.QueueData{
  92. Id: deviceId,
  93. Value: buffer[:n],
  94. }
  95. ok, cnt := o.queue.Put(&queueData)
  96. if ok {
  97. continue
  98. } else {
  99. fmt.Printf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  100. runtime.Gosched()
  101. }
  102. }
  103. }
  104. func (o *ModbusHandler) Handler() interface{} {
  105. defer func() {
  106. if err := recover(); err != nil {
  107. go GetHandler().Handler()
  108. fmt.Printf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  109. }
  110. }()
  111. for {
  112. msg, ok, quantity := o.queue.Get()
  113. if !ok {
  114. time.Sleep(10 * time.Millisecond)
  115. continue
  116. } else if quantity > 1000 {
  117. fmt.Printf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  118. }
  119. queueData, ok := msg.(*model.QueueData)
  120. if !ok {
  121. fmt.Println("Type assertion failed: msg is not of type model.QueueDat")
  122. return nil
  123. }
  124. // 信息处理返回
  125. parseData(queueData)
  126. // 对数据进行修改
  127. }
  128. }
  129. func parseData(data *model.QueueData) {
  130. reg, dev, err := utils.GetDataByDeviceId(data.Id)
  131. if err != nil {
  132. fmt.Println("Error getting register and device:", err)
  133. return
  134. }
  135. toString := hex.EncodeToString(data.Value)
  136. switch toString[0:2] {
  137. case "fe":
  138. switch toString[4:8] { // 开关灯
  139. case "0000", "0001", "0002", "0003", "0004", "0005", "0006", "0007":
  140. if toString[8:12] == "0000" {
  141. for i, loop := range dev.DeviceLoops {
  142. loop.State = 0
  143. dev.DeviceLoops[i] = loop
  144. }
  145. } else if toString[8:12] == "ff00" {
  146. for i, loop := range dev.DeviceLoops {
  147. loop.State = 1
  148. dev.DeviceLoops[i] = loop
  149. }
  150. }
  151. }
  152. case "11":
  153. switch toString[2:6] {
  154. case "0336":
  155. batteryVoltage, _ := strconv.ParseInt(toString[6:10], 16, 64)
  156. batteryCurrent, _ := strconv.ParseInt(toString[10:14], 16, 64)
  157. batteryPlateVoltage, _ := strconv.ParseInt(toString[38:42], 16, 64)
  158. sun := dao.Sun{
  159. DeviceId: dev.Sn,
  160. BatteryVoltage: float64(batteryVoltage) / 100,
  161. BatteryCurrent: int(batteryCurrent),
  162. BatteryPlateVoltage: float64(batteryPlateVoltage) / 100,
  163. }
  164. for i, device := range reg.Devices {
  165. if device.Sn == dev.Sn {
  166. reg.Devices[i].Sun = sun
  167. }
  168. }
  169. regions, err := utils.SaveRegionOnData(reg)
  170. err = service.SaveData(regions)
  171. if err != nil {
  172. fmt.Println(err)
  173. return
  174. }
  175. //电池
  176. if float64(batteryVoltage)/100 < 5 {
  177. data1 := modbus.DeviceSwitch(8, 1)
  178. utils.WriteDevice(data1, model.ConnectionMap[data.Id])
  179. } else {
  180. data1 := modbus.DeviceSwitch(8, 0)
  181. utils.WriteDevice(data1, model.ConnectionMap[data.Id])
  182. }
  183. }
  184. }
  185. switch toString[0:4] {
  186. case "4c43":
  187. bytes, err := hex.DecodeString(toString[4:])
  188. if err != nil {
  189. fmt.Println("Error decoding bytes:", err)
  190. return
  191. }
  192. fmt.Println(string(bytes) + time.Now().String())
  193. for i, device := range reg.Devices {
  194. if device.Sn == string(bytes) {
  195. reg.Devices[i].State = 1
  196. reg.Devices[i].OnlineTime = time.Now()
  197. }
  198. }
  199. regions, err := utils.SaveRegionOnData(reg)
  200. err = service.SaveData(regions)
  201. if err != nil {
  202. fmt.Println(err)
  203. return
  204. }
  205. }
  206. }