diff --git a/extractor/openrec/main.go b/extractor/openrec/main.go index 0639189..86abea9 100644 --- a/extractor/openrec/main.go +++ b/extractor/openrec/main.go @@ -6,7 +6,7 @@ package main `anchor_id` varchar(255) NOT NULL, `anchor_name` varchar(255) NOT NULL, `live_url` text, - `channel` varchar(128) DEFAULT NULL, + `channel` varchar(128) DEFAULT NULL, // 没有分类 `show_type` varchar(255) DEFAULT NULL, */ diff --git a/extractor/openrec/openrec_test.go b/extractor/openrec/openrec_test.go index f930b91..76585ab 100644 --- a/extractor/openrec/openrec_test.go +++ b/extractor/openrec/openrec_test.go @@ -2,13 +2,13 @@ package main import ( "intimate" - "os" "testing" "github.com/tidwall/gjson" ) func TestExtractor(t *testing.T) { + collect := intimate.NewExtractorStore() store := intimate.NewSourceStore("source_openrec") source, err := store.Pop("openrec_user", 100) if source != nil { @@ -24,11 +24,9 @@ func TestExtractor(t *testing.T) { m := result.Map() for key := range m { t.Error(key) - f, err := os.OpenFile("./openrec_"+key+".html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) - if err != nil { - panic(err) - } - f.WriteString(m[key].String()) + ai := &intimate.CollectLog{} + ai.SetAnchorId("123") + collect.InsertCollectLog(ai) } } else { t.Error("data is not json:\n", string(sdata)) diff --git a/extractor_field.go b/extractor_field.go index 2d15f03..9aa9cbd 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -168,7 +168,7 @@ type IGetCollectLog interface { GetShowEndTime() sql.NullTime // GetUpdateTime() time.Time // GetExt() interface{} // - GetError() sql.NullString // + GetErrorMsg() sql.NullString // } type ISetCollectLog interface { @@ -186,7 +186,7 @@ type ISetCollectLog interface { SetShowEndTime(sql.NullTime) // SetUpdateTime(time.Time) // SetExt(interface{}) // - SetError(sql.NullString) // + SetErrorMsg(sql.NullString) // } /* @@ -238,17 +238,17 @@ type CollectLog struct { ShowEndTime sql.NullTime // UpdateTime time.Time // Ext interface{} // - Error sql.NullString // + ErrorMsg sql.NullString // } -// GetError Get return Error sql.NullString -func (cl *CollectLog) GetError() sql.NullString { - return cl.Error +// GetErrorMsg Get return Error sql.NullString +func (cl *CollectLog) GetErrorMsg() sql.NullString { + return cl.ErrorMsg } -// SetError Set Error sql.NullString -func (cl *CollectLog) SetError(Error sql.NullString) { - cl.Error = Error +// SetErrorMsg Set Error sql.NullString +func (cl *CollectLog) SetErrorMsg(ErrorMsg sql.NullString) { + cl.ErrorMsg = ErrorMsg } // GetExt Get return Ext interface{} diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index 5f5728a..8979744 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS `collect_log` ( `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `ext` json DEFAULT NULL, - `error` text DEFAULT NULL, + `error_msg` text DEFAULT NULL, KEY `uid_idx` (`uid`), KEY `platform_idx` (`platform`), diff --git a/store.go b/store.go index 9d15378..5947247 100644 --- a/store.go +++ b/store.go @@ -141,7 +141,7 @@ type ExtractorStore struct { func (store *ExtractorStore) errorAlarm(err error) { if err != nil { - log.Println("store error: ", err) + log.Panic("store error: ", err) // 报警. 如果数据插入有问题 store.errorCount++ if store.errorCount >= store.errorLimit { @@ -154,6 +154,14 @@ func (store *ExtractorStore) errorAlarm(err error) { } } +func NewExtractorStore() *ExtractorStore { + db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) + if err != nil { + panic(err) + } + return &ExtractorStore{db: db} +} + /* `uid` bigint, `platform` varchar(255) NOT NULL, @@ -189,11 +197,13 @@ func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) { `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, --时间戳从源数据里获取 `ext` json DEFAULT NULL, - `error` text DEFAULT NULL, + `error_msg` text DEFAULT NULL, */ // InsertCollectLog CollectLog表插入数据 -func (store *ExtractorStore) InsertCollectLog(isource IGetAnchorInfo) { - _, err := store.db.Exec("insert into "+CollectLogTable+"(platform, anchor_id, anchor_name, live_url, channel, show_type, ext) values(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE", isource.GetPlatform(), isource.GetAnchorId(), isource.GetAnchorName(), isource.GetLiveUrl(), isource.GetChannel(), isource.GetShowType(), isource.GetExt()) +func (store *ExtractorStore) InsertCollectLog(isource IGetCollectLog) { + _, err := store.db.Exec("insert into "+CollectLogTable+"(uid, platform, anchor_id, is_showing, is_error, followers, views, giver, gratuity, show_title, show_start_time, show_end_time, update_time, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + isource.GetUid(), isource.GetPlatform(), isource.GetAnchorId(), isource.GetIsShowing(), isource.GetIsError(), isource.GetFollowers(), isource.GetViews(), isource.GetGiver(), isource.GetGratuity(), isource.GetShowTitle(), isource.GetShowStartTime(), isource.GetShowEndTime(), isource.GetUpdateTime(), isource.GetExt(), isource.GetErrorMsg(), + ) store.errorAlarm(err) }