package logdb

import (
	"database/sql"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"sync"
	"time"

	yaml "gopkg.in/yaml.v2"
)

// LogDB holds the log-database settings loaded from YAML by New, together
// with the live *sql.DB handle and the bookkeeping used for host failover and
// stale-claim recovery.
//
// Every exported method serializes on mutex. sync.Mutex is not reentrant, so
// exported methods never call each other while holding it; they use the
// unexported helpers documented as "the caller must hold logdb.mutex".
//
// NOTE(review): this file opens the "mysql" driver but does not import one;
// it must be registered elsewhere (e.g. a blank driver import) — confirm.
type LogDB struct {
	Charset  string   `yaml:"charset"`
	DB       string   `yaml:"db"`
	Hosts    []string `yaml:"hosts"`
	Password string   `yaml:"password"`
	Port     string   `yaml:"port"`
	User     string   `yaml:"user"`

	pid        int   // process ID; ADParserSelect claims rows with status pid+2000
	hostid     int   // index into Hosts of the host driver was opened against
	nextCheck  int64 // unix seconds after which the recovery sweep may run again
	checkLimit int64 // minimum number of seconds between recovery sweeps
	driver     *sql.DB
	mutex      sync.Mutex
}

// New reads the YAML configuration in filename, opens a connection via
// Connect and returns the LogDB.
//
// It panics if the file cannot be read or parsed, if no hosts are configured,
// or if sql.Open rejects the data source name.
func New(filename string) *LogDB {
	logdb := &LogDB{
		checkLimit: 300,
		pid:        os.Getpid(),
	}
	data, err := ioutil.ReadFile(filename)
	if err != nil {
		panic(err)
	}
	if err := yaml.Unmarshal(data, logdb); err != nil {
		panic(err)
	}
	// NOTE(review): Connect advances hostid before opening, so with more than
	// one host the first attempt goes to Hosts[1], not Hosts[0] — confirm intent.
	logdb.hostid = 0
	logdb.Connect()
	return logdb
}

// dsn builds the user:password@tcp(host:port)/db?params data source name that
// is passed to the "mysql" driver for host.
func (logdb *LogDB) dsn(host string) string {
	return fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s",
		logdb.User, logdb.Password, host, logdb.Port, logdb.DB, logdb.Charset)
}

// currentHost returns Hosts[hostid], or "" when hostid is out of range.
func (logdb *LogDB) currentHost() string {
	if logdb.hostid >= 0 && logdb.hostid < len(logdb.Hosts) {
		return logdb.Hosts[logdb.hostid]
	}
	return ""
}

// Ping reports whether the database is reachable. If the current handle is
// missing or fails to answer, it tries every configured host in order and
// switches to the first one that answers. It returns false only when no host
// could be reached.
func (logdb *LogDB) Ping() bool {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()
	return logdb.ping()
}

// ping implements Ping; the caller must hold logdb.mutex.
func (logdb *LogDB) ping() bool {
	if logdb.driver != nil {
		err := logdb.driver.Ping()
		if err == nil {
			return true
		}
		log.Println(err, logdb.currentHost(), " is unconnect ")
	}
	return logdb.failover()
}

// failover tries each host in Hosts order and switches driver to the first
// one that answers a ping, closing the previous handle. Handles that fail are
// closed so they do not leak. The caller must hold logdb.mutex.
func (logdb *LogDB) failover() bool {
	for i, host := range logdb.Hosts {
		db, err := sql.Open("mysql", logdb.dsn(host))
		if err != nil {
			log.Println(err, host, " is connect fail")
			continue
		}
		if err := db.Ping(); err != nil {
			log.Println(err, host, " is connect fail")
			db.Close() // best effort: the handle is being discarded anyway
			continue
		}
		if logdb.driver != nil {
			logdb.driver.Close() // best effort: replaced by db below
		}
		logdb.driver = db
		logdb.hostid = i
		return true
	}
	return false
}

// Connect (re)connects to the next host in Hosts, wrapping around after the
// last one. It replaces and closes the current handle, then verifies the new
// one with a ping, failing over to any reachable host if that ping fails.
//
// It panics if no hosts are configured or if sql.Open rejects the DSN.
func (logdb *LogDB) Connect() {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()

	if len(logdb.Hosts) == 0 {
		panic("logdb: no hosts configured")
	}
	logdb.hostid++
	if logdb.hostid >= len(logdb.Hosts) {
		logdb.hostid = 0
	}
	db, err := sql.Open("mysql", logdb.dsn(logdb.Hosts[logdb.hostid]))
	if err != nil {
		panic(err)
	}
	if logdb.driver != nil {
		logdb.driver.Close()
	}
	logdb.driver = db
	// Use the lock-free helper: Ping would re-acquire mutex and deadlock.
	ok := logdb.ping()
	// Log the host only; the struct itself contains Password.
	log.Println("connect is", ok, logdb.currentHost())
}

