1.重构命名, 简化代码, 加入build.sh

2.重构数据字段结构, 使数据合理性提高.
3.测试通过openrec获取数据完整性.
TODO: 测试时区问题.
This commit is contained in:
eson
2020-07-17 18:21:38 +08:00
parent bb0f3845b7
commit e1040e69cd
15 changed files with 254 additions and 594 deletions

View File

@@ -1,5 +1,10 @@
package main
import (
	"log"
	"net/http"
	_ "net/http/pprof"
)
/*
`uid` varchar(36) NOT NULL,
`platform` varchar(255) NOT NULL,
@@ -11,6 +16,11 @@ package main
*/
// main starts a background HTTP server on 0.0.0.0:8899 and then runs the
// Openrec extractor in the foreground.
//
// The server is started with a nil handler, so it serves
// http.DefaultServeMux; the blank import of net/http/pprof in this file
// registers the /debug/pprof/ handlers on that mux.
func main() {
	go func() {
		// NOTE(review): 0.0.0.0 makes the profiling endpoints reachable on
		// every network interface — confirm this is intended outside of
		// development.
		//
		// ListenAndServe only returns on failure (e.g. the port is already
		// in use). Report the error instead of dropping it, but keep the
		// server best-effort: extraction continues without profiling.
		if err := http.ListenAndServe("0.0.0.0:8899", nil); err != nil {
			log.Println("pprof http server:", err)
		}
	}()

	oe := &OpenrecExtractor{}
	oe.Execute()
}

View File

@@ -8,6 +8,7 @@ import (
"os"
"os/signal"
"regexp"
"runtime"
"strconv"
"strings"
"sync/atomic"
@@ -35,12 +36,15 @@ func (oe *OpenrecExtractor) Execute() {
atomic.StoreInt32(&loop, 0)
}()
collect := intimate.NewExtractorStore()
extractorStore := intimate.NewExtractorStore()
store := intimate.NewSourceStore("source_openrec")
var lasterr error = nil
for atomic.LoadInt32(&loop) > 0 {
runtime.GC()
time.Sleep(time.Nanosecond)
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
if err != nil {
if err != lasterr {
@@ -51,14 +55,14 @@ func (oe *OpenrecExtractor) Execute() {
continue
}
source.SetOperator(int32(intimate.OperatorError))
anchorId := source.GetSource().String
source.Operator = int32(intimate.OperatorError)
userId := source.Source.String
ai := &intimate.AnchorInfo{}
ai.SetAnchorId(anchorId)
ai.SetPlatform(string(intimate.Popenrec))
streamer := &intimate.Streamer{}
streamer.UserId = userId
streamer.Platform = string(intimate.Popenrec)
sdata := source.GetExt().([]byte)
sdata := source.Ext.([]byte)
if gjson.ValidBytes(sdata) {
result := gjson.ParseBytes(sdata)
datamap := result.Map()
@@ -70,43 +74,44 @@ func (oe *OpenrecExtractor) Execute() {
oe.userLive.CreateExtractor()
oe.supporters = intimate.NewExtractorSource(datamap["supporters"])
clog := &intimate.CollectLog{}
log.Println(anchorId)
// log.Println(anchorId)
oe.extractFollowers(clog)
oe.extractAnchorName(ai)
oe.extractAnchorName(streamer)
oe.extractViewsAndLiveStreaming(clog)
oe.extractGiversAndGratuity(clog)
oe.extractLive(clog)
oe.extractTags(clog)
ai.Set("UpdateTime", source.GetUpdateTime())
streamer.UpdateTime = source.UpdateTime
LiveUrl := "https://www.openrec.tv/live/" + anchorId
ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true})
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true}
Uid, err := collect.InsertAnchorInfo(ai)
streamUid, err := extractorStore.InsertStreamer(streamer)
if err != nil {
log.Println(err)
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
store.UpdateOperator(source)
return
}
clog.Set("Uid", Uid)
clog.Set("Platform", string(intimate.Popenrec))
clog.Set("AnchorId", anchorId)
clog.Set("UpdateTime", source.GetUpdateTime())
clog.StreamerUid = streamUid
clog.Platform = string(intimate.Popenrec)
clog.UserId = userId
clog.UpdateTime = source.UpdateTime
if err = collect.InsertCollectLog(clog); err != nil {
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
logUid, err := extractorStore.InsertCollectLog(clog)
if err != nil {
source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
store.UpdateOperator(source)
return
}
source.SetOperator(int32(intimate.OperatorExtractorOK))
extractorStore.UpdateStreamerLogUid(logUid, streamUid)
source.Operator = int32(intimate.OperatorExtractorOK)
store.UpdateOperator(source)
} else {
log.Println("data is not json:\n", string(sdata))
@@ -136,11 +141,13 @@ func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) {
func (oe *OpenrecExtractor) extractAnchorName(ai intimate.ISet) {
extractor := oe.user.GetExtractor()
xp, err := extractor.XPathResult("//p[@class='c-global__user__profile__list__name__text official-icon--after']/text()")
if xp.NodeIter().Next() {
anchorName := xp.String()
ai.Set("AnchorName", anchorName)
} else {
if err != nil {
log.Println(err)
} else {
if xp.NodeIter().Next() {
userName := xp.String()
ai.Set("UserName", userName)
}
}
}
@@ -234,7 +241,6 @@ func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) {
for _, m := range matheslist {
tags = append(tags, m[1])
}
log.Println(tags)
tagsBytes, err := json.Marshal(tags)
if err != nil {
log.Println(err)