main.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. //go:generate reform --gofmt=false models
  2. package main
  3. import (
  4. "database/sql"
  5. "devel.mephi.ru/dyokunev/cardlogger-parser/models"
  6. "fmt"
  7. _ "github.com/go-sql-driver/mysql"
  8. "github.com/xaionaro/reform"
  9. "github.com/xaionaro/reform/dialects/mysql"
  10. "net"
  11. "os"
  12. "runtime"
  13. "strings"
  14. "time"
  15. )
const (
	// SQL_LOGGER_TRACEBACK_DEPTH is the maximum number of call-stack frames
	// that smartLogger.queryWrapper puts into the "[file:line -> ...]"
	// location prefix of a logged query.
	SQL_LOGGER_TRACEBACK_DEPTH int = 10
)
  19. func checkError(err error) {
  20. if err != nil {
  21. panic(err)
  22. }
  23. }
  24. type stage int
  25. const (
  26. STAGE_IDLE stage = 0
  27. STAGE_KEEPALIVE stage = 1
  28. STAGE_STARTING stage = 2
  29. STAGE_GETTINGVALUE stage = 3
  30. )
// status is the packet-parser state that main keeps for each UDP source port.
// The zero value means "idle, nothing received yet".
type status struct {
	// Stage is the part of the packet currently being parsed.
	Stage stage
	// Pos counts bytes within the current stage: marker bytes matched so far
	// while in STAGE_KEEPALIVE/STAGE_STARTING, or the index (0..3) of the next
	// byte of the current value copy while in STAGE_GETTINGVALUE.
	Pos byte
	// T is the index (0..2) of the value copy currently being received.
	T byte
	// Value holds the three received copies of the value; each one is
	// assembled from 4 bytes, least significant byte first.
	Value [3]int
}
  37. func gotKeepAlive(port int) {
  38. //fmt.Println("gotKeepAlive(): port == ", port)
  39. keepaliveRecord, err := models.KeepaliveRecordSQL.First(port)
  40. if err != nil && err != sql.ErrNoRows {
  41. fmt.Println("SQL error: ", err)
  42. }
  43. keepaliveRecord.Counter++
  44. keepaliveRecord.UpdatedAt = time.Now()
  45. if err == sql.ErrNoRows {
  46. keepaliveRecord.Port = port
  47. err = keepaliveRecord.Create()
  48. } else {
  49. err = keepaliveRecord.Update()
  50. }
  51. if err != nil {
  52. fmt.Println("SQL error: ", err)
  53. }
  54. }
  55. func gotValue(port int, valueA [3]int) {
  56. //fmt.Println("gotValue(): port == ", port, "; valueA == ", valueA)
  57. if valueA[0] != valueA[1] || valueA[1] != valueA[2] {
  58. fmt.Println("values doesn't match: ", valueA, "; port == ", port)
  59. if valueA[0] != valueA[1] && valueA[1] == valueA[2] {
  60. valueA[0] = valueA[1]
  61. fmt.Println("corrected value: ", valueA[0])
  62. }
  63. if valueA[0] != valueA[1] && valueA[1] != valueA[2] && valueA[0] != valueA[2] {
  64. return
  65. }
  66. }
  67. err := models.LogRecord{
  68. Port: port,
  69. Value: int64(valueA[0]),
  70. CreatedAt: time.Now(),
  71. }.Create()
  72. if err != nil {
  73. fmt.Println("SQL error: ", err)
  74. }
  75. }
// smartLogger is the query logger that initDB hands to reform.NewDB. Its
// Before and After methods print queries to stdout, prefixed with the call
// site that issued them.
type smartLogger struct {
	// dbName is printed in the "[db:...]" part of every log line.
	dbName string
	// traceEnable turns on logging of every query (before execution and after
	// successful execution).
	traceEnable bool
	// errorEnable turns on logging of queries that finished with an error.
	errorEnable bool
}
  81. func (logger smartLogger) queryWrapper(query string) string {
  82. var where string
  83. for i := 2; i < 32; i++ {
  84. _, filePath, _, ok := runtime.Caller(i)
  85. if !ok {
  86. break
  87. }
  88. if strings.HasSuffix(filePath, "_reform.go") {
  89. continue
  90. }
  91. if filePath == `<autogenerated>` {
  92. continue
  93. }
  94. whereArray := make([]string, SQL_LOGGER_TRACEBACK_DEPTH, SQL_LOGGER_TRACEBACK_DEPTH)
  95. for j := 0; j < SQL_LOGGER_TRACEBACK_DEPTH; j++ {
  96. _, filePath, line, ok := runtime.Caller(i + j)
  97. pathParts := strings.Split(filePath, "/")
  98. fileName := pathParts[len(pathParts)-1]
  99. if !ok || strings.HasSuffix(fileName, ".s") {
  100. whereArray = whereArray[SQL_LOGGER_TRACEBACK_DEPTH-j:]
  101. break
  102. }
  103. whereArray[SQL_LOGGER_TRACEBACK_DEPTH-1-j] = fmt.Sprintf("%v:%v", fileName, line)
  104. }
  105. where = "[" + strings.Join(whereArray, " -> ") + "] "
  106. break
  107. }
  108. return fmt.Sprintf("%v[db:%s] %s", where, logger.dbName, query)
  109. }
