// Package logdb records spider crawl results in the MySQL log_spider table
// and switches between the configured hosts when the connection fails.
package logdb
import (
"database/sql"
"encoding/json"
2018-12-10 06:06:37 +00:00
"fmt"
"io/ioutil"
"log"
2018-12-18 16:47:37 +00:00
"os"
2018-12-19 02:44:38 +00:00
"sync"
2018-12-10 08:52:46 +00:00
"time"
2018-12-10 06:06:37 +00:00
2018-12-19 08:57:51 +00:00
_ "github.com/go-sql-driver/mysql" // mysql驱动
2018-12-10 06:06:37 +00:00
yaml "gopkg.in/yaml.v2"
)
// LogDB holds the MySQL connection settings read from a YAML file together
// with the runtime state of the connection built from them.
type LogDB struct {
	Charset  string   `yaml:"charset"`  // charset parameter of the data source name
	DB       string   `yaml:"db"`       // database name
	Hosts    []string `yaml:"hosts"`    // candidate server hosts
	Password string   `yaml:"password"` // login password
	Port     string   `yaml:"port"`     // TCP port, shared by every host
	User     string   `yaml:"user"`     // login user

	pid        int        // process id recorded by New
	hostid     int        // index into Hosts of the host in use
	nextCheck  int64      // Unix time (seconds) after which ADCheckRecover runs again
	checkLimit int64      // seconds added to nextCheck after each recovery run
	driver     *sql.DB    // current connection handle
	mutex      sync.Mutex // taken by the methods before they touch the fields above
}
2018-12-10 09:33:52 +00:00
// New 创建一个logdb的配置
func New(filename string) *LogDB {
2018-12-10 06:06:37 +00:00
logdb := LogDB{}
2018-12-18 16:45:00 +00:00
logdb.checkLimit = 300
2018-12-18 16:47:37 +00:00
logdb.pid = os.Getpid()
2018-12-18 16:45:00 +00:00
2018-12-10 06:06:37 +00:00
data, err := ioutil.ReadFile(filename)
if err != nil {
panic(err)
}
err = yaml.Unmarshal(data, &logdb)
if err != nil {
panic(err)
}
logdb.hostid = 0
logdb.Connect()
return &logdb
}
2018-12-10 06:10:24 +00:00
// Ping 是否Ping通数据库
2018-12-10 06:40:38 +00:00
func (logdb *LogDB) Ping() (result bool) {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-10 06:06:37 +00:00
defer func() {
if err := recover(); err != nil {
2018-12-10 06:40:38 +00:00
result = false
log.Println(err, logdb.Hosts[logdb.hostid], " is unconnect ")
2018-12-10 06:06:37 +00:00
hostlen := len(logdb.Hosts)
for i := 0; i < hostlen; i++ {
2018-12-10 06:58:29 +00:00
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[i], logdb.Port, logdb.DB, logdb.Charset))
2018-12-10 06:06:37 +00:00
if err != nil {
2018-12-10 06:40:38 +00:00
log.Println(err, logdb.Hosts[i], " is connect fail")
continue
}
if err := db.Ping(); err != nil {
log.Println(err, logdb.Hosts[i], " is connect fail")
2018-12-10 06:06:37 +00:00
continue
}
2018-12-10 06:40:38 +00:00
2018-12-10 06:06:37 +00:00
logdb.driver = db
logdb.hostid = i
2018-12-10 06:40:38 +00:00
result = true
2018-12-10 06:06:37 +00:00
}
}
}()
2018-12-10 06:40:38 +00:00
if err := logdb.driver.Ping(); err != nil {
panic(err)
}
return true
}
// Connect 重连
func (logdb *LogDB) Connect() {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-10 06:40:38 +00:00
logdb.hostid++
if logdb.hostid >= len(logdb.Hosts) {
logdb.hostid = 0
}
2018-12-10 06:58:29 +00:00
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset))
2018-12-10 06:06:37 +00:00
if err != nil {
panic(err)
}
2018-12-10 06:40:38 +00:00
if logdb.driver != nil {
logdb.driver.Close()
}
2018-12-10 06:06:37 +00:00
logdb.driver = db
2018-12-10 06:40:38 +00:00
log.Println("connect is", logdb.Ping(), logdb)
2018-12-10 06:06:37 +00:00
}
2018-12-10 08:52:46 +00:00
// ADInsert 插入数据
func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-18 17:07:22 +00:00
_, err := logdb.driver.Exec("insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ? ,?, ?, ?)", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl)
2018-12-10 08:52:46 +00:00
if err != nil {
2018-12-18 07:04:19 +00:00
log.Println(err)
2018-12-18 17:07:22 +00:00
log.Printf("for save ad sql: insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(%s, %s, %s, %s, %d, %d, %s, %d, %s, %s, %s, %d, %d, %s)\n", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl.Format("2006-01-02 15:04:05"))
2018-12-10 08:52:46 +00:00
}
}
2018-12-18 16:45:00 +00:00
// ADResonse pairs the uid of a log_spider row with the content of its
// response column, in the form returned by ADParserSelect.
type ADResonse struct {
	UID, Response string
}
2018-12-18 16:47:37 +00:00
func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-18 16:45:00 +00:00
2018-12-18 16:47:37 +00:00
pid := logdb.pid + 2000
2018-12-19 02:44:38 +00:00
logdb.ADCheckRecover(spider_id)
2018-12-18 16:45:00 +00:00
_, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id)
if err != nil {
log.Println(err)
2018-12-18 17:07:22 +00:00
return nil
2018-12-18 16:45:00 +00:00
}
rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and status = ?", spider_id, pid)
if err != nil {
log.Println(err)
2018-12-18 17:07:22 +00:00
return nil
2018-12-18 16:45:00 +00:00
}
var adresponse []ADResonse
var uid, response string
for rows.Next() {
rows.Scan(&uid, &response)
adresponse = append(adresponse, ADResonse{UID: uid, Response: response})
// log.Println(uid, response)
}
return adresponse
}
2018-12-19 02:44:38 +00:00
func (logdb *LogDB) ADCheckRecover(spider_id int) {
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-18 16:45:00 +00:00
now := time.Now()
if now.Unix() > logdb.nextCheck {
logdb.nextCheck = now.Unix() + logdb.checkLimit
tsUpdate := now.Add(-time.Minute * 5)
2018-12-18 17:07:22 +00:00
_, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status > 2000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate)
2018-12-18 16:45:00 +00:00
if err != nil {
log.Println(err)
}
}
}
func (logdb *LogDB) ADParserSuccess(uid string, successData string) {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
ext := make(map[string]string)
ext["success_data"] = successData
data, err := json.Marshal(&ext)
if err != nil || successData == "" {
_, err := logdb.driver.Exec("update log_spider set status = 200 where uid = ?", uid)
if err != nil {
log.Println(err)
}
} else {
_, err := logdb.driver.Exec("update log_spider set status = 200, ext = ? where uid = ?", string(data), uid)
if err != nil {
log.Println(err)
}
2018-12-19 02:44:38 +00:00
}
}
2018-12-10 08:52:46 +00:00
// Select 插入数据
func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-10 08:52:46 +00:00
Rows, err := logdb.driver.Query(query, args...)
if err != nil {
2018-12-18 07:04:19 +00:00
log.Println(err)
2018-12-18 17:07:22 +00:00
return nil
2018-12-10 08:52:46 +00:00
}
return Rows
}
// ADError 广告错误后更新
2018-12-10 09:33:52 +00:00
func (logdb *LogDB) ADError(uid, error_msg string) {
2018-12-19 02:44:38 +00:00
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
2018-12-18 17:07:22 +00:00
_, err := logdb.driver.Exec("update log_spider set status = 1000, error_msg=? where uid =?;", error_msg, uid)
2018-12-10 08:52:46 +00:00
if err != nil {
2018-12-18 07:04:19 +00:00
log.Println(err)
2018-12-10 08:52:46 +00:00
}
}