diff --git a/.gitignore b/.gitignore index 331f9bc..657f5f1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,7 @@ screenlog.* intimate *.gz debug.test - +myblock +run.sh +stop.sh diff --git a/config.go b/config.go index 1e73967..369ca21 100644 --- a/config.go +++ b/config.go @@ -17,7 +17,7 @@ func init() { InitConfig.Load() // storeOpenrec = NewStore() - log.SetFlags(log.Llongfile | log.Ldate) + log.SetFlags(log.Llongfile | log.Ltime) } // Config 配置 diff --git a/crx/myblock.crx b/crx/myblock.crx new file mode 100644 index 0000000..6ee88c6 Binary files /dev/null and b/crx/myblock.crx differ diff --git a/crx/myblock.pem b/crx/myblock.pem new file mode 100644 index 0000000..605b6cf --- /dev/null +++ b/crx/myblock.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDSG09DSvB03TOe +eOmQwfiCIf0wa2WRB31ewxa6i/PRgEKeJSUvIsIuaECUer2ss+J3rwSS2lDpGuiw +FnsVyZqKI/+Rcuc83YJGYg6OAzVMz6UL8YCWhXu3huTJ+V+a5iNereIC69ZERRJt +nXlWqsq6HKya+6BP9sX9CI4GTHQrnWBysAxsswhdnnnRvu+GxglWafSIzuS6OizT +1M1CmkZxNvDJhTSOR7SJlIYm2kM5/fIL53BdndF2IGAjfV1WV7AjwhTfun5cViEO +i8niQUIMY4L0AiO9grFD1g1xIYkeuVBoLxOUBzPxJwQmb64gseb9Dvt0BKLRGoou +SIOyE+KVAgMBAAECggEAI4b6J2kR0VUBEDwmVHO0K38HUstqNHSVgrNO0dLt8sAz +I44o5DhGqPW4a9L4ZS5SrkWyKonPcic6buISRIwfPVoacjQBfVWAXJnil6lbtyYK +ZMNcqLcgBRfCcpOgEq91DiKta6yIwekDFXVyCdFd78v+9ML1J+hUsLVkXJTLdP88 +PGamRWVd6vGy3QMRjyM29GLPgS+/6Vrp1cptSuYNqYhlszohmu8lBvzjH9jbPh9d +GFrrd8Bs7IRCdtKZig/3fbln4JEyyOYE+gcT2jplPksB6mR/5DBIdkVbeuFwGB0+ +h1/PKlprNQt7+Ei0HhHnTib7lZP8WGo4HkSi7PsAGQKBgQD1Ptho0wJiI2+6gL1O +iNsEJVKIQ2Sxdx3wI/qudphM99t6xKCpPyVI2Nd9PBf2jbZjGAaz+P/KQYxEqb6i +PRcQ+i99wCQoRfnRvUbKA4goEpKwRXmvn+499dm6D5pEuumOXGQYCmaFXuLTRN/I +BL6GNgLtoZAlLjUXaWtk8TszGQKBgQDbUf3p3HLpCjRvRDW/vA5xj+08t7xtF9uO +NilGK79uOA4VnxE2w3ioYqQ7t3I8J/0rAzGKq3tylg4QX6UpQ4b2koRr2B3cqoAk +dsRdNWAHwCNepz8hTLsZyuihzbNv2nHmoqhzjK/FcrBHx5NAM+T6OBpLzQBnbUzk +3wIcqm223QKBgQDo/IRxyY0pGMtLXoT6ODACF0b6JzRhGG37tuKvngGAlbQQRP7w +6wmL1F2cH1wQon7UU34CupqfVnhgvvZZgToJqfU2PTTcgeYc6Pl4b7SJhWOQTOCX +BZQ7jvYCulHv27aIxaNd53uQVx2cYoFKr58lN+i+QtADUoujq0YYxshb+QKBgQDW +ZOti7kZCeuBRGIu2V56C8uBFp5MBzf2polZsqx1iIFfcWPfZ4fGUIYFMgwKfvbOl +lWSbmxB9LiSnaugoU0OezBG43rYqXV4Qxy0jtKagTPoGcFWtNrX7+7e3XD8Zi6Am +hkFHW3MEAB5EvNq8Oz6OP8Os78SCVn2BimMlJJFF3QKBgQCF+aEAiBv+ivcmHUeP +2eBq9nLltPFAfXJ/p31MMQ6Jgo36DBqUeoLeyq/WfIXvwqbVbP9fANZrKoTPbI97 +dilCHUoO33rafXJy6jtaggtpz14tt9soecTop0vM/rU7tGtfBe6NXg9LRl+oDJCU +37I3a9Is+2CLyAUXWCk9mLfFsQ== +-----END PRIVATE KEY----- diff --git a/extractor/openrec_extractor/main.go b/extractor/openrec_extractor/main.go index 736ccfb..4596421 100644 --- a/extractor/openrec_extractor/main.go +++ b/extractor/openrec_extractor/main.go @@ -1,7 +1,6 @@ package main import ( - "net/http" _ "net/http/pprof" ) @@ -16,11 +15,6 @@ import ( */ func main() { - - go func() { - http.ListenAndServe("0.0.0.0:8899", nil) - }() - oe := &OpenrecExtractor{} oe.Execute() } diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index b4032af..3b689ad 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "regexp" - "runtime" "strconv" "strings" "sync/atomic" @@ -18,6 +17,9 @@ import ( "github.com/tidwall/gjson" ) +var estore = intimate.NewStoreExtractor() +var sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) + // OpenrecExtractor 提取方法 type OpenrecExtractor struct { user *intimate.ExtractorSource @@ -36,22 +38,17 @@ func (oe *OpenrecExtractor) Execute() { atomic.StoreInt32(&loop, 0) }() - estore := intimate.NewStoreExtractor() - sstore := intimate.NewStoreSource(string(intimate.STOpenrec)) var lasterr error = nil - for atomic.LoadInt32(&loop) > 0 { + var err error - runtime.GC() - time.Sleep(time.Nanosecond) - - source, err := sstore.Pop(string(intimate.TTOpenrecUser), 0) + source, err := sstore.Pop(intimate.TOpenrecUser, 0) if err != nil { if err != lasterr { log.Println(err, lasterr) lasterr = err } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) continue } @@ -63,15 +60,18 @@ func (oe *OpenrecExtractor) Execute() { streamer := &intimate.Streamer{} streamer.UserId = userId - streamer.Platform = string(intimate.Popenrec) + streamer.Platform = intimate.Popenrec - oe.user = intimate.NewExtractorSource(datamap["html_user"]) + htmlUser := datamap["html_user"] + oe.user = intimate.NewExtractorSource(&htmlUser) oe.user.CreateExtractor() - oe.userLive = intimate.NewExtractorSource(datamap["html_live"]) + htmlLive := datamap["html_live"] + oe.userLive = intimate.NewExtractorSource(&htmlLive) oe.userLive.CreateExtractor() - oe.supporters = intimate.NewExtractorSource(datamap["json_supporters"]) + jsonSupporters := datamap["json_supporters"] + oe.supporters = intimate.NewExtractorSource(&jsonSupporters) clog := &intimate.CollectLog{} // log.Println(anchorId) @@ -85,12 +85,13 @@ func (oe *OpenrecExtractor) Execute() { streamer.Uid = source.StreamerId.Int64 streamer.UpdateTime = source.UpdateTime + streamer.Tags = clog.Tags clog.Platform = string(intimate.Popenrec) clog.UserId = userId clog.UpdateTime = source.UpdateTime - logUid := estore.InsertCollectLog(clog) + logUid := estore.InsertClog(clog) LiveUrl := "https://www.openrec.tv/live/" + userId streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} @@ -100,8 +101,8 @@ func (oe *OpenrecExtractor) Execute() { source.Operator = int32(intimate.OperatorExtractorOK) sstore.UpdateOperator(source) - } + } func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { @@ -143,6 +144,7 @@ func (oe *OpenrecExtractor) extractViewsAndLiveStreaming(clog intimate.ISet) { if err != nil { log.Println(err) } + if xp.NodeIter().Next() { views := regexp.MustCompile(`[0-9,]+`).FindString(xp.String()) views = strings.ReplaceAll(views, ",", "") diff --git a/extractor/twitch_extractor/.gitignore b/extractor/twitch_extractor/.gitignore new file mode 100644 index 0000000..a2523a9 --- /dev/null +++ b/extractor/twitch_extractor/.gitignore @@ -0,0 +1,4 @@ +*.html +log +screenlog.* +twitch_extractor \ No newline at end of file diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go new file mode 100644 index 0000000..bd2c6cf --- /dev/null +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -0,0 +1,247 @@ +package main + +import ( + "database/sql" + "encoding/json" + "intimate" + "log" + "regexp" + "time" + + "github.com/tebeka/selenium" +) + +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + +func main() { + wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() + + counter := intimate.NewCounter() + counter.SetMaxLimit(200) + counter.SetMaxToDo(func(olist ...interface{}) error { + owd := olist[0].(*selenium.WebDriver) + (*owd).Close() + (*owd).Quit() + *owd = intimate.GetChromeDriver(3030) + return nil + }, &wd) + + var lasterr error = nil + // var err error + + for !ps.IsClose() { + streamer, err := estore.Pop(intimate.Ptwitch, 0) + if streamer == nil || err != nil { + if err != lasterr { + log.Println(err, lasterr) + lasterr = err + } + time.Sleep(time.Second * 2) + continue + } + + var updateUrl map[string]string + json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + liveUrl := updateUrl["live"] + log.Println(liveUrl) + + // err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") + err = wd.Get(liveUrl + "/about") + if err != nil { + log.Println(err) + estore.UpdateError(streamer, err) + time.Sleep(time.Second * 5) + continue + } + + streamer.LiveUrl = sql.NullString{String: liveUrl, Valid: true} + clog := &intimate.CollectLog{} + clog.UserId = streamer.UserId + clog.Gratuity = sql.NullInt64{Int64: 0, Valid: false} + + time.Sleep(time.Millisecond * 500) + err = extractUserName(wd, streamer) + if err != nil { + continue + } + err = extractFollowers(wd, clog) + if err != nil { + continue + } + + err = extractViews(wd, clog) // views + tags + gratuity + if err != nil { + // 不直播时提取礼物 gratuity + wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`) + btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) + if (err == nil && channelchat != nil) || btn != nil { + if channelchat != nil { + channelchat.Click() + } + time.Sleep(time.Second) + extractGratuity(wd, clog) + return true, nil + } + return false, nil + + }, time.Second*4) + } + + streamer.Platform = intimate.Ptwitch + clog.Platform = string(streamer.Platform) + clog.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} + lastClogId := estore.InsertClog(clog) + + streamer.Operator = 10 + streamer.LatestLogUid = lastClogId + if clog.Tags != nil { + streamer.Tags = clog.Tags + } + + switch fl := clog.Followers.Int64; { + case fl > 100000: + streamer.UpdateInterval = 120 + case fl > 10000: + streamer.UpdateInterval = 240 + case fl > 1000: + streamer.UpdateInterval = 360 + case fl > 100: + streamer.UpdateInterval = 720 + case fl > 0: + streamer.UpdateInterval = 1440 + } + + streamer.UpdateTime = clog.UpdateTime + estore.UpdateStreamer(streamer) + counter.AddWithReset(1) + } + + wd.Close() + wd.Quit() +} + +func extractUserName(wd selenium.WebDriver, streamer *intimate.Streamer) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + label, err := web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1") + if err == nil { + if ltxt, err := label.Text(); err == nil && ltxt != "" { + // log.Println("label:", ltxt) + streamer.UserName = sql.NullString{String: ltxt, Valid: true} + return true, nil + } + } + return false, err + }, 15*time.Second) +} + +func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + efollowers, err := web.FindElement(selenium.ByXPATH, "//div[@data-a-target='about-panel']//div[@class='tw-align-center']") + if err != nil { + return false, err + } + followers, err := efollowers.Text() + if err != nil || followers == "" { + return false, err + } + followers = regexp.MustCompile(`[\d,]+`).FindString(followers) + fint, _ := intimate.ParseNumber(followers) + clog.Followers = sql.NullInt64{Int64: int64(fint), Valid: true} + // log.Println("followers: ", followers, fint) + return true, nil + }, 4*time.Second) +} + +func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + views, err := web.FindElement(selenium.ByXPATH, "//a[@data-a-target='home-live-overlay-button']/span") + if views != nil { + if txt, err := views.Text(); err == nil { + + vint, _ := intimate.ParseNumber(txt) + clog.Views = sql.NullInt64{Int64: vint, Valid: true} + // log.Println("views:", txt) + views.Click() + + extractTags(wd, clog) + extractTitle(wd, clog) + extractGratuity(wd, clog) + + return true, nil + } + } + return false, err + }, time.Second*4) +} + +func extractTitle(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + title, err := web.FindElement(selenium.ByXPATH, `//h2[@data-a-target='stream-title']`) + if err == nil { + if txt, err := title.Text(); err == nil { + clog.LiveTitle = sql.NullString{String: txt, Valid: true} + return true, nil + } + } + return false, err + }, time.Second*4) +} + +func extractTags(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + tags, err := web.FindElements(selenium.ByXPATH, "//a[@aria-label and @data-a-target and @href]/div[@class and text()]") + if len(tags) == 0 { + return false, err + } + + var stags []string + for _, tag := range tags { + if txt, err := tag.Text(); err == nil { + stags = append(stags, txt) + } else { + log.Println(err) + } + } + if len(stags) > 0 { + if tagbuf, err := json.Marshal(stags); err == nil { + clog.Tags = tagbuf + } else { + log.Println(err) + } + } + + return true, nil + }, time.Second*4) +} + +func extractGratuity(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + btn, err := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) + if err == nil { + btn.Click() + time.Sleep(time.Second) + gifcount, err := web.FindElements(selenium.ByXPATH, `//div[@class="sub-gift-count tw-flex"]/p`) + if err == nil { + var gratuity int64 = 0 + for _, gc := range gifcount { + if gtxt, err := gc.Text(); err == nil { + gint, _ := intimate.ParseNumber(gtxt) + gratuity += gint + } else { + log.Println(err) + } + } + clog.Gratuity = sql.NullInt64{Int64: gratuity, Valid: true} + } + return true, nil + } + + return false, err + }, time.Second*4) +} diff --git a/extractor/twitch_extractor/twitch_test.go b/extractor/twitch_extractor/twitch_test.go new file mode 100644 index 0000000..731b2d3 --- /dev/null +++ b/extractor/twitch_extractor/twitch_test.go @@ -0,0 +1,9 @@ +package main + +import ( + "testing" +) + +func TestCase0(t *testing.T) { + main() +} diff --git a/extractor_field.go b/extractor_field.go index 76ae0b7..ad5ef30 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -12,18 +12,19 @@ type GetSet struct { } type Streamer struct { - Uid int64 // - Platform string // - UserId string // + Uid int64 // + Platform Platform // + UserId string // UserName sql.NullString // LiveUrl sql.NullString // Channel sql.NullString // - Ext interface{} // + Tags interface{} + Ext interface{} // IsUpdateStreamer bool // 更新上面的内容 IsUpdateUrl bool - updateInterval int32 + UpdateInterval int32 UpdateUrl interface{} LatestLogUid int64 UpdateTime sql.NullTime // @@ -76,21 +77,31 @@ func (cl *CollectLog) Set(field string, value interface{}) { } type ExtractorSource struct { - source gjson.Result + source *gjson.Result extractor *hunter.Extractor } -func NewExtractorSource(gr gjson.Result) *ExtractorSource { +func NewExtractorSource(gr *gjson.Result) *ExtractorSource { es := &ExtractorSource{} - es.source = gr + es.SetSource(gr) return es } -func (es *ExtractorSource) CreateExtractor() { - es.extractor = hunter.NewExtractor([]byte(es.source.Str)) +func (es *ExtractorSource) SetSource(gr *gjson.Result) { + es.source = gr + es.extractor = nil } -func (es *ExtractorSource) GetSource() gjson.Result { +func (es *ExtractorSource) Clear() { + es.source = nil + es.extractor = nil +} + +func (es *ExtractorSource) CreateExtractor() { + es.extractor = hunter.NewExtractor([]byte(es.source.String())) +} + +func (es *ExtractorSource) GetSource() *gjson.Result { return es.source } diff --git a/go.mod b/go.mod index d39d59c..d8b824d 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,13 @@ module intimate go 1.14 require ( + github.com/474420502/focus v0.12.0 github.com/474420502/gcurl v0.1.2 - github.com/474420502/hunter v0.3.0 + github.com/474420502/hunter v0.3.4 + github.com/474420502/requests v1.6.0 github.com/go-sql-driver/mysql v1.5.0 github.com/lestrrat-go/libxml2 v0.0.0-20200215080510-6483566f52cb + github.com/tebeka/selenium v0.9.9 github.com/tidwall/gjson v1.6.0 github.com/tidwall/pretty v1.0.1 // indirect golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect diff --git a/go.sum b/go.sum index f8db769..a556617 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/474420502/focus v0.12.0 h1:+icbmj7IEOefvTegHt5EpcHt6WFbe2miIrceUJx2Ev github.com/474420502/focus v0.12.0/go.mod h1:d0PMjtMxFz1a9HIhwyFPkWa+JF+0LgOrEUfd8iZka6s= github.com/474420502/gcurl v0.1.2 h1:ON9Yz3IgAdtDlFlHfkAJ3aIEBDxH0RiViPE5ST5ohKg= github.com/474420502/gcurl v0.1.2/go.mod h1:hws5q/Ao64bXLLDnldz9VyTQUndTWc/i5DzdEazFfoM= -github.com/474420502/hunter v0.3.0 h1:0VPi1MInxjHOta3da4v0ALWK0y3/X4/6nUSLFvdbiFU= -github.com/474420502/hunter v0.3.0/go.mod h1:pe4Xr/I+2agvq339vS/OZV+EiHAWtpXQs75rioSW9oA= +github.com/474420502/hunter v0.3.4 h1:fyLAgI84jWe3IcqsISC53j1w3CXI1FERxX//Potns0M= +github.com/474420502/hunter v0.3.4/go.mod h1:pe4Xr/I+2agvq339vS/OZV+EiHAWtpXQs75rioSW9oA= github.com/474420502/requests v1.6.0 h1:f4h4j40eT0P5whhg9LdkotD8CaKjtuDu/vz9iSUkCgY= github.com/474420502/requests v1.6.0/go.mod h1:SLXrQ5dL9c7dkIeKNUCBAjOIt3J9KFCS2RQjWJecNwo= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= diff --git a/platform_list.go b/platform_list.go index d694917..0bdefc0 100644 --- a/platform_list.go +++ b/platform_list.go @@ -4,6 +4,9 @@ package intimate type Platform string const ( - // Popenrec openrec源table名称 + // Popenrec openrec 平台 Popenrec Platform = "openrec" + + // Ptwitch twitch 平台 + Ptwitch Platform = "twitch" ) diff --git a/source_field.go b/source_field.go index 1236c8c..50a682f 100644 --- a/source_field.go +++ b/source_field.go @@ -19,8 +19,8 @@ type Source struct { UpdateTime sql.NullTime // ErrorMsg sql.NullString // - TargetType string // - Operator int32 // + Target Target // + Operator int32 // LastOperator int32 } diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index f588757..91d3e3b 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS `streamer` ( `user_name` varchar(255) DEFAULT NULL COMMENT '用户名字 区别于ID', `live_url` text COMMENT '直播的url', `channel` varchar(128) DEFAULT NULL COMMENT'所属 频道,分类 未必所有平台都有明确的标签', + `tag` json DEFAULT NULL COMMENT 'streamer 最新的tag', `ext` json DEFAULT NULL COMMENT '扩展类型, 把一些可能需要但是没字段的数据放在json扩展', `is_update_streamer` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新streamer的信息. 1为需要,0则否', diff --git a/sql/intimate_source.sql b/sql/intimate_source.sql index cf49f79..4fce51b 100644 --- a/sql/intimate_source.sql +++ b/sql/intimate_source.sql @@ -12,6 +12,26 @@ CREATE TABLE IF NOT EXISTS `source_openrec` ( `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', `error_msg` text DEFAULT NULL COMMENT '错误信息', + `target_type` varchar(64) NOT NULL COMMENT '目标类型', + `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', + PRIMARY KEY(`uid`), + KEY `operator_idx` (`operator`), + KEY `update_time_idx` (`update_time`), + KEY `target_type_idx` (`target_type`) + ); + + + CREATE TABLE IF NOT EXISTS `source_twitch` ( + uid bigint AUTO_INCREMENT COMMENT '自增UID', + + `streamer_id` bigint DEFAULT NULL COMMENT 'streamer uid, 关联主播', + `url` text NOT NULL COMMENT '获取源数据地址', + `source` longtext DEFAULT NULL COMMENT '源数据', + `ext` json DEFAULT NULL COMMENT '扩展字段', + `serialize` blob DEFAULT NULL COMMENT '需要给下个任务传递 序列花数据, 非必要不用', + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', + `error_msg` text DEFAULT NULL COMMENT '错误信息', + `target_type` varchar(64) NOT NULL COMMENT '目标类型', `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', PRIMARY KEY(`uid`), diff --git a/sql/remake_database.sh b/sql/remake_database.sh new file mode 100644 index 0000000..180663b --- /dev/null +++ b/sql/remake_database.sh @@ -0,0 +1,10 @@ +# /bin/bash +USER=root +HOST=127.0.0.1 +PORT=4000 + +# mysql -h $HOST -u $USER -P $PORT -c "drop database intimate_source"; +# mysql -h $HOST -u $USER -P $PORT -c "drop database intimate_extractor"; + +mysql -h $HOST -u $USER -P $PORT < ./intimate_extractor.sql; +mysql -h $HOST -u $USER -P $PORT < ./intimate_source.sql; diff --git a/store.go b/store.go index 72a56dc..cbdbbfc 100644 --- a/store.go +++ b/store.go @@ -37,12 +37,22 @@ type IGetSet interface { // SourceStore 储存 type StoreSource struct { - table string - db *sql.DB + table string + db *sql.DB + + popCount int errorCount int errorLimit int } +func (store *StoreSource) PopCount() int { + return store.popCount +} + +func (store *StoreSource) Close() error { + return store.db.Close() +} + // NewSourceStore 创建一个存储实例 func NewStoreSource(table string) *StoreSource { db, err := sql.Open("mysql", InitConfig.Database.SourceURI) @@ -69,7 +79,16 @@ func (store *StoreSource) errorAlarm(err error) { // Insert 插入数据 func (store *StoreSource) Insert(isource IGet) { - _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) + _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("Target"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) + if err != nil { + panic(err) + } +} + +// Deduplicate 去重 +func (store *StoreSource) Deduplicate(target Target, field string) { + sql := `DELETE FROM ` + store.table + ` WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, ` + field + ` FROM ` + store.table + ` force index(target_type_idx) WHERE target_type = "` + string(target) + `" ) s GROUP BY s.` + string(field) + `) ;` + _, err := store.db.Exec(sql) if err != nil { panic(err) } @@ -112,13 +131,13 @@ func (store *StoreSource) Restore(isource IGet) { } // Pop 弹出一条未处理的数据 -func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, error) { +func (store *StoreSource) Pop(targetType Target, operators ...int32) (*Source, error) { tx, err := store.db.Begin() if err != nil { return nil, err } - var args = []interface{}{targetType} + var args = []interface{}{string(targetType)} selectSQL := `select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store.table + ` where target_type = ?` if len(operators) == 0 { selectSQL += " and operator = ?" @@ -142,14 +161,16 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e log.Println(err) } } + store.popCount++ }() s := &Source{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) + err = row.Scan(&s.Uid, &s.Url, &s.Target, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) if err != nil { return nil, err } + s.Set("LastOperator", s.Operator) _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) return s, nil @@ -164,10 +185,19 @@ const CollectLogTable string = "collect_log" type StoreExtractor struct { db *sql.DB + popCount int errorCount int errorLimit int } +func (store *StoreExtractor) PopCount() int { + return store.popCount +} + +func (store *StoreExtractor) Close() error { + return store.db.Close() +} + func (store *StoreExtractor) errorAlarm(err error) { if err != nil { log.Println("store error: ", err) @@ -193,14 +223,14 @@ func NewStoreExtractor() *StoreExtractor { } // Pop 弹出一条未处理的数据 -func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer, error) { +func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Streamer, error) { tx, err := store.db.Begin() if err != nil { return nil, err } - var args = []interface{}{platform} - selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` + var args = []interface{}{string(platform)} + selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer, update_interval from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) @@ -220,6 +250,7 @@ func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer log.Println(err) } } + store.popCount++ }() // log.Println(selectSQL + ` limit 1 for update`) @@ -227,7 +258,7 @@ func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer s := &Streamer{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer) + err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer, &s.UpdateInterval) if err != nil { return nil, err } @@ -239,7 +270,7 @@ func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer // InsertStreamer Streamer表, 插入数据 func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { // select uid from table where platform = ? and user_id = ? - selectSQL := "SELECT is_update_url FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" + selectSQL := "SELECT is_update_url, uid FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" tx, err := store.db.Begin() if err != nil { panic(err) @@ -258,14 +289,16 @@ func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { row := tx.QueryRow(selectSQL+` LIMIT 1 FOR UPDATE`, streamer.Get("Platform"), streamer.Get("UserId")) var isUpdateUrl bool - if err = row.Scan(&isUpdateUrl); err == nil { + var Uid int64 + if err = row.Scan(&isUpdateUrl, &Uid); err == nil { if isUpdateUrl { tx.Exec("UPDATE "+StreamerTable+" SET update_url = ?", streamer.Get("UpdateUrl")) } + streamer.(ISet).Set("Uid", Uid) return true } - _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*30)) + _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*60)) if err != nil { panic(err) } @@ -300,16 +333,33 @@ func (store *StoreExtractor) UpdateOperator(isource IGet) { } // UpdateStreamer Streamer表, 插入数据 -func (store *StoreExtractor) UpdateStreamer(isource IGet) { - _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?, operator = ?, update_time = ? WHERE uid = ?;", - isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("UpdateTime"), isource.Get("Uid")) +func (store *StoreExtractor) UpdateStreamer(streamer IGet) { + _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?;", + streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) if err != nil { panic(err) } } -// InsertCollectLog CollectLog表插入数据 -func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { +// Update Streamer表, 更新指定的字段 +func (store *StoreExtractor) Update(streamer IGet, fieldvalues ...interface{}) { + updateSQL := "UPDATE " + StreamerTable + " SET " + var values []interface{} + for i := 0; i < len(fieldvalues); i += 2 { + field := fieldvalues[i] + values = append(values, fieldvalues[i+1]) + updateSQL += field.(string) + " = ? " + } + updateSQL += "WHERE uid = ?" + values = append(values, streamer.Get("Uid")) + _, err := store.db.Exec(updateSQL, values...) + if err != nil { + panic(err) + } +} + +// InsertClog CollectLog表插入数据 +func (store *StoreExtractor) InsertClog(clog IGet) int64 { tx, err := store.db.Begin() defer func() { @@ -324,7 +374,7 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { } result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - isource.Get("StreamerUid"), isource.Get("Platform"), isource.Get("UserId"), isource.Get("IsLiveStreaming"), isource.Get("IsError"), isource.Get("Followers"), isource.Get("Views"), isource.Get("Giver"), isource.Get("Gratuity"), isource.Get("LiveTitle"), isource.Get("LiveStartTime"), isource.Get("LiveEndTime"), isource.Get("UpdateTime"), isource.Get("Tags"), isource.Get("Ext"), isource.Get("ErrorMsg"), + clog.Get("StreamerUid"), clog.Get("Platform"), clog.Get("UserId"), clog.Get("IsLiveStreaming"), clog.Get("IsError"), clog.Get("Followers"), clog.Get("Views"), clog.Get("Giver"), clog.Get("Gratuity"), clog.Get("LiveTitle"), clog.Get("LiveStartTime"), clog.Get("LiveEndTime"), clog.Get("UpdateTime"), clog.Get("Tags"), clog.Get("Ext"), clog.Get("ErrorMsg"), ) if err != nil { panic(err) @@ -335,7 +385,7 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { panic(err) } - _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, isource.Get("StreamerUid")) + _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, clog.Get("StreamerUid")) if err = tx.Commit(); err != nil { panic(err) } diff --git a/table_list.go b/table_list.go index 2a9fa5b..04002dc 100644 --- a/table_list.go +++ b/table_list.go @@ -6,5 +6,7 @@ type SourceTable string const ( // STOpenrec openrec源table名称 STOpenrec SourceTable = "source_openrec" -) + // STTwitch twitch源table名称 + STTwitch SourceTable = "source_twitch" +) diff --git a/target_type_list.go b/target_type_list.go index 8869ec3..941e05d 100644 --- a/target_type_list.go +++ b/target_type_list.go @@ -1,12 +1,18 @@ package intimate -// TargetType 源的 目标类型 列表 -type TargetType string +// Target 源的 目标类型 列表 +type Target string const ( - // TTOpenrecRanking openrec源TargetType名称 - TTOpenrecRanking TargetType = "openrec_ranking" + // TOpenrecRanking 获取排名 Target名称 + TOpenrecRanking Target = "openrec_ranking" - // TTOpenrecUser openrec源TargetType名称 - TTOpenrecUser TargetType = "openrec_user" + // TOpenrecUser 获取用户列表 源Target名称 + TOpenrecUser Target = "openrec_user" + + // TTwitchChannel twitch 获取类别操作目标 + TTwitchChannel Target = "twitch_channel" + + // TTwitchUser twitch 获取类别操作目标 + TTwitchUser Target = "twitch_user" ) diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index acff785..124be92 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -70,12 +70,12 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { tp := cxt.Temporary() content := resp.Content() - if len(content) <= 200 { // 末页退出 + if len(content) <= 200 { //末页时没有内容返回, 末页退出 finishpoint := time.Now() - log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*60)) - for time.Now().Sub(finishpoint) < time.Minute*60 { + log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*120)) + for time.Now().Sub(finishpoint) < time.Minute*120 { time.Sleep(time.Second) - if atomic.LoadInt32(&loop) > 0 { + if atomic.LoadInt32(&loop) <= 0 { return } } @@ -102,7 +102,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { streamer := &intimate.Streamer{} streamer.UserId = userid - streamer.Platform = string(intimate.Popenrec) + streamer.Platform = intimate.Popenrec updateUrl := make(map[string]interface{}) @@ -122,6 +122,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { } } + // 修改url query 参数的page递增. 遍历所有页面 querys := tp.GetQuery() page, err := strconv.Atoi(querys.Get("page")) if err != nil { diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 74526e0..9126949 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -51,7 +51,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for atomic.LoadInt32(&loop) > 0 { - streamer, err := estore.Pop(string(intimate.Popenrec)) + streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 进行解析 if streamer == nil || err != nil { if err != lasterr { @@ -66,7 +66,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { var updateUrl map[string]string - err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url if err != nil { log.Println(err) continue @@ -74,7 +74,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { // Check Userid userUrl := updateUrl["user"] - tp := cxt.Session().Get(userUrl) + tp := cxt.Session().Get(userUrl) // 获取user url页面数据 resp, err := tp.Execute() streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} @@ -86,14 +86,14 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { cookies := cxt.Session().GetCookies(tp.GetParsedURL()) - scurl := updateUrl["supporters"] + scurl := updateUrl["supporters"] //获取打赏者的数据 curl := gcurl.ParseRawCURL(scurl) supportersSession := curl.CreateSession() temporary := curl.CreateTemporary(supportersSession) supportersSession.SetCookies(temporary.GetParsedURL(), cookies) var supporters []string - for { + for { // supporters 数据需要登录信息. 下面为赋值 supporters链接获取的uid token random码 supportersQuery := temporary.GetQuery() @@ -122,13 +122,13 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { log.Println(err) } supporterjson := gjson.ParseBytes(resp.Content()) - supporterdata := supporterjson.Get("data") + supporterdata := supporterjson.Get("data") //解析supporters获取的json数据 if supporterdata.Type == gjson.Null { break } supporters = append(supporters, string(resp.Content())) - page := supportersQuery.Get("page_number") + page := supportersQuery.Get("page_number") // page_number 加1 pageint, err := strconv.Atoi(page) if err != nil { log.Println(err) @@ -167,7 +167,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { streamer.Operator = int32(intimate.OperatorOK) source := &intimate.Source{} - source.TargetType = string(intimate.TTOpenrecUser) + source.Target = intimate.TOpenrecUser source.Ext = string(extJsonBytes) source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} sstore.Insert(source) diff --git a/tasks/twitch/twitch_task1/.gitignore b/tasks/twitch/twitch_task1/.gitignore new file mode 100644 index 0000000..3684d9b --- /dev/null +++ b/tasks/twitch/twitch_task1/.gitignore @@ -0,0 +1,2 @@ +twitch_task1 +log \ No newline at end of file diff --git a/tasks/twitch/twitch_task1/main.go b/tasks/twitch/twitch_task1/main.go new file mode 100644 index 0000000..b0019dd --- /dev/null +++ b/tasks/twitch/twitch_task1/main.go @@ -0,0 +1,6 @@ +package main + +func main() { + e := ChannelLink{} + e.Execute() +} diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go new file mode 100644 index 0000000..2b385ed --- /dev/null +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -0,0 +1,113 @@ +package main + +import ( + "database/sql" + "intimate" + "log" + "time" + + "github.com/tebeka/selenium" +) + +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + +// 获取类型的所有频道链接 + +// ChannelLink 频道链接 +type ChannelLink struct { +} + +// Execute 执行任务 +func (cl *ChannelLink) Execute() { + var err error + wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() + + weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" + err = wd.Get(weburl) + if err != nil { + panic(err) + } + + cardCondition := func(wd selenium.WebDriver) (bool, error) { + elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + if err != nil { + return false, err + } + return len(elements) > 0, nil + } + wd.WaitWithTimeout(cardCondition, time.Second*15) + time.Sleep(time.Second) + + e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") + if err != nil { + panic(err) + } + e.Click() + + var hrefs map[string]bool = make(map[string]bool) + var delayerror = 5 + var samecount = 0 + for i := 0; i <= 200; i++ { + cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + if err != nil { + log.Println(err) + break + } + + if len(cards) == samecount { + delayerror-- + if delayerror <= 0 { + break + } + } else { + delayerror = 5 + } + + for ii := 0; ii < 10; ii++ { + for _, card := range cards { + href, err := card.GetAttribute("href") + if err != nil { + log.Println(href, err) + continue + } else { + hrefs[href] = true + } + } + break + } + samecount = len(cards) + if ps.IsClose() { + break + } + + if len(cards) > 10 { + log.Println(len(cards)) + wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null); + for (var i = 0; i < items.snapshotLength - 10; i++) { item = items.snapshotItem(i); item.remove() ;};`, nil) + } + time.Sleep(time.Millisecond * 200) + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) + wd.KeyUp(selenium.EndKey) + time.Sleep(time.Millisecond * 2500) + } + + for href := range hrefs { + + // TODO: Save href + source := &intimate.Source{} + source.Source = sql.NullString{String: href, Valid: true} + source.Operator = 0 + source.Target = intimate.TTwitchChannel + source.Url = weburl + sstore.Insert(source) + } + + log.Println("hrefs len:", len(hrefs)) + sstore.Deduplicate(intimate.TTwitchChannel, "source") +} diff --git a/tasks/twitch/twitch_task1/task_twitch_test.go b/tasks/twitch/twitch_task1/task_twitch_test.go new file mode 100644 index 0000000..e61dc1e --- /dev/null +++ b/tasks/twitch/twitch_task1/task_twitch_test.go @@ -0,0 +1,14 @@ +package main + +import ( + "testing" +) + +func TestCase1(t *testing.T) { + e := ChannelLink{} + e.Execute() +} + +func TestLiveUrl(t *testing.T) { + +} diff --git a/tasks/twitch/twitch_task2/.gitignore b/tasks/twitch/twitch_task2/.gitignore new file mode 100644 index 0000000..846a6b4 --- /dev/null +++ b/tasks/twitch/twitch_task2/.gitignore @@ -0,0 +1,2 @@ +twitch_task2 +log \ No newline at end of file diff --git a/tasks/twitch/twitch_task2/main.go b/tasks/twitch/twitch_task2/main.go new file mode 100644 index 0000000..d81b18b --- /dev/null +++ b/tasks/twitch/twitch_task2/main.go @@ -0,0 +1,6 @@ +package main + +func main() { + ul := UserList{} + ul.Execute() +} diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go new file mode 100644 index 0000000..52773de --- /dev/null +++ b/tasks/twitch/twitch_task2/task_twitch.go @@ -0,0 +1,177 @@ +package main + +import ( + "database/sql" + "encoding/json" + "intimate" + "log" + "regexp" + "time" + + "github.com/tebeka/selenium" +) + +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + +// 获取类型的所有频道链接 + +// UserList 频道链接 +type UserList struct { +} + +// Execute 执行任务 +func (cl *UserList) Execute() { + // DELETE FROM source_twitch WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, source FROM source_twitch ) s GROUP BY s.source) ; + //article//a[@data-a-target='preview-card-title-link'] + + wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() + counter := intimate.NewCounter() + counter.SetMaxLimit(100) + counter.SetMaxToDo(func(olist ...interface{}) error { + owd := olist[0].(*selenium.WebDriver) + (*owd).Close() + (*owd).Quit() + *owd = intimate.GetChromeDriver(3030) + return nil + }, &wd) + + for !ps.IsClose() { + + var err error + sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) + if err != nil { + panic(err) + } + + weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT" + err = wd.Get(weburl) + if err != nil { + log.Println(err) + sstore.UpdateError(sourceChannel, err) + time.Sleep(time.Second * 10) + continue + } + + wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { + _, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + if err != nil { + return false, err + } + return true, nil + }, time.Second*10) + + btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") + if err != nil { + log.Println(err) + continue + } + btn.Click() + + var elements []selenium.WebElement + var liveurls = 0 + var delayerror = 2 + for i := 0; i < 200 && !ps.IsClose(); i++ { + elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + if err != nil { + log.Println(err) + break + } + time.Sleep(time.Millisecond * 200) + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) + wd.KeyUp(selenium.EndKey) + time.Sleep(time.Millisecond * 2000) + if len(elements) == liveurls { + delayerror-- + if delayerror <= 0 { + break + } + } else { + delayerror = 2 + } + liveurls = len(elements) + } + articles, err := wd.FindElements(selenium.ByXPATH, "//article") + if err != nil { + log.Println(err) + continue + } + + for _, article := range articles { + + e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]") + if err != nil { + log.Println(err) + continue + } + + href, err := e.GetAttribute("href") + if err != nil { + log.Println(err) + continue + } + + btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button") + if err != nil { + log.Println(err) + continue + } + + var tags []string + for _, btn := range btns { + tag, err := btn.GetAttribute("data-a-target") + if err == nil { + tags = append(tags, tag) + } + } + + streamer := &intimate.Streamer{} + + matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) + if len(matches) == 2 { + streamer.UserId = matches[1] + } else { + log.Println(href) + continue + } + + jtags, err := json.Marshal(tags) + if err != nil { + log.Println(err) + } else { + streamer.Tags = jtags + } + + streamer.Platform = intimate.Ptwitch + + updateUrl := make(map[string]string) + updateUrl["live"] = href + streamer.LiveUrl = sql.NullString{String: href, Valid: true} + data, err := json.Marshal(updateUrl) + if err != nil { + log.Println(err) + continue + } + streamer.UpdateUrl = data + streamer.Operator = 0 + if estore.InsertStreamer(streamer) { + // log.Println("streamer update tags", streamer.Uid, tags) + estore.Update(streamer, "Tags", streamer.Tags) + } + } + log.Println("streamer find", len(articles)) + if len(articles) == 0 { + sourceChannel.Operator = 5 + sstore.UpdateOperator(sourceChannel) + } + counter.AddWithReset(1) + } + + wd.Close() + wd.Quit() +} diff --git a/tasks/twitch/twitch_task2/task_twitch_test.go b/tasks/twitch/twitch_task2/task_twitch_test.go new file mode 100644 index 0000000..ef64976 --- /dev/null +++ b/tasks/twitch/twitch_task2/task_twitch_test.go @@ -0,0 +1,7 @@ +package main + +import "testing" + +func TestMain(t *testing.T) { + main() +} diff --git a/utils.go b/utils.go index 9df8fbb..ebee59b 100644 --- a/utils.go +++ b/utils.go @@ -1,8 +1,19 @@ package intimate import ( + "fmt" "log" + "os" + "os/signal" + "runtime" + "strconv" + "strings" + "sync/atomic" + "syscall" "time" + + "github.com/tebeka/selenium" + "github.com/tebeka/selenium/chrome" ) var zeroTime time.Time @@ -17,6 +28,12 @@ func init() { } +// ParseNumber 去逗号解析数字 +func ParseNumber(number string) (int64, error) { + number = strings.ReplaceAll(number, ",", "") + return strconv.ParseInt(number, 10, 64) +} + // ParseDuration time to duration eg: 1:40:00 -> time.Duration func ParseDuration(dt string) (time.Duration, error) { @@ -37,8 +54,186 @@ func ParseDuration(dt string) (time.Duration, error) { tdt, err := time.Parse("15:04:05", string(parse)) if err != nil { - return time.Duration(0), err } return tdt.Sub(zeroTime), nil } + +func GetChromeDriver(port int) selenium.WebDriver { + var err error + caps := selenium.Capabilities{"browserName": "chrome"} + + chromecaps := chrome.Capabilities{} + for _, epath := range []string{"../../../crx/myblock.crx", "../../crx/myblock.crx"} { + _, err := os.Stat(epath) + if err == nil { + err := chromecaps.AddExtension(epath) + if err != nil { + panic(err) + } + break + } + } + + if proxy := os.Getenv("chrome_proxy"); proxy != "" { + log.Println("proxy-server", proxy) + chromecaps.Args = append(chromecaps.Args, "--proxy-server="+proxy) + } + + if proxy := os.Getenv("pac_proxy"); proxy != "" { + log.Println("--proxy-pac-url=" + proxy) + chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url="+proxy) + } + + // chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url=http://127.0.0.1:1081/pac") + chromecaps.Args = append(chromecaps.Args, "--disk-cache-dir=/tmp/chromedriver-cache") + chromecaps.Args = append(chromecaps.Args, "--disable-gpu", "--disable-images", "--start-maximized", "--disable-infobars") + // chromecaps.Args = append(chromecaps.Args, "--headless") + chromecaps.Args = append(chromecaps.Args, "--no-sandbox") + chromecaps.Args = append(chromecaps.Args, "--disable-dev-shm-usage", "--mute-audio", "--safebrowsing-disable-auto-update") + + chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation") + caps.AddChrome(chromecaps) + _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", port) + if err != nil { + panic(err) + } + wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", port)) + if err != nil { + panic(err) + } + runtime.SetFinalizer(wd, func(obj interface{}) { + + if err := obj.(selenium.WebDriver).Close(); err != nil { + log.Println(err) + } + if err := obj.(selenium.WebDriver).Quit(); err != nil { + log.Println(err) + } + + }) + wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) + if err != nil { + panic(err) + } + + return wd +} + +// PerfectShutdown 完美关闭程序 +type PerfectShutdown struct { + loop int32 +} + +// NewPerfectShutdown 创建完美关闭程序 +func NewPerfectShutdown() *PerfectShutdown { + ps := &PerfectShutdown{} + ps.loop = 1 + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&ps.loop, 0) + }() + + return ps +} + +// IsClose 判断是否要关闭 +func (ps *PerfectShutdown) IsClose() bool { + return atomic.LoadInt32(&ps.loop) == 0 +} + +type Counter struct { + dcount int + count int + maxLimit int + minLimit int + + minobj []interface{} + maxobj []interface{} + maxLimitToDo func(obj ...interface{}) error + minLimitToDo func(obj ...interface{}) error +} + +func NewCounter() *Counter { + c := &Counter{} + return c +} + +// SetDefault 设置默认值 +func (c *Counter) SetDefault(n int) { + c.dcount = n +} + +// Reset 最置count为defaultCount值 +func (c *Counter) Reset() { + c.count = c.dcount +} + +// SetCount 设置count到最大值的时候执行do函数 +func (c *Counter) SetCount(count int) { + c.count = count +} + +// GetCount 设置count到最大值的时候执行do函数 +func (c *Counter) GetCount() int { + return c.count +} + +// SetMinLimit 设置最小限制 +func (c *Counter) SetMinLimit(n int) { + c.minLimit = n +} + +// SetMaxLimit 设置最大限制 +func (c *Counter) SetMaxLimit(n int) { + c.maxLimit = n +} + +// SetMaxToDo 设置count到最大值的时候执行do函数 +func (c *Counter) SetMaxToDo(do func(obj ...interface{}) error, obj ...interface{}) { + c.maxLimitToDo = do + c.maxobj = obj +} + +// SetMinToDo 设置count到最小值的时候执行do函数 +func (c *Counter) SetMinToDo(do func(obj ...interface{}) error, obj ...interface{}) { + c.minLimitToDo = do + c.minobj = obj +} + +// AddWithReset 操作 count 默认值为0, 当触发限制时, 重置为默认值 +func (c *Counter) AddWithReset(n int) error { + c.count += n + if c.maxLimitToDo != nil { + if c.count >= c.maxLimit { + defer c.Reset() + return c.maxLimitToDo(c.maxobj...) + } + } + if c.minLimitToDo != nil { + if c.count <= c.minLimit { + defer c.Reset() + return c.minLimitToDo(c.minobj...) + } + } + return nil +} + +// Add 操作 count 默认值为0 +func (c *Counter) Add(n int) error { + c.count += n + if c.maxLimitToDo != nil { + if c.count >= c.maxLimit { + return c.maxLimitToDo(c.maxobj...) + } + } + if c.minLimitToDo != nil { + if c.count <= c.minLimit { + return c.minLimitToDo(c.minobj...) + } + } + return nil +} diff --git a/xvfb.sh b/xvfb.sh new file mode 100644 index 0000000..d5f6036 --- /dev/null +++ b/xvfb.sh @@ -0,0 +1 @@ +screen -dmS xvfb-99 Xvfb :99 -screen 0 1280x720x24 -ac -nolisten tcp -dpi 96 +extension RANDR -nolisten tcp