From 2e9a8036455aef045b6a50a49a23245e17ae2564 Mon Sep 17 00:00:00 2001 From: eson Date: Thu, 9 Jul 2020 17:09:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90openrec=E6=BA=90=E7=9A=84?= =?UTF-8?q?=E8=8E=B7=E5=8F=96.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + sql/intimate_extractor.sql | 2 +- tasks/openrec/openrec_task1/task_openrec.go | 29 +++---- tasks/openrec/openrec_task2/task_openrec.go | 94 +++++++++++---------- testfile/openrec_user.json | 0 5 files changed, 66 insertions(+), 60 deletions(-) create mode 100644 testfile/openrec_user.json diff --git a/.gitignore b/.gitignore index 1920979..e602687 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,6 @@ screenlog.* *.7z intimate *.gz +debug.test diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index a114820..f13534c 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS `anchor_info` ( `platform` varchar(255) NOT NULL, `anchor_id` varchar(255) NOT NULL, `anchor_name` varchar(255) NOT NULL, - `platform_url` text NOT NULL, + `live_url` text, `channel` varchar(128) DEFAULT NULL, `show_type` varchar(255) DEFAULT NULL, `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index fa6ec20..36cffba 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -8,9 +8,11 @@ import ( "time" "github.com/474420502/hunter" + "github.com/tidwall/gjson" ) var targetTypeRanking = "openrec_ranking" +var targetTypeUser = "openrec_user" var openrecRanking *OpenrecRanking // store 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql @@ -44,7 +46,6 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { for { - errorMsg := sql.NullString{Valid: false} resp, err := cxt.Hunt() if err != nil { log.Println(err) @@ -53,36 +54,34 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { wf := cxt.Workflow() - data := &intimate.Source{} content := resp.Content() if len(content) <= 200 { return } - data.SetSource(sql.NullString{String: string(content), Valid: len(content) > 0}) - data.SetUrl(wf.GetRawURL()) - data.SetTargetType(targetTypeRanking) + result := gjson.ParseBytes(content) + if result.IsArray() { + for _, User := range result.Array() { + data := &intimate.Source{} + userid := User.Get("channel.id").String() + + data.SetSource(sql.NullString{String: userid, Valid: len(userid) > 0}) + data.SetUrl(wf.GetRawURL()) + data.SetTargetType(targetTypeUser) + store.Insert(data) + } + } querys := wf.GetQuery() page, err := strconv.Atoi(querys.Get("page")) if err != nil { log.Println(err) - errorMsg.String = err.Error() - errorMsg.Valid = true - - data.SetErrorMsg(errorMsg) - data.SetOperator(10000) - store.Insert(data) return } page++ querys.Set("page", strconv.Itoa(page)) wf.SetQuery(querys) - - data.SetErrorMsg(errorMsg) - store.Insert(data) - time.Sleep(time.Second * 2) } } diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index a8a372c..b74f62b 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -8,7 +8,6 @@ import ( "time" "github.com/474420502/hunter" - "github.com/tidwall/gjson" ) var targetTypeUser = "openrec_user" @@ -32,7 +31,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for { - source, err := store.Pop(targetTypeRanking) + source, err := store.Pop(targetTypeUser) if err != nil { log.Println(err) return @@ -42,52 +41,59 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { return } - result := gjson.Parse(source.GetSource().String) - if result.IsArray() { - for _, User := range result.Array() { - userid := User.Get("channel.id").String() - ext := make(map[string]interface{}) + userSource := &intimate.Source{} + userid := source.GetSource().String + userUrl := "https://www.openrec.tv/user/" + userid + userSource.SetUrl(userUrl) - wf := cxt.Session().Get("https://www.openrec.tv/user/" + userid) - resp, err := wf.Execute() - source.SetUpdateTime(time.Now()) + wf := cxt.Session().Get(userUrl) + resp, err := wf.Execute() + source.SetUpdateTime(time.Now()) - if err != nil { - log.Println(err) + if err != nil { + log.Println(err) - source.SetOperator(int32(intimate.OperatorError)) - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - continue - } - - ext["user"] = string(resp.Content()) - - wf = cxt.Session().Get("https://www.openrec.tv/user/" + userid + "/supporters") - resp, err = wf.Execute() - if err != nil { - log.Println(err) - source.SetOperator(int32(intimate.OperatorError)) - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - continue - } - ext["user_supporters"] = string(resp.Content()) - - extJsonBytes, err := json.Marshal(ext) - if err != nil { - log.Println(err) - source.SetOperator(int32(intimate.OperatorError)) - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - continue - } - - source.SetOperator(int32(intimate.OperatorOK)) - source.SetExt(string(extJsonBytes)) - - store.Update(source) - } - } else { - log.Println("array error:", result.Str) + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue } + ext := make(map[string]interface{}) + + ext["user"] = string(resp.Content()) + + wf = cxt.Session().Get(userUrl + "/supporters") + resp, err = wf.Execute() + if err != nil { + log.Println(err) + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue + } + ext["user_supporters"] = string(resp.Content()) + + wf = cxt.Session().Get("https://www.openrec.tv/live/" + userid) + resp, err = wf.Execute() + if err != nil { + log.Println(err) + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue + } + ext["user_live"] = string(resp.Content()) + + extJsonBytes, err := json.Marshal(ext) + if err != nil { + log.Println(err) + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue + } + + source.SetOperator(int32(intimate.OperatorOK)) + source.SetExt(string(extJsonBytes)) + store.Update(source) + } + } diff --git a/testfile/openrec_user.json b/testfile/openrec_user.json new file mode 100644 index 0000000..e69de29