From 6d688b845044eee2e2fa0214c7532aa2eea08a2d Mon Sep 17 00:00:00 2001 From: eson Date: Wed, 22 Jul 2020 20:00:02 +0800 Subject: [PATCH] fix: libxml2 leak --- extractor/openrec_extractor/main.go | 1 - .../openrec_extractor/openrec_extractor.go | 46 +++++++++++++++---- extractor_field.go | 22 ++++++--- go.mod | 3 +- go.sum | 4 +- store.go | 26 ++++++++++- tasks/twitch/twitch_task1/task_twitch.go | 1 + tasks/twitch/twitch_task1/task_twitch_test.go | 31 +++++++++++++ 8 files changed, 112 insertions(+), 22 deletions(-) create mode 100644 tasks/twitch/twitch_task1/task_twitch.go create mode 100644 tasks/twitch/twitch_task1/task_twitch_test.go diff --git a/extractor/openrec_extractor/main.go b/extractor/openrec_extractor/main.go index 736ccfb..2fb7715 100644 --- a/extractor/openrec_extractor/main.go +++ b/extractor/openrec_extractor/main.go @@ -16,7 +16,6 @@ import ( */ func main() { - go func() { http.ListenAndServe("0.0.0.0:8899", nil) }() diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index b4032af..0e0f738 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "regexp" - "runtime" "strconv" "strings" "sync/atomic" @@ -18,6 +17,9 @@ import ( "github.com/tidwall/gjson" ) +var estore = intimate.NewStoreExtractor() +var sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) + // OpenrecExtractor 提取方法 type OpenrecExtractor struct { user *intimate.ExtractorSource @@ -36,14 +38,27 @@ func (oe *OpenrecExtractor) Execute() { atomic.StoreInt32(&loop, 0) }() - estore := intimate.NewStoreExtractor() - sstore := intimate.NewStoreSource(string(intimate.STOpenrec)) var lasterr error = nil + execute := func() bool { + var err error - for atomic.LoadInt32(&loop) > 0 { + // if sstore.PopCount() >= 1000 { + // if err = estore.Close(); err != nil { + // log.Println(err) + // } + // if err = sstore.Close(); err != nil { + // log.Println(err) + // } + // estore = intimate.NewStoreExtractor() + // sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) - runtime.GC() - time.Sleep(time.Nanosecond) + // oe.supporters.Clear() + // oe.user.Clear() + // oe.userLive.Clear() + + // runtime.GC() // 主动gc + // log.Println("1000次执行, gc 重新建立sql链接") + // } source, err := sstore.Pop(string(intimate.TTOpenrecUser), 0) if err != nil { @@ -52,7 +67,7 @@ func (oe *OpenrecExtractor) Execute() { lasterr = err } time.Sleep(time.Second * 2) - continue + return true } sdata := source.Ext.([]byte) @@ -65,13 +80,16 @@ func (oe *OpenrecExtractor) Execute() { streamer.UserId = userId streamer.Platform = string(intimate.Popenrec) - oe.user = intimate.NewExtractorSource(datamap["html_user"]) + htmlUser := datamap["html_user"] + oe.user = intimate.NewExtractorSource(&htmlUser) oe.user.CreateExtractor() - oe.userLive = intimate.NewExtractorSource(datamap["html_live"]) + htmlLive := datamap["html_live"] + oe.userLive = intimate.NewExtractorSource(&htmlLive) oe.userLive.CreateExtractor() - oe.supporters = intimate.NewExtractorSource(datamap["json_supporters"]) + jsonSupporters := datamap["json_supporters"] + oe.supporters = intimate.NewExtractorSource(&jsonSupporters) clog := &intimate.CollectLog{} // log.Println(anchorId) @@ -101,6 +119,13 @@ func (oe *OpenrecExtractor) Execute() { source.Operator = int32(intimate.OperatorExtractorOK) sstore.UpdateOperator(source) + return true + } + + for atomic.LoadInt32(&loop) > 0 { + if !execute() { + break + } } } @@ -143,6 +168,7 @@ func (oe *OpenrecExtractor) extractViewsAndLiveStreaming(clog intimate.ISet) { if err != nil { log.Println(err) } + if xp.NodeIter().Next() { views := regexp.MustCompile(`[0-9,]+`).FindString(xp.String()) views = strings.ReplaceAll(views, ",", "") diff --git a/extractor_field.go b/extractor_field.go index 76ae0b7..f959eaa 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -76,21 +76,31 @@ func (cl *CollectLog) Set(field string, value interface{}) { } type ExtractorSource struct { - source gjson.Result + source *gjson.Result extractor *hunter.Extractor } -func NewExtractorSource(gr gjson.Result) *ExtractorSource { +func NewExtractorSource(gr *gjson.Result) *ExtractorSource { es := &ExtractorSource{} - es.source = gr + es.SetSource(gr) return es } -func (es *ExtractorSource) CreateExtractor() { - es.extractor = hunter.NewExtractor([]byte(es.source.Str)) +func (es *ExtractorSource) SetSource(gr *gjson.Result) { + es.source = gr + es.extractor = nil } -func (es *ExtractorSource) GetSource() gjson.Result { +func (es *ExtractorSource) Clear() { + es.source = nil + es.extractor = nil +} + +func (es *ExtractorSource) CreateExtractor() { + es.extractor = hunter.NewExtractor([]byte(es.source.String())) +} + +func (es *ExtractorSource) GetSource() *gjson.Result { return es.source } diff --git a/go.mod b/go.mod index d39d59c..729309d 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,10 @@ go 1.14 require ( github.com/474420502/gcurl v0.1.2 - github.com/474420502/hunter v0.3.0 + github.com/474420502/hunter v0.3.4 github.com/go-sql-driver/mysql v1.5.0 github.com/lestrrat-go/libxml2 v0.0.0-20200215080510-6483566f52cb + github.com/tebeka/selenium v0.9.9 github.com/tidwall/gjson v1.6.0 github.com/tidwall/pretty v1.0.1 // indirect golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect diff --git a/go.sum b/go.sum index f8db769..a556617 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/474420502/focus v0.12.0 h1:+icbmj7IEOefvTegHt5EpcHt6WFbe2miIrceUJx2Ev github.com/474420502/focus v0.12.0/go.mod h1:d0PMjtMxFz1a9HIhwyFPkWa+JF+0LgOrEUfd8iZka6s= github.com/474420502/gcurl v0.1.2 h1:ON9Yz3IgAdtDlFlHfkAJ3aIEBDxH0RiViPE5ST5ohKg= github.com/474420502/gcurl v0.1.2/go.mod h1:hws5q/Ao64bXLLDnldz9VyTQUndTWc/i5DzdEazFfoM= -github.com/474420502/hunter v0.3.0 h1:0VPi1MInxjHOta3da4v0ALWK0y3/X4/6nUSLFvdbiFU= -github.com/474420502/hunter v0.3.0/go.mod h1:pe4Xr/I+2agvq339vS/OZV+EiHAWtpXQs75rioSW9oA= +github.com/474420502/hunter v0.3.4 h1:fyLAgI84jWe3IcqsISC53j1w3CXI1FERxX//Potns0M= +github.com/474420502/hunter v0.3.4/go.mod h1:pe4Xr/I+2agvq339vS/OZV+EiHAWtpXQs75rioSW9oA= github.com/474420502/requests v1.6.0 h1:f4h4j40eT0P5whhg9LdkotD8CaKjtuDu/vz9iSUkCgY= github.com/474420502/requests v1.6.0/go.mod h1:SLXrQ5dL9c7dkIeKNUCBAjOIt3J9KFCS2RQjWJecNwo= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= diff --git a/store.go b/store.go index 72a56dc..609534e 100644 --- a/store.go +++ b/store.go @@ -37,12 +37,22 @@ type IGetSet interface { // SourceStore 储存 type StoreSource struct { - table string - db *sql.DB + table string + db *sql.DB + + popCount int errorCount int errorLimit int } +func (store *StoreSource) PopCount() int { + return store.popCount +} + +func (store *StoreSource) Close() error { + return store.db.Close() +} + // NewSourceStore 创建一个存储实例 func NewStoreSource(table string) *StoreSource { db, err := sql.Open("mysql", InitConfig.Database.SourceURI) @@ -142,6 +152,7 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e log.Println(err) } } + store.popCount++ }() s := &Source{} @@ -150,6 +161,7 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e if err != nil { return nil, err } + s.Set("LastOperator", s.Operator) _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) return s, nil @@ -164,10 +176,19 @@ const CollectLogTable string = "collect_log" type StoreExtractor struct { db *sql.DB + popCount int errorCount int errorLimit int } +func (store *StoreExtractor) PopCount() int { + return store.popCount +} + +func (store *StoreExtractor) Close() error { + return store.db.Close() +} + func (store *StoreExtractor) errorAlarm(err error) { if err != nil { log.Println("store error: ", err) @@ -220,6 +241,7 @@ func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer log.Println(err) } } + store.popCount++ }() // log.Println(selectSQL + ` limit 1 for update`) diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -0,0 +1 @@ +package main diff --git a/tasks/twitch/twitch_task1/task_twitch_test.go b/tasks/twitch/twitch_task1/task_twitch_test.go new file mode 100644 index 0000000..7c59497 --- /dev/null +++ b/tasks/twitch/twitch_task1/task_twitch_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "testing" + + "github.com/tebeka/selenium" + "github.com/tebeka/selenium/chrome" +) + +func TestCase1(t *testing.T) { + caps := selenium.Capabilities{"browserName": "chrome"} + chromecaps := chrome.Capabilities{} + err := chromecaps.AddExtension("/home/eson/test/ssh-key/0.1.2_0.crx") + if err != nil { + panic(err) + } + caps.AddChrome(chromecaps) + _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", 3030) + if err != nil { + panic(err) + } + wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", 3030)) + if err != nil { + panic(err) + } + err = wd.Get("https://www.twitch.tv/directory/all") + if err != nil { + panic(err) + } +}