logdb/logdb.go
2018-12-18 14:13:52 +08:00

118 lines
2.9 KiB
Go

package logdb
import (
"database/sql"
"fmt"
"io/ioutil"
"log"
"time"
yaml "gopkg.in/yaml.v2"
)
// LogDB holds the database connection settings, populated from YAML through
// the field tags, together with the state of the currently active connection.
type LogDB struct {
// Charset is sent as the charset parameter of the connection DSN.
Charset string `yaml:"charset"`
// DB is used as the database name in the connection DSN and is also
// concatenated into the statement built by ADInsert as the table name.
DB string `yaml:"db"`
// Hosts lists the candidate server addresses; one of them is active at a time.
Hosts []string `yaml:"hosts"`
// Password is the credential placed in the connection DSN.
Password string `yaml:"password"`
// Port is the TCP port used for every entry in Hosts.
Port string `yaml:"port"`
// User is the account name placed in the connection DSN.
User string `yaml:"user"`
// hostid is the index into Hosts of the host currently selected.
hostid int
// driver is the open database handle; it is assigned by Connect and Ping.
driver *sql.DB
}
// New reads the YAML file at filename into a LogDB and opens the initial
// connection by calling Connect.
//
// It panics if the file cannot be read or cannot be unmarshalled, and
// propagates any panic raised by Connect.
func New(filename string) *LogDB {
	data, err := ioutil.ReadFile(filename)
	if err != nil {
		panic(err)
	}
	logdb := LogDB{}
	if err := yaml.Unmarshal(data, &logdb); err != nil {
		panic(err)
	}
	// Connect advances hostid before dialing, so start one position before the
	// first entry. Starting at 0 made the initial connection skip Hosts[0]
	// whenever more than one host was configured.
	logdb.hostid = -1
	logdb.Connect()
	return &logdb
}
// Ping reports whether a usable database connection exists, failing over to
// another configured host when the current one does not answer.
//
// The current driver is pinged first. If that ping fails, or panics (for
// example because no driver has been opened yet), the failure is logged and
// every entry in Hosts is tried in order; the first host that answers a ping
// becomes the active driver and hostid is set to its index. The result is
// true when the current driver answered or a replacement was adopted, and
// false when no host could be reached.
func (logdb *LogDB) Ping() (result bool) {
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		// Look the host up defensively: indexing an empty Hosts list here
		// would panic inside the recovery path, where nothing catches it.
		host := ""
		if logdb.hostid >= 0 && logdb.hostid < len(logdb.Hosts) {
			host = logdb.Hosts[logdb.hostid]
		}
		log.Println(cause, host, " is unconnect ")
		result = logdb.failover()
	}()
	if err := logdb.driver.Ping(); err != nil {
		panic(err)
	}
	return true
}

// failover tries each configured host in order and adopts the first one that
// answers a ping, closing the handle it replaces. It reports whether a host
// was adopted.
func (logdb *LogDB) failover() bool {
	for i, host := range logdb.Hosts {
		db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, host, logdb.Port, logdb.DB, logdb.Charset))
		if err != nil {
			log.Println(err, host, " is connect fail")
			continue
		}
		if err := db.Ping(); err != nil {
			log.Println(err, host, " is connect fail")
			// The handle was created even though the ping failed; release it
			// so unreachable hosts do not accumulate idle pools.
			db.Close()
			continue
		}
		if logdb.driver != nil {
			logdb.driver.Close()
		}
		logdb.driver = db
		logdb.hostid = i
		// Stop at the first reachable host instead of opening further pools.
		return true
	}
	return false
}
// Connect switches to the next entry in Hosts, wrapping around to the first,
// and opens a new database handle for it, closing the previous handle if
// there was one. It then calls Ping and logs the result together with the
// host that is active afterwards.
//
// It panics if no hosts are configured or if sql.Open returns an error.
func (logdb *LogDB) Connect() {
	if len(logdb.Hosts) == 0 {
		// Fail with a clear message rather than an index-out-of-range panic.
		panic(fmt.Errorf("logdb: no hosts configured"))
	}
	logdb.hostid++
	if logdb.hostid >= len(logdb.Hosts) {
		logdb.hostid = 0
	}
	db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset))
	if err != nil {
		panic(err)
	}
	if logdb.driver != nil {
		logdb.driver.Close()
	}
	logdb.driver = db
	ok := logdb.Ping()
	// Log only the active host: printing the whole struct would write the
	// plaintext Password into the log.
	log.Println("connect is", ok, logdb.Hosts[logdb.hostid])
}
// ADInsert inserts one row into the table named by the DB field, binding the
// thirteen arguments to the thirteen listed columns in statement order.
//
// It panics if the statement fails.
func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) {
	// One placeholder per column: the statement previously carried fourteen
	// placeholders for thirteen values, so it could never execute.
	query := "insert into " + logdb.DB + "(uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, status, priority, ts_crawl) value(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	if _, err := logdb.driver.Exec(query, uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, status, priority, ts_crawl); err != nil {
		panic(err)
	}
}
// Select runs query with args on the current database handle and returns the
// resulting rows. The rows are returned as-is; this method does not close
// them.
//
// It panics if the query fails.
func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows {
	rows, err := logdb.driver.Query(query, args...)
	if err == nil {
		return rows
	}
	panic(err)
}
// ADError records an error for the row identified by uid in the table named
// by the DB field: it adds 1000 to the row's status and stores error_msg.
//
// It panics if the statement fails.
func (logdb *LogDB) ADError(uid, error_msg string) {
	// The table name is concatenated, as in ADInsert: a table identifier
	// cannot be supplied through a "?" placeholder, which only binds values.
	query := "update " + logdb.DB + " set status = status + 1000, error_msg=? where uid =?;"
	if _, err := logdb.driver.Exec(query, error_msg, uid); err != nil {
		panic(err)
	}
}