diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 53d2537..b4032af 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -36,8 +36,8 @@ func (oe *OpenrecExtractor) Execute() { atomic.StoreInt32(&loop, 0) }() - extractorStore := intimate.NewStoreExtractor() - store := intimate.NewStoreSource("source_openrec") + estore := intimate.NewStoreExtractor() + sstore := intimate.NewStoreSource(string(intimate.STOpenrec)) var lasterr error = nil for atomic.LoadInt32(&loop) > 0 { @@ -45,7 +45,7 @@ func (oe *OpenrecExtractor) Execute() { runtime.GC() time.Sleep(time.Nanosecond) - source, err := store.Pop(string(intimate.TTOpenrecUser), 100) + source, err := sstore.Pop(string(intimate.TTOpenrecUser), 0) if err != nil { if err != lasterr { log.Println(err, lasterr) @@ -55,67 +55,52 @@ func (oe *OpenrecExtractor) Execute() { continue } + sdata := source.Ext.([]byte) + datamap := gjson.ParseBytes(sdata).Map() + source.Operator = int32(intimate.OperatorError) - userId := source.Source.String + userId := datamap["var_user_id"].String() streamer := &intimate.Streamer{} streamer.UserId = userId streamer.Platform = string(intimate.Popenrec) - sdata := source.Ext.([]byte) - if gjson.ValidBytes(sdata) { - result := gjson.ParseBytes(sdata) - datamap := result.Map() + oe.user = intimate.NewExtractorSource(datamap["html_user"]) + oe.user.CreateExtractor() - oe.user = intimate.NewExtractorSource(datamap["user"]) - oe.user.CreateExtractor() + oe.userLive = intimate.NewExtractorSource(datamap["html_live"]) + oe.userLive.CreateExtractor() - oe.userLive = intimate.NewExtractorSource(datamap["user_live"]) - oe.userLive.CreateExtractor() + oe.supporters = intimate.NewExtractorSource(datamap["json_supporters"]) + clog := &intimate.CollectLog{} - oe.supporters = intimate.NewExtractorSource(datamap["supporters"]) - clog := &intimate.CollectLog{} + // log.Println(anchorId) - // log.Println(anchorId) + oe.extractFollowers(clog) + oe.extractUserName(streamer) + oe.extractViewsAndLiveStreaming(clog) + oe.extractGiversAndGratuity(clog) + oe.extractLive(clog) + oe.extractTags(clog) - oe.extractFollowers(clog) - oe.extractUserName(streamer) - oe.extractViewsAndLiveStreaming(clog) - oe.extractGiversAndGratuity(clog) - oe.extractLive(clog) - oe.extractTags(clog) + streamer.Uid = source.StreamerId.Int64 + streamer.UpdateTime = source.UpdateTime - streamer.UpdateTime = source.UpdateTime + clog.Platform = string(intimate.Popenrec) + clog.UserId = userId + clog.UpdateTime = source.UpdateTime - LiveUrl := "https://www.openrec.tv/live/" + userId - streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} + logUid := estore.InsertCollectLog(clog) - streamUid, err := extractorStore.UpdateStreamer(streamer) - if err != nil { - log.Println(err) - source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} - store.UpdateOperator(source) - return - } + LiveUrl := "https://www.openrec.tv/live/" + userId + streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} + streamer.LatestLogUid = logUid + streamer.Operator = 0 + estore.UpdateStreamer(streamer) - clog.StreamerUid = streamUid - clog.Platform = string(intimate.Popenrec) - clog.UserId = userId - clog.UpdateTime = source.UpdateTime + source.Operator = int32(intimate.OperatorExtractorOK) + sstore.UpdateOperator(source) - logUid, err := extractorStore.InsertCollectLog(clog) - if err != nil { - source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} - store.UpdateOperator(source) - return - } - - extractorStore.UpdateStreamerLog(logUid, streamUid) - source.Operator = int32(intimate.OperatorExtractorOK) - store.UpdateOperator(source) - } else { - log.Println("data is not json:\n", string(sdata)) - } } } @@ -138,15 +123,15 @@ func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { clog.Set("Followers", sql.NullInt64{Int64: followersInt, Valid: true}) } -func (oe *OpenrecExtractor) extractUserName(ai intimate.ISet) { +func (oe *OpenrecExtractor) extractUserName(streamer intimate.ISet) { extractor := oe.user.GetExtractor() - xp, err := extractor.XPathResult("//p[@class='c-global__user__profile__list__name__text official-icon--after']/text()") + xp, err := extractor.XPathResult("//p[ contains(@class, 'c-global__user__profile__list__name__text')]/text()") if err != nil { log.Println(err) } else { if xp.NodeIter().Next() { userName := xp.String() - ai.Set("UserName", userName) + streamer.Set("UserName", sql.NullString{String: userName, Valid: true}) } } } @@ -167,7 +152,7 @@ func (oe *OpenrecExtractor) extractViewsAndLiveStreaming(clog intimate.ISet) { } clog.Set("Views", sql.NullInt64{Int64: int64(viewsint), Valid: true}) - clog.Set("IsLiveStreaming", int32(1)) + clog.Set("IsLiveStreaming", true) } } @@ -214,7 +199,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) { if err != nil { log.Println(err) } - log.Println(iter.Node().NodeValue(), tm.Local()) + // log.Println(iter.Node().NodeValue(), tm.Local()) clog.Set("LiveStartTime", sql.NullTime{Time: tm.Local(), Valid: true}) duration, err := extractor.XPathResult("//meta[@itemprop='duration']/@content") diff --git a/extractor/openrec_extractor/openrec_test.go b/extractor/openrec_extractor/openrec_test.go index 5212f04..2028e34 100644 --- a/extractor/openrec_extractor/openrec_test.go +++ b/extractor/openrec_extractor/openrec_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/474420502/hunter" "github.com/lestrrat-go/libxml2" ) @@ -90,6 +91,28 @@ func TestCase(t *testing.T) { t.Error(xr) } +func TestUserName(t *testing.T) { + f, err := os.Open("test.html") + if err != nil { + panic(err) + } + data, err := ioutil.ReadAll(f) + if err != nil { + panic(err) + } + + extractor := hunter.NewExtractor(data) + xp, err := extractor.XPathResult("//p[ contains(@class, 'c-global__user__profile__list__name__text')]/text()") + if err != nil { + t.Error(err) + } else { + if xp.NodeIter().Next() { + userName := xp.String() + t.Error(userName) + } + } +} + func TestExtractor(t *testing.T) { oe := &OpenrecExtractor{} oe.Execute() diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index 1c351cc..f588757 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -12,7 +12,7 @@ CREATE TABLE IF NOT EXISTS `streamer` ( `is_update_streamer` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新streamer的信息. 1为需要,0则否', `is_update_url` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新update_url. 1为需要,0则否', - `update_url` json NOT NULL COMMENT '更新数据的url, 如直播url, profile url等', + `update_url` json DEFAULT NULL COMMENT '更新数据的url, 如直播url, profile url等', `update_interval` int DEFAULT 30 COMMENT '分钟单位, 默认30分钟, 下次更新的时间间隔', `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `latest_log_uid` bigint COMMENT '最新更新的日志表的uid, 方便关联', @@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS `streamer` ( `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', PRIMARY KEY (`uid`), - UNIQUE KEY `platform_anchor_id_idx` (`platform`, `user_id`), + UNIQUE KEY `platform_user_id_idx` (`platform`, `user_id`), KEY `platform_idx` (`platform`), KEY `user_id_idx` (`user_id`), KEY `user_name_idx` (`user_name`), diff --git a/sql/intimate_source.sql b/sql/intimate_source.sql index 5a0b122..cf49f79 100644 --- a/sql/intimate_source.sql +++ b/sql/intimate_source.sql @@ -8,7 +8,7 @@ CREATE TABLE IF NOT EXISTS `source_openrec` ( `url` text NOT NULL COMMENT '获取源数据地址', `source` longtext DEFAULT NULL COMMENT '源数据', `ext` json DEFAULT NULL COMMENT '扩展字段', - `pass_gob` blob DEFAULT NULL COMMENT '需要给下个任务传递gob 序列花数据, 非必要不用', + `serialize` blob DEFAULT NULL COMMENT '需要给下个任务传递 序列花数据, 非必要不用', `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', `error_msg` text DEFAULT NULL COMMENT '错误信息', diff --git a/store.go b/store.go index b69929e..72a56dc 100644 --- a/store.go +++ b/store.go @@ -3,7 +3,6 @@ package intimate import ( "database/sql" "log" - "os" "time" _ "github.com/go-sql-driver/mysql" @@ -120,7 +119,7 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e return nil, err } var args = []interface{}{targetType} - selectSQL := `select uid, url, target_type, source, ext, operator, update_time from ` + store.table + ` where target_type = ?` + selectSQL := `select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store.table + ` where target_type = ?` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) @@ -147,7 +146,7 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e s := &Source{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime) + err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) if err != nil { return nil, err } @@ -201,7 +200,7 @@ func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer return nil, err } var args = []interface{}{platform} - selectSQL := `select uid, update_time, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` + selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) @@ -228,7 +227,7 @@ func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer s := &Streamer{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.UpdateTime, &s.UpdateUrl, &s.IsUpdateStreamer) + err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer) if err != nil { return nil, err } @@ -258,14 +257,12 @@ func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { }() row := tx.QueryRow(selectSQL+` LIMIT 1 FOR UPDATE`, streamer.Get("Platform"), streamer.Get("UserId")) - var isUpdateUrl int + var isUpdateUrl bool if err = row.Scan(&isUpdateUrl); err == nil { - if isUpdateUrl == 1 { + if isUpdateUrl { tx.Exec("UPDATE "+StreamerTable+" SET update_url = ?", streamer.Get("UpdateUrl")) } return true - } else { - log.Println(err) } _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*30)) @@ -304,41 +301,21 @@ func (store *StoreExtractor) UpdateOperator(isource IGet) { // UpdateStreamer Streamer表, 插入数据 func (store *StoreExtractor) UpdateStreamer(isource IGet) { - tx, err := store.db.Begin() - if err != nil { - panic(err) - } - - defer func() { - err = tx.Commit() - if err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - panic(err) - } - os.Exit(0) - } - }() - - _, err = tx.Exec("UPDATE "+StreamerTable+" SET platform = ?, user_id = ?, user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?,update_time = ?, update_url = ?", isource.Get("Platform"), isource.Get("UserId"), isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext"), isource.Get("UpdateTime"), isource.Get("UpdateUrl")) + _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?, operator = ?, update_time = ? WHERE uid = ?;", + isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("UpdateTime"), isource.Get("Uid")) if err != nil { panic(err) } } // InsertCollectLog CollectLog表插入数据 -func (store *StoreExtractor) InsertCollectLog(isource IGet) (int64, error) { +func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { tx, err := store.db.Begin() defer func() { if err := recover(); err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - log.Println(err) - } - os.Exit(0) + tx.Rollback() + log.Panic(err) } }() @@ -362,5 +339,5 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) (int64, error) { if err = tx.Commit(); err != nil { panic(err) } - return result.LastInsertId() + return logUid } diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index c9b6678..acff785 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -72,7 +72,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { content := resp.Content() if len(content) <= 200 { // 末页退出 finishpoint := time.Now() - log.Println("任务结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*60)) + log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*60)) for time.Now().Sub(finishpoint) < time.Minute*60 { time.Sleep(time.Second) if atomic.LoadInt32(&loop) > 0 { @@ -80,6 +80,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { } } + log.Println("获取Ranking UserId启动:", time.Now()) querys := tp.GetQuery() querys.Set("page", strconv.Itoa(1)) tp.SetQuery(querys) @@ -105,7 +106,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { updateUrl := make(map[string]interface{}) - supportersUrl := "https://www.openrec.tv/viewapp/api/v6/supporters?identify_id=sumomo_xqx&month=&Uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683&Token=46598c320408bd69ae3c63298f6f4a3a97354175&Random=AZVXNAAXQVMOSVWNDPIQ&page_number=1 -H 'accept: application/json, text/javascript, */*; q=0.01' -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36' -H 'cookie: uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683;' --compressed" + supportersUrl := "curl 'https://www.openrec.tv/viewapp/api/v6/supporters?identify_id=sumomo_xqx&month=&Uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683&Token=46598c320408bd69ae3c63298f6f4a3a97354175&Random=AZVXNAAXQVMOSVWNDPIQ&page_number=1' -H 'accept: application/json, text/javascript, */*; q=0.01' -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36' -H 'cookie: uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683;' --compressed" updateUrl["supporters"] = supportersUrl updateUrl["user"] = "https://www.openrec.tv/user/" + userid updateUrl["live"] = "https://www.openrec.tv/live/" + userid diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 65cb95f..74526e0 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -143,7 +143,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { // cookies := cxt.Session().GetCookies(wf.GetParsedURL()) ext := make(map[string]interface{}) - ext["html_supporters"] = supporters + ext["json_supporters"] = supporters ext["html_user"] = string(resp.Content()) liveUrl := updateUrl["live"]