// ADInsert inserts one row into log_spider with an empty error_msg. On
// failure the error and an approximate rendering of the statement are logged;
// the error is not returned.
func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string,
	spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()

	_, err := logdb.driver.Exec("insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ? ,?, ?, ?)",
		uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl)
	if err != nil {
		log.Println(err)
		// Each verb matches its argument's type; spider_id is an int (%d).
		log.Printf("for save ad sql: insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(%s, %d, %s, %s, %d, %d, %s, %d, %s, %s, %s, %d, %d, %s)\n",
			uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl.Format("2006-01-02 15:04:05"))
	}
}

// ADResonse is one (uid, response) pair returned by ADParserSelect.
// The misspelled name is kept for compatibility with existing callers.
type ADResonse struct {
	UID      string
	Response string
}

// ADParserSelect first runs the rate-limited stale-claim sweep (see
// ADCheckRecover). It then claims up to 100 rows of spider_id with status 0
// for this process by setting their status to pid+2000. Finally it returns
// uid and response for every row of spider_id currently carrying that status,
// which includes rows claimed by earlier calls that have not been updated
// since.
//
// It returns nil if the claim or the query fails. Rows that fail to scan are
// logged and skipped.
func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()

	pid := logdb.pid + 2000
	// Lock-free variant: ADCheckRecover would re-acquire mutex and deadlock.
	logdb.checkRecover(spider_id)

	_, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id)
	if err != nil {
		log.Println(err)
		return nil
	}
	rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and status = ?", spider_id, pid)
	if err != nil {
		log.Println(err)
		return nil
	}
	// Return the underlying connection to the pool when done.
	defer rows.Close()

	var adresponse []ADResonse
	for rows.Next() {
		var uid, response string
		if err := rows.Scan(&uid, &response); err != nil {
			// Skip the row rather than append values left over from a prior row.
			log.Println(err)
			continue
		}
		adresponse = append(adresponse, ADResonse{UID: uid, Response: response})
	}
	if err := rows.Err(); err != nil {
		log.Println(err)
	}
	return adresponse
}

// ADCheckRecover resets rows of spider_id that have status > 2000 and a
// ts_update at least 5 minutes old back to status 0, appending
// "Parser Timeout " to their error_msg. The sweep runs at most once every
// checkLimit seconds per LogDB.
//
// NOTE(review): nextCheck is shared across all spider_id values, so a sweep
// for one spider postpones the sweep for every other spider — confirm intent.
func (logdb *LogDB) ADCheckRecover(spider_id int) {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()
	logdb.checkRecover(spider_id)
}

// checkRecover implements ADCheckRecover; the caller must hold logdb.mutex.
func (logdb *LogDB) checkRecover(spider_id int) {
	now := time.Now()
	if now.Unix() <= logdb.nextCheck {
		return
	}
	logdb.nextCheck = now.Unix() + logdb.checkLimit
	tsUpdate := now.Add(-5 * time.Minute)
	_, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status > 2000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate)
	if err != nil {
		log.Println(err)
	}
}

// ADParserSuccess sets status = 200 on the log_spider row(s) with the given
// uid. Errors are logged, not returned.
func (logdb *LogDB) ADParserSuccess(uid string) {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()

	_, err := logdb.driver.Exec("update log_spider set status = 200 where uid = ?", uid)
	if err != nil {
		log.Println(err)
	}
}

// Select runs an arbitrary query and returns its rows, or nil if the query
// fails (the error is logged). The caller must Close the returned rows; the
// mutex is held only while the query is issued, not while rows are read.
func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()

	rows, err := logdb.driver.Query(query, args...)
	if err != nil {
		log.Println(err)
		return nil
	}
	return rows
}

// ADError sets status = 1000 and overwrites error_msg with error_msg on the
// log_spider row(s) with the given uid. Errors are logged, not returned.
func (logdb *LogDB) ADError(uid, error_msg string) {
	logdb.mutex.Lock()
	defer logdb.mutex.Unlock()

	_, err := logdb.driver.Exec("update log_spider set status = 1000, error_msg=? where uid =?;", error_msg, uid)
	if err != nil {
		log.Println(err)
	}
}