// SetTraceEnable switches logging of all queries on or off. It has a pointer
// receiver so that the flag is changed on the caller's logger value.
func (logger *smartLogger) SetTraceEnable(enable bool) {
	logger.traceEnable = enable
}
// SetErrorEnable switches logging of failed queries on or off. It has a
// pointer receiver so that the flag is changed on the caller's logger value.
func (logger *smartLogger) SetErrorEnable(enable bool) {
	logger.errorEnable = enable
}
  116. func (logger smartLogger) Before(query string, args []interface{}) {
  117. if logger.traceEnable {
  118. fmt.Println(logger.queryWrapper(query), args)
  119. }
  120. return
  121. }
  122. func (logger smartLogger) After(query string, args []interface{}, d time.Duration, err error) {
  123. if err != nil {
  124. if logger.errorEnable {
  125. fmt.Println(logger.queryWrapper(query), args, d, err)
  126. }
  127. } else {
  128. if logger.traceEnable {
  129. fmt.Println(logger.queryWrapper(query), args, d, err)
  130. }
  131. }
  132. return
  133. }
  134. func initDB() {
  135. simpleDB, err := sql.Open("mysql", "parser:"+os.Getenv("MYSQL_PASSWORD")+"@unix(/var/run/mysqld/mysqld.sock)/db?charset=utf8&parseTime=true")
  136. if err != nil {
  137. panic(fmt.Errorf("Cannot connect to DB: %s", err.Error()))
  138. }
  139. logger := smartLogger{}
  140. logger.SetTraceEnable(true)
  141. logger.SetErrorEnable(true)
  142. DB := reform.NewDB(simpleDB, mysql.Dialect, logger)
  143. // Setting up table "keepalive"
  144. {
  145. _, err := models.KeepaliveRecordTable.CreateTableIfNotExists(DB)
  146. checkError(err)
  147. models.KeepaliveRecordSQL.SetDefaultDB(DB)
  148. }
  149. // Setting up table "log"
  150. {
  151. _, err := models.LogRecordTable.CreateTableIfNotExists(DB)
  152. checkError(err)
  153. models.LogRecordSQL.SetDefaultDB(DB)
  154. }
  155. return
  156. }
  157. func main() {
  158. initDB()
  159. addr, err := net.ResolveUDPAddr("udp", ":36400")
  160. checkError(err)
  161. conn, err := net.ListenUDP("udp", addr)
  162. checkError(err)
  163. defer conn.Close()
  164. buf := make([]byte, 1024)
  165. curStatusMap := make(map[int]status)
  166. considerChar := func(port int, c byte) {
  167. curStatus := curStatusMap[port]
  168. defer func() { /*fmt.Println("curStatus ->", curStatus);*/ curStatusMap[port] = curStatus }()
  169. switch curStatus.Stage {
  170. case STAGE_IDLE:
  171. switch c {
  172. case 0xfe:
  173. curStatus.Stage = STAGE_KEEPALIVE
  174. case 0xff:
  175. curStatus.Stage = STAGE_STARTING
  176. default:
  177. goto errorReset
  178. }
  179. curStatus.Pos = 1
  180. case STAGE_KEEPALIVE:
  181. if c != 0xfe {
  182. goto errorReset
  183. }
  184. curStatus.Pos++
  185. if curStatus.Pos == 4 {
  186. gotKeepAlive(port)
  187. goto reset
  188. }
  189. return
  190. case STAGE_STARTING:
  191. if c != 0xff-curStatus.Pos {
  192. goto errorReset
  193. }
  194. curStatus.Pos++
  195. if curStatus.Pos == 4 {
  196. curStatus.Pos = 0
  197. curStatus.Stage = STAGE_GETTINGVALUE
  198. return
  199. }
  200. case STAGE_GETTINGVALUE:
  201. curStatus.Value[curStatus.T] |= int(c) << (curStatus.Pos * 8)
  202. curStatus.Pos++
  203. if curStatus.Pos != 4 {
  204. return
  205. }
  206. curStatus.Pos = 0
  207. curStatus.T++
  208. if curStatus.T != 3 {
  209. return
  210. }
  211. gotValue(port, curStatus.Value)
  212. goto reset
  213. default:
  214. panic(fmt.Errorf("Unknown stage: %v", curStatus.Stage))
  215. }
  216. return
  217. errorReset:
  218. fmt.Println("got invalid char (", c, ") from port ", port, ", reset (curStatus == ", curStatus, ").")
  219. reset:
  220. // fmt.Println("reset")
  221. curStatus = status{}
  222. }
  223. for {
  224. n, addr, err := conn.ReadFromUDP(buf)
  225. for i := 0; i < n; i++ {
  226. considerChar(addr.Port, buf[i])
  227. }
  228. if err != nil {
  229. fmt.Println("Error: ", err)
  230. }
  231. }
  232. return
  233. }