123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- //go:generate reform --gofmt=false models
- package main
- import (
- "database/sql"
- "devel.mephi.ru/dyokunev/cardlogger-parser/models"
- "fmt"
- _ "github.com/go-sql-driver/mysql"
- "github.com/xaionaro/reform"
- "github.com/xaionaro/reform/dialects/mysql"
- "net"
- "os"
- "runtime"
- "strings"
- "time"
- )
- const (
- SQL_LOGGER_TRACEBACK_DEPTH int = 10
- )
- func checkError(err error) {
- if err != nil {
- panic(err)
- }
- }
- type stage int
- const (
- STAGE_IDLE stage = 0
- STAGE_KEEPALIVE stage = 1
- STAGE_STARTING stage = 2
- STAGE_GETTINGVALUE stage = 3
- )
- type status struct {
- Stage stage
- Pos byte
- T byte
- Value [3]int
- }
- func gotKeepAlive(port int) {
- //fmt.Println("gotKeepAlive(): port == ", port)
- keepaliveRecord, err := models.KeepaliveRecordSQL.First(port)
- if err != nil && err != sql.ErrNoRows {
- fmt.Println("SQL error: ", err)
- }
- keepaliveRecord.Counter++
- keepaliveRecord.UpdatedAt = time.Now()
- if err == sql.ErrNoRows {
- keepaliveRecord.Port = port
- err = keepaliveRecord.Create()
- } else {
- err = keepaliveRecord.Update()
- }
- if err != nil {
- fmt.Println("SQL error: ", err)
- }
- }
- func gotValue(port int, valueA [3]int) {
- //fmt.Println("gotValue(): port == ", port, "; valueA == ", valueA)
- if valueA[0] != valueA[1] || valueA[1] != valueA[2] {
- fmt.Println("values doesn't match: ", valueA, "; port == ", port)
- if valueA[0] != valueA[1] && valueA[1] == valueA[2] {
- valueA[0] = valueA[1]
- fmt.Println("corrected value: ", valueA[0])
- }
- if valueA[0] != valueA[1] && valueA[1] != valueA[2] && valueA[0] != valueA[2] {
- return
- }
- }
- err := models.LogRecord{
- Port: port,
- Value: int64(valueA[0]),
- CreatedAt: time.Now(),
- }.Create()
- if err != nil {
- fmt.Println("SQL error: ", err)
- }
- }
- type smartLogger struct {
- dbName string
- traceEnable bool
- errorEnable bool
- }
- func (logger smartLogger) queryWrapper(query string) string {
- var where string
- for i := 2; i < 32; i++ {
- _, filePath, _, ok := runtime.Caller(i)
- if !ok {
- break
- }
- if strings.HasSuffix(filePath, "_reform.go") {
- continue
- }
- if filePath == `<autogenerated>` {
- continue
- }
- whereArray := make([]string, SQL_LOGGER_TRACEBACK_DEPTH, SQL_LOGGER_TRACEBACK_DEPTH)
- for j := 0; j < SQL_LOGGER_TRACEBACK_DEPTH; j++ {
- _, filePath, line, ok := runtime.Caller(i + j)
- pathParts := strings.Split(filePath, "/")
- fileName := pathParts[len(pathParts)-1]
- if !ok || strings.HasSuffix(fileName, ".s") {
- whereArray = whereArray[SQL_LOGGER_TRACEBACK_DEPTH-j:]
- break
- }
- whereArray[SQL_LOGGER_TRACEBACK_DEPTH-1-j] = fmt.Sprintf("%v:%v", fileName, line)
- }
- where = "[" + strings.Join(whereArray, " -> ") + "] "
- break
- }
- return fmt.Sprintf("%v[db:%s] %s", where, logger.dbName, query)
- }
- func (logger *smartLogger) SetTraceEnable(enable bool) {
- logger.traceEnable = enable
- }
- func (logger *smartLogger) SetErrorEnable(enable bool) {
- logger.errorEnable = enable
- }
- func (logger smartLogger) Before(query string, args []interface{}) {
- if logger.traceEnable {
- fmt.Println(logger.queryWrapper(query), args)
- }
- return
- }
- func (logger smartLogger) After(query string, args []interface{}, d time.Duration, err error) {
- if err != nil {
- if logger.errorEnable {
- fmt.Println(logger.queryWrapper(query), args, d, err)
- }
- } else {
- if logger.traceEnable {
- fmt.Println(logger.queryWrapper(query), args, d, err)
- }
- }
- return
- }
- func initDB() {
- simpleDB, err := sql.Open("mysql", "parser:"+os.Getenv("MYSQL_PASSWORD")+"@unix(/var/run/mysqld/mysqld.sock)/db?charset=utf8&parseTime=true")
- if err != nil {
- panic(fmt.Errorf("Cannot connect to DB: %s", err.Error()))
- }
- logger := smartLogger{}
- logger.SetTraceEnable(true)
- logger.SetErrorEnable(true)
- DB := reform.NewDB(simpleDB, mysql.Dialect, logger)
- // Setting up table "keepalive"
- {
- _, err := models.KeepaliveRecordTable.CreateTableIfNotExists(DB)
- checkError(err)
- models.KeepaliveRecordSQL.SetDefaultDB(DB)
- }
- // Setting up table "log"
- {
- _, err := models.LogRecordTable.CreateTableIfNotExists(DB)
- checkError(err)
- models.LogRecordSQL.SetDefaultDB(DB)
- }
- return
- }
- func main() {
- initDB()
- addr, err := net.ResolveUDPAddr("udp", ":36400")
- checkError(err)
- conn, err := net.ListenUDP("udp", addr)
- checkError(err)
- defer conn.Close()
- buf := make([]byte, 1024)
- curStatusMap := make(map[int]status)
- considerChar := func(port int, c byte) {
- curStatus := curStatusMap[port]
- defer func() { /*fmt.Println("curStatus ->", curStatus);*/ curStatusMap[port] = curStatus }()
- switch curStatus.Stage {
- case STAGE_IDLE:
- switch c {
- case 0xfe:
- curStatus.Stage = STAGE_KEEPALIVE
- case 0xff:
- curStatus.Stage = STAGE_STARTING
- default:
- goto errorReset
- }
- curStatus.Pos = 1
- case STAGE_KEEPALIVE:
- if c != 0xfe {
- goto errorReset
- }
- curStatus.Pos++
- if curStatus.Pos == 4 {
- gotKeepAlive(port)
- goto reset
- }
- return
- case STAGE_STARTING:
- if c != 0xff-curStatus.Pos {
- goto errorReset
- }
- curStatus.Pos++
- if curStatus.Pos == 4 {
- curStatus.Pos = 0
- curStatus.Stage = STAGE_GETTINGVALUE
- return
- }
- case STAGE_GETTINGVALUE:
- curStatus.Value[curStatus.T] |= int(c) << (curStatus.Pos * 8)
- curStatus.Pos++
- if curStatus.Pos != 4 {
- return
- }
- curStatus.Pos = 0
- curStatus.T++
- if curStatus.T != 3 {
- return
- }
- gotValue(port, curStatus.Value)
- goto reset
- default:
- panic(fmt.Errorf("Unknown stage: %v", curStatus.Stage))
- }
- return
- errorReset:
- fmt.Println("got invalid char (", c, ") from port ", port, ", reset (curStatus == ", curStatus, ").")
- reset:
- // fmt.Println("reset")
- curStatus = status{}
- }
- for {
- n, addr, err := conn.ReadFromUDP(buf)
- for i := 0; i < n; i++ {
- considerChar(addr.Port, buf[i])
- }
- if err != nil {
- fmt.Println("Error: ", err)
- }
- }
- return
- }
|