Compare commits

..

No commits in common. "master" and "v1.0.0" have entirely different histories.

View File

@ -10,8 +10,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/satori/go.uuid"
_ "github.com/go-sql-driver/mysql" // mysql驱动 _ "github.com/go-sql-driver/mysql" // mysql驱动
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
) )
@ -126,22 +124,17 @@ type ADResonse struct {
Response string Response string
} }
// ADParserSelect adpase 根据自己的spider_id, 选择selcount条数据进行处理. 10- 100条最佳 func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse {
func (logdb *LogDB) ADParserSelect(spider_id int, selcount int) []ADResonse { pid := logdb.pid + 2000
puid, err := uuid.NewV4() logdb.adCheckRecover(spider_id)
if err != nil {
panic(err)
}
logdb.adCheckRecover(spider_id, 5*time.Minute) _, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id)
_, err = logdb.driver.Exec("update log_spider set status = 10000, parse_id = ? where spider_id = ? and status = 0 limit ?", puid.String(), spider_id, selcount)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return nil return nil
} }
rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and parse_id = ? and status = 10000", spider_id, puid.String()) rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and status = ?", spider_id, pid)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return nil return nil
@ -159,23 +152,19 @@ func (logdb *LogDB) ADParserSelect(spider_id int, selcount int) []ADResonse {
return adresponse return adresponse
} }
// adCheckRecover 处理恢复错误, 或者没处理完的Select 出来的数据, 5分钟以上最佳. 例子: intervalTime := time.Minute * 5 func (logdb *LogDB) adCheckRecover(spider_id int) {
// spider_id 对应 spider_id
// intervalTime 每隔多少时间去检查一次
func (logdb *LogDB) adCheckRecover(spider_id int, intervalTime time.Duration) {
now := time.Now() now := time.Now()
if now.Unix() > logdb.nextCheck { if now.Unix() > logdb.nextCheck {
logdb.nextCheck = now.Unix() + logdb.checkLimit logdb.nextCheck = now.Unix() + logdb.checkLimit
tsUpdate := now.Add(-intervalTime) // tsUpdate := now.Add(-time.Minute * 5) tsUpdate := now.Add(-time.Minute * 5)
_, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status = 10000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate) _, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status > 2000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
} }
} }
// ADParserSuccess 解析成功后处理该条数据
func (logdb *LogDB) ADParserSuccess(uid string, successData string) { func (logdb *LogDB) ADParserSuccess(uid string, successData string) {
logdb.mutex.Lock() logdb.mutex.Lock()
defer logdb.mutex.Unlock() defer logdb.mutex.Unlock()