v0.3.0版本重构. 以 主播 为目标单位.

This commit is contained in:
eson
2020-07-21 15:05:56 +08:00
parent d7a6da287d
commit 079488a2ba
7 changed files with 80 additions and 94 deletions

View File

@@ -36,8 +36,8 @@ func (oe *OpenrecExtractor) Execute() {
atomic.StoreInt32(&loop, 0)
}()
extractorStore := intimate.NewStoreExtractor()
store := intimate.NewStoreSource("source_openrec")
estore := intimate.NewStoreExtractor()
sstore := intimate.NewStoreSource(string(intimate.STOpenrec))
var lasterr error = nil
for atomic.LoadInt32(&loop) > 0 {
@@ -45,7 +45,7 @@ func (oe *OpenrecExtractor) Execute() {
runtime.GC()
time.Sleep(time.Nanosecond)
source, err := store.Pop(string(intimate.TTOpenrecUser), 100)
source, err := sstore.Pop(string(intimate.TTOpenrecUser), 0)
if err != nil {
if err != lasterr {
log.Println(err, lasterr)
@@ -55,67 +55,52 @@ func (oe *OpenrecExtractor) Execute() {
continue
}
sdata := source.Ext.([]byte)
datamap := gjson.ParseBytes(sdata).Map()
source.Operator = int32(intimate.OperatorError)
userId := source.Source.String
userId := datamap["var_user_id"].String()
streamer := &intimate.Streamer{}
streamer.UserId = userId
streamer.Platform = string(intimate.Popenrec)
sdata := source.Ext.([]byte)
if gjson.ValidBytes(sdata) {
result := gjson.ParseBytes(sdata)
datamap := result.Map()
oe.user = intimate.NewExtractorSource(datamap["html_user"])
oe.user.CreateExtractor()
oe.user = intimate.NewExtractorSource(datamap["user"])
oe.user.CreateExtractor()
oe.userLive = intimate.NewExtractorSource(datamap["html_live"])
oe.userLive.CreateExtractor()
oe.userLive = intimate.NewExtractorSource(datamap["user_live"])
oe.userLive.CreateExtractor()
oe.supporters = intimate.NewExtractorSource(datamap["json_supporters"])
clog := &intimate.CollectLog{}
oe.supporters = intimate.NewExtractorSource(datamap["supporters"])
clog := &intimate.CollectLog{}
// log.Println(anchorId)
// log.Println(anchorId)
oe.extractFollowers(clog)
oe.extractUserName(streamer)
oe.extractViewsAndLiveStreaming(clog)
oe.extractGiversAndGratuity(clog)
oe.extractLive(clog)
oe.extractTags(clog)
oe.extractFollowers(clog)
oe.extractUserName(streamer)
oe.extractViewsAndLiveStreaming(clog)
oe.extractGiversAndGratuity(clog)
oe.extractLive(clog)
oe.extractTags(clog)
streamer.Uid = source.StreamerId.Int64
streamer.UpdateTime = source.UpdateTime
streamer.UpdateTime = source.UpdateTime
clog.Platform = string(intimate.Popenrec)
clog.UserId = userId
clog.UpdateTime = source.UpdateTime
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true}
logUid := estore.InsertCollectLog(clog)
streamUid, err := extractorStore.UpdateStreamer(streamer)
if err != nil {
log.Println(err)
source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
store.UpdateOperator(source)
return
}
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true}
streamer.LatestLogUid = logUid
streamer.Operator = 0
estore.UpdateStreamer(streamer)
clog.StreamerUid = streamUid
clog.Platform = string(intimate.Popenrec)
clog.UserId = userId
clog.UpdateTime = source.UpdateTime
source.Operator = int32(intimate.OperatorExtractorOK)
sstore.UpdateOperator(source)
logUid, err := extractorStore.InsertCollectLog(clog)
if err != nil {
source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
store.UpdateOperator(source)
return
}
extractorStore.UpdateStreamerLog(logUid, streamUid)
source.Operator = int32(intimate.OperatorExtractorOK)
store.UpdateOperator(source)
} else {
log.Println("data is not json:\n", string(sdata))
}
}
}
@@ -138,15 +123,15 @@ func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) {
clog.Set("Followers", sql.NullInt64{Int64: followersInt, Valid: true})
}
func (oe *OpenrecExtractor) extractUserName(ai intimate.ISet) {
func (oe *OpenrecExtractor) extractUserName(streamer intimate.ISet) {
extractor := oe.user.GetExtractor()
xp, err := extractor.XPathResult("//p[@class='c-global__user__profile__list__name__text official-icon--after']/text()")
xp, err := extractor.XPathResult("//p[ contains(@class, 'c-global__user__profile__list__name__text')]/text()")
if err != nil {
log.Println(err)
} else {
if xp.NodeIter().Next() {
userName := xp.String()
ai.Set("UserName", userName)
streamer.Set("UserName", sql.NullString{String: userName, Valid: true})
}
}
}
@@ -167,7 +152,7 @@ func (oe *OpenrecExtractor) extractViewsAndLiveStreaming(clog intimate.ISet) {
}
clog.Set("Views", sql.NullInt64{Int64: int64(viewsint), Valid: true})
clog.Set("IsLiveStreaming", int32(1))
clog.Set("IsLiveStreaming", true)
}
}
@@ -214,7 +199,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) {
if err != nil {
log.Println(err)
}
log.Println(iter.Node().NodeValue(), tm.Local())
// log.Println(iter.Node().NodeValue(), tm.Local())
clog.Set("LiveStartTime", sql.NullTime{Time: tm.Local(), Valid: true})
duration, err := extractor.XPathResult("//meta[@itemprop='duration']/@content")

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/474420502/hunter"
"github.com/lestrrat-go/libxml2"
)
@@ -90,6 +91,28 @@ func TestCase(t *testing.T) {
t.Error(xr)
}
// TestUserName checks that the user-name XPath (a contains() match on the
// profile name <p> class) finds a text node in the saved page fixture
// test.html, and logs the extracted name.
func TestUserName(t *testing.T) {
	f, err := os.Open("test.html")
	if err != nil {
		// A missing fixture is a test failure, not a reason to crash the whole test binary.
		t.Fatal(err)
	}
	defer f.Close()

	data, err := ioutil.ReadAll(f)
	if err != nil {
		t.Fatal(err)
	}

	extractor := hunter.NewExtractor(data)
	xp, err := extractor.XPathResult("//p[ contains(@class, 'c-global__user__profile__list__name__text')]/text()")
	if err != nil {
		t.Fatal(err)
	}

	// The test is only meaningful if the expression actually matches something;
	// previously a non-matching XPath let the test pass silently.
	if !xp.NodeIter().Next() {
		t.Fatal("user name node not found in test.html")
	}

	// Log (visible with -v) instead of t.Error so a successful extraction
	// does not mark the test as failed.
	t.Log(xp.String())
}
func TestExtractor(t *testing.T) {
oe := &OpenrecExtractor{}
oe.Execute()