diff --git a/logdb.go b/logdb.go index 86d6091..716c18f 100644 --- a/logdb.go +++ b/logdb.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "os" + "sync" "time" yaml "gopkg.in/yaml.v2" @@ -24,7 +25,9 @@ type LogDB struct { hostid int nextCheck int64 checkLimit int64 - driver *sql.DB + + driver *sql.DB + mutex sync.Mutex } // New 创建一个logdb的配置 @@ -49,6 +52,9 @@ func New(filename string) *LogDB { // Ping 是否Ping通数据库 func (logdb *LogDB) Ping() (result bool) { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + defer func() { if err := recover(); err != nil { result = false @@ -83,6 +89,9 @@ func (logdb *LogDB) Ping() (result bool) { // Connect 重连 func (logdb *LogDB) Connect() { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + logdb.hostid++ if logdb.hostid >= len(logdb.Hosts) { logdb.hostid = 0 @@ -100,6 +109,9 @@ func (logdb *LogDB) Connect() { // ADInsert 插入数据 func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + _, err := logdb.driver.Exec("insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ? ,?, ?, ?)", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl) if err != nil { log.Println(err) @@ -113,9 +125,11 @@ type ADResonse struct { } func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() pid := logdb.pid + 2000 - logdb.ADCheckError(spider_id) + logdb.ADCheckRecover(spider_id) _, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id) if err != nil { @@ -141,7 +155,10 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { return adresponse } -func (logdb *LogDB) ADCheckError(spider_id int) { +func (logdb *LogDB) ADCheckRecover(spider_id int) { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + now := time.Now() if now.Unix() > logdb.nextCheck { logdb.nextCheck = now.Unix() + logdb.checkLimit @@ -154,8 +171,21 @@ func (logdb *LogDB) ADCheckError(spider_id int) { } } +func (logdb *LogDB) ADParserSuccess(uid string) { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + + _, err := logdb.driver.Exec("update log_spider set status = 200 where uid = ?", uid) + if err != nil { + log.Println(err) + } +} + // Select 插入数据 func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + Rows, err := logdb.driver.Query(query, args...) if err != nil { log.Println(err) @@ -166,6 +196,9 @@ func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows { // ADError 广告错误后更新 func (logdb *LogDB) ADError(uid, error_msg string) { + logdb.mutex.Lock() + defer logdb.mutex.Unlock() + _, err := logdb.driver.Exec("update log_spider set status = 1000, error_msg=? where uid =?;", error_msg, uid) if err != nil { log.Println(err)