TODO: extractor 的重构.

This commit is contained in:
eson 2020-07-20 18:54:34 +08:00
parent f0f83a9f00
commit d7a6da287d
5 changed files with 18 additions and 10 deletions

View File

@ -45,7 +45,7 @@ func (oe *OpenrecExtractor) Execute() {
runtime.GC()
time.Sleep(time.Nanosecond)
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
source, err := store.Pop(string(intimate.TTOpenrecUser), 100)
if err != nil {
if err != lasterr {
log.Println(err, lasterr)

View File

@ -7,16 +7,21 @@ import (
// Source 的结构体
type Source struct {
Uid int64 //
Url string //
TargetType string //
Uid int64 //
Url string //
StreamerId sql.NullInt64 //
Source sql.NullString //
PassGob sql.NullString //
Ext interface{} //
UpdateTime sql.NullTime //
Operator int32 //
ErrorMsg sql.NullString //
TargetType string //
Operator int32 //
LastOperator int32
}

View File

@ -3,7 +3,8 @@ use intimate_source;
CREATE TABLE IF NOT EXISTS `source_openrec` (
uid bigint AUTO_INCREMENT COMMENT '自增UID',
`streamer_id` bigint DEFAULT NULL COMMENT 'streamer uid, 关联主播',
`url` text NOT NULL COMMENT '获取源数据地址',
`source` longtext DEFAULT NULL COMMENT '源数据',
`ext` json DEFAULT NULL COMMENT '扩展字段',

View File

@ -70,7 +70,7 @@ func (store *StoreSource) errorAlarm(err error) {
// Insert 插入数据
func (store *StoreSource) Insert(isource IGet) {
_, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"))
_, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId"))
if err != nil {
panic(err)
}

View File

@ -143,8 +143,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
// cookies := cxt.Session().GetCookies(wf.GetParsedURL())
ext := make(map[string]interface{})
ext["supporters"] = supporters
ext["user"] = string(resp.Content())
ext["html_supporters"] = supporters
ext["html_user"] = string(resp.Content())
liveUrl := updateUrl["live"]
tp = cxt.Session().Get(liveUrl)
@ -154,7 +154,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
estore.UpdateError(streamer, err)
continue
}
ext["live"] = string(resp.Content())
ext["html_live"] = string(resp.Content())
ext["var_user_id"] = userId
extJsonBytes, err := json.Marshal(ext)
if err != nil {
@ -168,6 +169,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
source := &intimate.Source{}
source.TargetType = string(intimate.TTOpenrecUser)
source.Ext = string(extJsonBytes)
source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true}
sstore.Insert(source)
estore.UpdateOperator(streamer)