From d7a6da287d90a5ab3abf379a787d71c14209d9b0 Mon Sep 17 00:00:00 2001 From: eson Date: Mon, 20 Jul 2020 18:54:34 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20extractor=20=E7=9A=84=E9=87=8D=E6=9E=84?= =?UTF-8?q?.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- extractor/openrec_extractor/openrec_extractor.go | 2 +- source_field.go | 13 +++++++++---- sql/intimate_source.sql | 3 ++- store.go | 2 +- tasks/openrec/openrec_task2/task_openrec.go | 8 +++++--- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 1e87fdd..53d2537 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -45,7 +45,7 @@ func (oe *OpenrecExtractor) Execute() { runtime.GC() time.Sleep(time.Nanosecond) - source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) + source, err := store.Pop(string(intimate.TTOpenrecUser), 100) if err != nil { if err != lasterr { log.Println(err, lasterr) diff --git a/source_field.go b/source_field.go index 0e1dc32..1236c8c 100644 --- a/source_field.go +++ b/source_field.go @@ -7,16 +7,21 @@ import ( // Source 的结构体 type Source struct { - Uid int64 // - Url string // - TargetType string // + Uid int64 // + + Url string // + + StreamerId sql.NullInt64 // + Source sql.NullString // PassGob sql.NullString // Ext interface{} // UpdateTime sql.NullTime // - Operator int32 // ErrorMsg sql.NullString // + TargetType string // + Operator int32 // + LastOperator int32 } diff --git a/sql/intimate_source.sql b/sql/intimate_source.sql index 3ed3ffa..5a0b122 100644 --- a/sql/intimate_source.sql +++ b/sql/intimate_source.sql @@ -3,7 +3,8 @@ use intimate_source; CREATE TABLE IF NOT EXISTS `source_openrec` ( uid bigint AUTO_INCREMENT COMMENT '自增UID', - + + `streamer_id` bigint DEFAULT NULL COMMENT 'streamer uid, 关联主播', `url` text NOT NULL COMMENT '获取源数据地址', `source` longtext DEFAULT NULL COMMENT '源数据', `ext` json DEFAULT NULL COMMENT '扩展字段', diff --git a/store.go b/store.go index 1245f90..b69929e 100644 --- a/store.go +++ b/store.go @@ -70,7 +70,7 @@ func (store *StoreSource) errorAlarm(err error) { // Insert 插入数据 func (store *StoreSource) Insert(isource IGet) { - _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg")) + _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) if err != nil { panic(err) } diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index faca2c0..65cb95f 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -143,8 +143,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { // cookies := cxt.Session().GetCookies(wf.GetParsedURL()) ext := make(map[string]interface{}) - ext["supporters"] = supporters - ext["user"] = string(resp.Content()) + ext["html_supporters"] = supporters + ext["html_user"] = string(resp.Content()) liveUrl := updateUrl["live"] tp = cxt.Session().Get(liveUrl) @@ -154,7 +154,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { estore.UpdateError(streamer, err) continue } - ext["live"] = string(resp.Content()) + ext["html_live"] = string(resp.Content()) + ext["var_user_id"] = userId extJsonBytes, err := json.Marshal(ext) if err != nil { @@ -168,6 +169,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { source := &intimate.Source{} source.TargetType = string(intimate.TTOpenrecUser) source.Ext = string(extJsonBytes) + source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} sstore.Insert(source) estore.UpdateOperator(streamer)