for save.
This commit is contained in:
parent
d5151f92bf
commit
d258cc51d3
|
@ -6,7 +6,7 @@ package main
|
|||
`anchor_id` varchar(255) NOT NULL,
|
||||
`anchor_name` varchar(255) NOT NULL,
|
||||
`live_url` text,
|
||||
`channel` varchar(128) DEFAULT NULL,
|
||||
`channel` varchar(128) DEFAULT NULL, // 没有分类
|
||||
`show_type` varchar(255) DEFAULT NULL,
|
||||
*/
|
||||
|
||||
|
|
|
@ -2,13 +2,13 @@ package main
|
|||
|
||||
import (
|
||||
"intimate"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func TestExtractor(t *testing.T) {
|
||||
collect := intimate.NewExtractorStore()
|
||||
store := intimate.NewSourceStore("source_openrec")
|
||||
source, err := store.Pop("openrec_user", 100)
|
||||
if source != nil {
|
||||
|
@ -24,11 +24,9 @@ func TestExtractor(t *testing.T) {
|
|||
m := result.Map()
|
||||
for key := range m {
|
||||
t.Error(key)
|
||||
f, err := os.OpenFile("./openrec_"+key+".html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
f.WriteString(m[key].String())
|
||||
ai := &intimate.CollectLog{}
|
||||
ai.SetAnchorId("123")
|
||||
collect.InsertCollectLog(ai)
|
||||
}
|
||||
} else {
|
||||
t.Error("data is not json:\n", string(sdata))
|
||||
|
|
|
@ -168,7 +168,7 @@ type IGetCollectLog interface {
|
|||
GetShowEndTime() sql.NullTime //
|
||||
GetUpdateTime() time.Time //
|
||||
GetExt() interface{} //
|
||||
GetError() sql.NullString //
|
||||
GetErrorMsg() sql.NullString //
|
||||
}
|
||||
|
||||
type ISetCollectLog interface {
|
||||
|
@ -186,7 +186,7 @@ type ISetCollectLog interface {
|
|||
SetShowEndTime(sql.NullTime) //
|
||||
SetUpdateTime(time.Time) //
|
||||
SetExt(interface{}) //
|
||||
SetError(sql.NullString) //
|
||||
SetErrorMsg(sql.NullString) //
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -238,17 +238,17 @@ type CollectLog struct {
|
|||
ShowEndTime sql.NullTime //
|
||||
UpdateTime time.Time //
|
||||
Ext interface{} //
|
||||
Error sql.NullString //
|
||||
ErrorMsg sql.NullString //
|
||||
}
|
||||
|
||||
// GetError Get return Error sql.NullString
|
||||
func (cl *CollectLog) GetError() sql.NullString {
|
||||
return cl.Error
|
||||
// GetErrorMsg Get return Error sql.NullString
|
||||
func (cl *CollectLog) GetErrorMsg() sql.NullString {
|
||||
return cl.ErrorMsg
|
||||
}
|
||||
|
||||
// SetError Set Error sql.NullString
|
||||
func (cl *CollectLog) SetError(Error sql.NullString) {
|
||||
cl.Error = Error
|
||||
// SetErrorMsg Set Error sql.NullString
|
||||
func (cl *CollectLog) SetErrorMsg(ErrorMsg sql.NullString) {
|
||||
cl.ErrorMsg = ErrorMsg
|
||||
}
|
||||
|
||||
// GetExt Get return Ext interface{}
|
||||
|
|
|
@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS `collect_log` (
|
|||
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
`ext` json DEFAULT NULL,
|
||||
|
||||
`error` text DEFAULT NULL,
|
||||
`error_msg` text DEFAULT NULL,
|
||||
|
||||
KEY `uid_idx` (`uid`),
|
||||
KEY `platform_idx` (`platform`),
|
||||
|
|
18
store.go
18
store.go
|
@ -141,7 +141,7 @@ type ExtractorStore struct {
|
|||
|
||||
func (store *ExtractorStore) errorAlarm(err error) {
|
||||
if err != nil {
|
||||
log.Println("store error: ", err)
|
||||
log.Panic("store error: ", err)
|
||||
// 报警. 如果数据插入有问题
|
||||
store.errorCount++
|
||||
if store.errorCount >= store.errorLimit {
|
||||
|
@ -154,6 +154,14 @@ func (store *ExtractorStore) errorAlarm(err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func NewExtractorStore() *ExtractorStore {
|
||||
db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &ExtractorStore{db: db}
|
||||
}
|
||||
|
||||
/*
|
||||
`uid` bigint,
|
||||
`platform` varchar(255) NOT NULL,
|
||||
|
@ -189,11 +197,13 @@ func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) {
|
|||
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, --时间戳从源数据里获取
|
||||
`ext` json DEFAULT NULL,
|
||||
|
||||
`error` text DEFAULT NULL,
|
||||
`error_msg` text DEFAULT NULL,
|
||||
*/
|
||||
|
||||
// InsertCollectLog CollectLog表插入数据
|
||||
func (store *ExtractorStore) InsertCollectLog(isource IGetAnchorInfo) {
|
||||
_, err := store.db.Exec("insert into "+CollectLogTable+"(platform, anchor_id, anchor_name, live_url, channel, show_type, ext) values(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE", isource.GetPlatform(), isource.GetAnchorId(), isource.GetAnchorName(), isource.GetLiveUrl(), isource.GetChannel(), isource.GetShowType(), isource.GetExt())
|
||||
func (store *ExtractorStore) InsertCollectLog(isource IGetCollectLog) {
|
||||
_, err := store.db.Exec("insert into "+CollectLogTable+"(uid, platform, anchor_id, is_showing, is_error, followers, views, giver, gratuity, show_title, show_start_time, show_end_time, update_time, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
|
||||
isource.GetUid(), isource.GetPlatform(), isource.GetAnchorId(), isource.GetIsShowing(), isource.GetIsError(), isource.GetFollowers(), isource.GetViews(), isource.GetGiver(), isource.GetGratuity(), isource.GetShowTitle(), isource.GetShowStartTime(), isource.GetShowEndTime(), isource.GetUpdateTime(), isource.GetExt(), isource.GetErrorMsg(),
|
||||
)
|
||||
store.errorAlarm(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user