package logdb import ( "database/sql" "encoding/json" "fmt" "io/ioutil" "log" "os" "sync" "time" _ "github.com/go-sql-driver/mysql" // mysql驱动 yaml "gopkg.in/yaml.v2" ) // LogDB 属性结构 type LogDB struct { Charset string `yaml:"charset"` DB string `yaml:"db"` Hosts []string `yaml:"hosts"` Password string `yaml:"password"` Port string `yaml:"port"` User string `yaml:"user"` pid int hostid int nextCheck int64 checkLimit int64 driver *sql.DB mutex sync.Mutex } // New 创建一个logdb的配置 func New(filename string) *LogDB { logdb := LogDB{} logdb.checkLimit = 300 logdb.pid = os.Getpid() data, err := ioutil.ReadFile(filename) if err != nil { panic(err) } err = yaml.Unmarshal(data, &logdb) if err != nil { panic(err) } logdb.hostid = 0 db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset)) if err != nil { log.Println("connect", err) } else { logdb.driver = db } return &logdb } // Ping 是否Ping通数据库 func (logdb *LogDB) Ping() (result bool) { log.Println("Ping") logdb.mutex.Lock() defer logdb.mutex.Unlock() defer func() { if err := recover(); err != nil { result = false log.Println(err, logdb.Hosts[logdb.hostid], " is unconnect ") hostlen := len(logdb.Hosts) errorhid := logdb.hostid for i := 0; i < hostlen; i++ { curid := errorhid + 1 + i if curid >= hostlen { curid = curid - hostlen } curHost := logdb.Hosts[curid] myurl := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=30s&charset=%s", logdb.User, logdb.Password, curHost, logdb.Port, logdb.DB, logdb.Charset) db, err := sql.Open("mysql", myurl) if err != nil { log.Println(err, curHost, " is connect fail") continue } if err := db.Ping(); err != nil { log.Println(err, curHost, " is connect fail") continue } logdb.driver = db logdb.hostid = curid result = true } } }() if err := logdb.driver.Ping(); err != nil { panic(err) } return true } // ADInsert 插入数据 func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) { logdb.mutex.Lock() defer logdb.mutex.Unlock() _, err := logdb.driver.Exec("insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ? ,?, ?, ?)", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl) if err != nil { log.Println(err) log.Printf("for save ad sql: insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(%s, %s, %s, %s, %d, %d, %s, %d, %s, %s, %s, %d, %d, %s)\n", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl.Format("2006-01-02 15:04:05")) } } type ADResonse struct { UID string Response string } func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { logdb.mutex.Lock() defer logdb.mutex.Unlock() pid := logdb.pid + 2000 logdb.adCheckRecover(spider_id) _, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id) if err != nil { log.Println(err) return nil } rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and status = ?", spider_id, pid) if err != nil { log.Println(err) return nil } var adresponse []ADResonse var uid, response string for rows.Next() { rows.Scan(&uid, &response) adresponse = append(adresponse, ADResonse{UID: uid, Response: response}) // log.Println(uid, response) } return adresponse } func (logdb *LogDB) adCheckRecover(spider_id int) { now := time.Now() if now.Unix() > logdb.nextCheck { logdb.nextCheck = now.Unix() + logdb.checkLimit tsUpdate := now.Add(-time.Minute * 5) _, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status > 2000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate) if err != nil { log.Println(err) } } } func (logdb *LogDB) ADParserSuccess(uid string, successData string) { logdb.mutex.Lock() defer logdb.mutex.Unlock() ext := make(map[string]string) ext["success_data"] = successData data, err := json.Marshal(&ext) if err != nil || successData == "" { _, err := logdb.driver.Exec("update log_spider set status = 200 where uid = ?", uid) if err != nil { log.Println(err) } } else { _, err := logdb.driver.Exec("update log_spider set status = 200, ext = ? where uid = ?", string(data), uid) if err != nil { log.Println(err) } } } // Select 插入数据 func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows { logdb.mutex.Lock() defer logdb.mutex.Unlock() Rows, err := logdb.driver.Query(query, args...) if err != nil { log.Println(err) return nil } return Rows } // ADError 广告错误后更新 func (logdb *LogDB) ADError(uid, error_msg string) { logdb.mutex.Lock() defer logdb.mutex.Unlock() _, err := logdb.driver.Exec("update log_spider set status = 1000, error_msg=? where uid =?;", error_msg, uid) if err != nil { log.Println(err) } }