diff --git a/.gitignore b/.gitignore index 1796cca..657f5f1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,5 +7,6 @@ intimate *.gz debug.test myblock - +run.sh +stop.sh diff --git a/config.go b/config.go index 1e73967..369ca21 100644 --- a/config.go +++ b/config.go @@ -17,7 +17,7 @@ func init() { InitConfig.Load() // storeOpenrec = NewStore() - log.SetFlags(log.Llongfile | log.Ldate) + log.SetFlags(log.Llongfile | log.Ltime) } // Config 配置 diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 45f751e..3b689ad 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -48,7 +48,7 @@ func (oe *OpenrecExtractor) Execute() { log.Println(err, lasterr) lasterr = err } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) continue } @@ -85,12 +85,13 @@ func (oe *OpenrecExtractor) Execute() { streamer.Uid = source.StreamerId.Int64 streamer.UpdateTime = source.UpdateTime + streamer.Tags = clog.Tags clog.Platform = string(intimate.Popenrec) clog.UserId = userId clog.UpdateTime = source.UpdateTime - logUid := estore.InsertCollectLog(clog) + logUid := estore.InsertClog(clog) LiveUrl := "https://www.openrec.tv/live/" + userId streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} diff --git a/tasks/twitch/twitch_task3/task_twitch_test.go b/extractor/twitch_extractor/tiwtch_extractor.go similarity index 80% rename from tasks/twitch/twitch_task3/task_twitch_test.go rename to extractor/twitch_extractor/tiwtch_extractor.go index db7b37b..bd2c6cf 100644 --- a/tasks/twitch/twitch_task3/task_twitch_test.go +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -5,12 +5,7 @@ import ( "encoding/json" "intimate" "log" - "os" - "os/signal" "regexp" - "sync/atomic" - "syscall" - "testing" "time" "github.com/tebeka/selenium" @@ -22,21 +17,24 @@ var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwi // estore 解析存储连接实例 var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() -func TestCase(t *testing.T) { - var loop int32 = 1 +func main() { wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() - go func() { - signalchan := make(chan os.Signal) - signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) - log.Println("accept stop command:", <-signalchan) - atomic.StoreInt32(&loop, 0) - }() + counter := intimate.NewCounter() + counter.SetMaxLimit(200) + counter.SetMaxToDo(func(olist ...interface{}) error { + owd := olist[0].(*selenium.WebDriver) + (*owd).Close() + (*owd).Quit() + *owd = intimate.GetChromeDriver(3030) + return nil + }, &wd) var lasterr error = nil // var err error - for atomic.LoadInt32(&loop) > 0 { + for !ps.IsClose() { streamer, err := estore.Pop(intimate.Ptwitch, 0) if streamer == nil || err != nil { if err != lasterr { @@ -56,19 +54,27 @@ func TestCase(t *testing.T) { err = wd.Get(liveUrl + "/about") if err != nil { log.Println(err) - //estore.UpdateError(streamer, err) + estore.UpdateError(streamer, err) + time.Sleep(time.Second * 5) continue } streamer.LiveUrl = sql.NullString{String: liveUrl, Valid: true} clog := &intimate.CollectLog{} clog.UserId = streamer.UserId + clog.Gratuity = sql.NullInt64{Int64: 0, Valid: false} time.Sleep(time.Millisecond * 500) - extractUserName(wd, streamer) - extractFollowers(wd, clog) - err = extractViews(wd, clog) // views + tags + gratuity + err = extractUserName(wd, streamer) + if err != nil { + continue + } + err = extractFollowers(wd, clog) + if err != nil { + continue + } + err = extractViews(wd, clog) // views + tags + gratuity if err != nil { // 不直播时提取礼物 gratuity wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { @@ -90,27 +96,48 @@ func TestCase(t *testing.T) { streamer.Platform = intimate.Ptwitch clog.Platform = string(streamer.Platform) clog.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} - lastClogId := estore.InsertCollectLog(clog) + lastClogId := estore.InsertClog(clog) - streamer.Operator = 100 + streamer.Operator = 10 streamer.LatestLogUid = lastClogId - estore.UpdateStreamer(streamer) + if clog.Tags != nil { + streamer.Tags = clog.Tags + } + switch fl := clog.Followers.Int64; { + case fl > 100000: + streamer.UpdateInterval = 120 + case fl > 10000: + streamer.UpdateInterval = 240 + case fl > 1000: + streamer.UpdateInterval = 360 + case fl > 100: + streamer.UpdateInterval = 720 + case fl > 0: + streamer.UpdateInterval = 1440 + } + + streamer.UpdateTime = clog.UpdateTime + estore.UpdateStreamer(streamer) + counter.AddWithReset(1) } + + wd.Close() + wd.Quit() } func extractUserName(wd selenium.WebDriver, streamer *intimate.Streamer) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { label, err := web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1") if err == nil { - if ltxt, err := label.Text(); err == nil { - log.Println("label:", ltxt) + if ltxt, err := label.Text(); err == nil && ltxt != "" { + // log.Println("label:", ltxt) streamer.UserName = sql.NullString{String: ltxt, Valid: true} return true, nil } } return false, err - }, 6*time.Second) + }, 15*time.Second) } func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error { @@ -126,9 +153,9 @@ func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error { followers = regexp.MustCompile(`[\d,]+`).FindString(followers) fint, _ := intimate.ParseNumber(followers) clog.Followers = sql.NullInt64{Int64: int64(fint), Valid: true} - log.Println("followers: ", followers, fint) + // log.Println("followers: ", followers, fint) return true, nil - }, 6*time.Second) + }, 4*time.Second) } func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { @@ -139,7 +166,7 @@ func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { vint, _ := intimate.ParseNumber(txt) clog.Views = sql.NullInt64{Int64: vint, Valid: true} - log.Println("views:", txt) + // log.Println("views:", txt) views.Click() extractTags(wd, clog) @@ -180,7 +207,6 @@ func extractTags(wd selenium.WebDriver, clog *intimate.CollectLog) error { } else { log.Println(err) } - log.Println(tag.Text()) } if len(stags) > 0 { if tagbuf, err := json.Marshal(stags); err == nil { diff --git a/extractor/twitch_extractor/twitch_test.go b/extractor/twitch_extractor/twitch_test.go index 884215e..731b2d3 100644 --- a/extractor/twitch_extractor/twitch_test.go +++ b/extractor/twitch_extractor/twitch_test.go @@ -1,13 +1,9 @@ package main import ( - "intimate" "testing" ) -var estore = intimate.NewStoreExtractor() -var sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) - func TestCase0(t *testing.T) { - + main() } diff --git a/extractor_field.go b/extractor_field.go index 0f94cd6..ad5ef30 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -19,11 +19,12 @@ type Streamer struct { UserName sql.NullString // LiveUrl sql.NullString // Channel sql.NullString // - Ext interface{} // + Tags interface{} + Ext interface{} // IsUpdateStreamer bool // 更新上面的内容 IsUpdateUrl bool - updateInterval int32 + UpdateInterval int32 UpdateUrl interface{} LatestLogUid int64 UpdateTime sql.NullTime // diff --git a/go.mod b/go.mod index 28e095a..d8b824d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module intimate go 1.14 require ( + github.com/474420502/focus v0.12.0 github.com/474420502/gcurl v0.1.2 github.com/474420502/hunter v0.3.4 github.com/474420502/requests v1.6.0 diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index f588757..91d3e3b 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS `streamer` ( `user_name` varchar(255) DEFAULT NULL COMMENT '用户名字 区别于ID', `live_url` text COMMENT '直播的url', `channel` varchar(128) DEFAULT NULL COMMENT'所属 频道,分类 未必所有平台都有明确的标签', + `tag` json DEFAULT NULL COMMENT 'streamer 最新的tag', `ext` json DEFAULT NULL COMMENT '扩展类型, 把一些可能需要但是没字段的数据放在json扩展', `is_update_streamer` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新streamer的信息. 1为需要,0则否', diff --git a/store.go b/store.go index 45f7f06..cbdbbfc 100644 --- a/store.go +++ b/store.go @@ -230,7 +230,7 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream return nil, err } var args = []interface{}{string(platform)} - selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` + selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer, update_interval from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) @@ -258,7 +258,7 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream s := &Streamer{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer) + err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer, &s.UpdateInterval) if err != nil { return nil, err } @@ -270,7 +270,7 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream // InsertStreamer Streamer表, 插入数据 func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { // select uid from table where platform = ? and user_id = ? - selectSQL := "SELECT is_update_url FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" + selectSQL := "SELECT is_update_url, uid FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" tx, err := store.db.Begin() if err != nil { panic(err) @@ -289,14 +289,16 @@ func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { row := tx.QueryRow(selectSQL+` LIMIT 1 FOR UPDATE`, streamer.Get("Platform"), streamer.Get("UserId")) var isUpdateUrl bool - if err = row.Scan(&isUpdateUrl); err == nil { + var Uid int64 + if err = row.Scan(&isUpdateUrl, &Uid); err == nil { if isUpdateUrl { tx.Exec("UPDATE "+StreamerTable+" SET update_url = ?", streamer.Get("UpdateUrl")) } + streamer.(ISet).Set("Uid", Uid) return true } - _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*30)) + _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*60)) if err != nil { panic(err) } @@ -331,16 +333,33 @@ func (store *StoreExtractor) UpdateOperator(isource IGet) { } // UpdateStreamer Streamer表, 插入数据 -func (store *StoreExtractor) UpdateStreamer(isource IGet) { - _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?, operator = ?, update_time = ? WHERE uid = ?;", - isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("UpdateTime"), isource.Get("Uid")) +func (store *StoreExtractor) UpdateStreamer(streamer IGet) { + _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?;", + streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) if err != nil { panic(err) } } -// InsertCollectLog CollectLog表插入数据 -func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { +// Update Streamer表, 更新指定的字段 +func (store *StoreExtractor) Update(streamer IGet, fieldvalues ...interface{}) { + updateSQL := "UPDATE " + StreamerTable + " SET " + var values []interface{} + for i := 0; i < len(fieldvalues); i += 2 { + field := fieldvalues[i] + values = append(values, fieldvalues[i+1]) + updateSQL += field.(string) + " = ? " + } + updateSQL += "WHERE uid = ?" + values = append(values, streamer.Get("Uid")) + _, err := store.db.Exec(updateSQL, values...) + if err != nil { + panic(err) + } +} + +// InsertClog CollectLog表插入数据 +func (store *StoreExtractor) InsertClog(clog IGet) int64 { tx, err := store.db.Begin() defer func() { @@ -355,7 +374,7 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { } result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - isource.Get("StreamerUid"), isource.Get("Platform"), isource.Get("UserId"), isource.Get("IsLiveStreaming"), isource.Get("IsError"), isource.Get("Followers"), isource.Get("Views"), isource.Get("Giver"), isource.Get("Gratuity"), isource.Get("LiveTitle"), isource.Get("LiveStartTime"), isource.Get("LiveEndTime"), isource.Get("UpdateTime"), isource.Get("Tags"), isource.Get("Ext"), isource.Get("ErrorMsg"), + clog.Get("StreamerUid"), clog.Get("Platform"), clog.Get("UserId"), clog.Get("IsLiveStreaming"), clog.Get("IsError"), clog.Get("Followers"), clog.Get("Views"), clog.Get("Giver"), clog.Get("Gratuity"), clog.Get("LiveTitle"), clog.Get("LiveStartTime"), clog.Get("LiveEndTime"), clog.Get("UpdateTime"), clog.Get("Tags"), clog.Get("Ext"), clog.Get("ErrorMsg"), ) if err != nil { panic(err) @@ -366,7 +385,7 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { panic(err) } - _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, isource.Get("StreamerUid")) + _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, clog.Get("StreamerUid")) if err = tx.Commit(); err != nil { panic(err) } diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index 18b852d..2b385ed 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -25,7 +25,7 @@ type ChannelLink struct { func (cl *ChannelLink) Execute() { var err error wd := intimate.GetChromeDriver(3030) - defer wd.Close() + ps := intimate.NewPerfectShutdown() weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" err = wd.Get(weburl) @@ -40,7 +40,7 @@ func (cl *ChannelLink) Execute() { } return len(elements) > 0, nil } - wd.WaitWithTimeout(cardCondition, time.Second*30) + wd.WaitWithTimeout(cardCondition, time.Second*15) time.Sleep(time.Second) e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") @@ -68,15 +68,22 @@ func (cl *ChannelLink) Execute() { delayerror = 5 } - for _, card := range cards { - href, err := card.GetAttribute("href") - if err != nil { - log.Println(err) - } else { - hrefs[href] = true + for ii := 0; ii < 10; ii++ { + for _, card := range cards { + href, err := card.GetAttribute("href") + if err != nil { + log.Println(href, err) + continue + } else { + hrefs[href] = true + } } + break } samecount = len(cards) + if ps.IsClose() { + break + } if len(cards) > 10 { log.Println(len(cards)) @@ -88,7 +95,6 @@ func (cl *ChannelLink) Execute() { time.Sleep(time.Millisecond * 200) wd.KeyUp(selenium.EndKey) time.Sleep(time.Millisecond * 2500) - } for href := range hrefs { diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go index c64ef1d..52773de 100644 --- a/tasks/twitch/twitch_task2/task_twitch.go +++ b/tasks/twitch/twitch_task2/task_twitch.go @@ -5,11 +5,7 @@ import ( "encoding/json" "intimate" "log" - "os" - "os/signal" "regexp" - "sync/atomic" - "syscall" "time" "github.com/tebeka/selenium" @@ -33,19 +29,18 @@ func (cl *UserList) Execute() { //article//a[@data-a-target='preview-card-title-link'] wd := intimate.GetChromeDriver(3030) - defer wd.Close() + ps := intimate.NewPerfectShutdown() + counter := intimate.NewCounter() + counter.SetMaxLimit(100) + counter.SetMaxToDo(func(olist ...interface{}) error { + owd := olist[0].(*selenium.WebDriver) + (*owd).Close() + (*owd).Quit() + *owd = intimate.GetChromeDriver(3030) + return nil + }, &wd) - var loop int32 = 1 - var count = 0 - - go func() { - signalchan := make(chan os.Signal) - signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) - log.Println("accept stop command:", <-signalchan) - atomic.StoreInt32(&loop, 0) - }() - - for atomic.LoadInt32(&loop) > 0 { + for !ps.IsClose() { var err error sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) @@ -56,7 +51,10 @@ func (cl *UserList) Execute() { weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT" err = wd.Get(weburl) if err != nil { - panic(err) + log.Println(err) + sstore.UpdateError(sourceChannel, err) + time.Sleep(time.Second * 10) + continue } wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { @@ -77,16 +75,17 @@ func (cl *UserList) Execute() { var elements []selenium.WebElement var liveurls = 0 var delayerror = 2 - for i := 0; i < 200 && atomic.LoadInt32(&loop) > 0; i++ { + for i := 0; i < 200 && !ps.IsClose(); i++ { elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") if err != nil { log.Println(err) break } - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Millisecond * 200) wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) wd.KeyUp(selenium.EndKey) - time.Sleep(time.Millisecond * 1500) + time.Sleep(time.Millisecond * 2000) if len(elements) == liveurls { delayerror-- if delayerror <= 0 { @@ -97,34 +96,62 @@ func (cl *UserList) Execute() { } liveurls = len(elements) } - elements, err = wd.FindElements(selenium.ByXPATH, "//article//a[@data-a-target='preview-card-title-link' and @href]") + articles, err := wd.FindElements(selenium.ByXPATH, "//article") if err != nil { log.Println(err) continue } - for _, e := range elements { + for _, article := range articles { - attr, err := e.GetAttribute("href") + e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]") if err != nil { log.Println(err) continue } + + href, err := e.GetAttribute("href") + if err != nil { + log.Println(err) + continue + } + + btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button") + if err != nil { + log.Println(err) + continue + } + + var tags []string + for _, btn := range btns { + tag, err := btn.GetAttribute("data-a-target") + if err == nil { + tags = append(tags, tag) + } + } + streamer := &intimate.Streamer{} - matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(attr) + matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) if len(matches) == 2 { streamer.UserId = matches[1] } else { - log.Println(attr) + log.Println(href) continue } + jtags, err := json.Marshal(tags) + if err != nil { + log.Println(err) + } else { + streamer.Tags = jtags + } + streamer.Platform = intimate.Ptwitch updateUrl := make(map[string]string) - updateUrl["live"] = attr - streamer.LiveUrl = sql.NullString{String: attr, Valid: true} + updateUrl["live"] = href + streamer.LiveUrl = sql.NullString{String: href, Valid: true} data, err := json.Marshal(updateUrl) if err != nil { log.Println(err) @@ -132,15 +159,19 @@ func (cl *UserList) Execute() { } streamer.UpdateUrl = data streamer.Operator = 0 - - estore.InsertStreamer(streamer) + if estore.InsertStreamer(streamer) { + // log.Println("streamer update tags", streamer.Uid, tags) + estore.Update(streamer, "Tags", streamer.Tags) + } } - log.Println("streamer insert", len(elements)) - count++ - if count >= 100 { - wd.Close() - wd = intimate.GetChromeDriver(3030) - count = 0 + log.Println("streamer find", len(articles)) + if len(articles) == 0 { + sourceChannel.Operator = 5 + sstore.UpdateOperator(sourceChannel) } + counter.AddWithReset(1) } + + wd.Close() + wd.Quit() } diff --git a/utils.go b/utils.go index 39f1d19..ebee59b 100644 --- a/utils.go +++ b/utils.go @@ -3,9 +3,13 @@ package intimate import ( "fmt" "log" + "os" + "os/signal" "runtime" "strconv" "strings" + "sync/atomic" + "syscall" "time" "github.com/tebeka/selenium" @@ -50,21 +54,44 @@ func ParseDuration(dt string) (time.Duration, error) { tdt, err := time.Parse("15:04:05", string(parse)) if err != nil { - return time.Duration(0), err } return tdt.Sub(zeroTime), nil } func GetChromeDriver(port int) selenium.WebDriver { + var err error caps := selenium.Capabilities{"browserName": "chrome"} + chromecaps := chrome.Capabilities{} - err := chromecaps.AddExtension("../../../crx/myblock.crx") - if err != nil { - panic(err) + for _, epath := range []string{"../../../crx/myblock.crx", "../../crx/myblock.crx"} { + _, err := os.Stat(epath) + if err == nil { + err := chromecaps.AddExtension(epath) + if err != nil { + panic(err) + } + break + } } + + if proxy := os.Getenv("chrome_proxy"); proxy != "" { + log.Println("proxy-server", proxy) + chromecaps.Args = append(chromecaps.Args, "--proxy-server="+proxy) + } + + if proxy := os.Getenv("pac_proxy"); proxy != "" { + log.Println("--proxy-pac-url=" + proxy) + chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url="+proxy) + } + // chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url=http://127.0.0.1:1081/pac") chromecaps.Args = append(chromecaps.Args, "--disk-cache-dir=/tmp/chromedriver-cache") + chromecaps.Args = append(chromecaps.Args, "--disable-gpu", "--disable-images", "--start-maximized", "--disable-infobars") + // chromecaps.Args = append(chromecaps.Args, "--headless") + chromecaps.Args = append(chromecaps.Args, "--no-sandbox") + chromecaps.Args = append(chromecaps.Args, "--disable-dev-shm-usage", "--mute-audio", "--safebrowsing-disable-auto-update") + chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation") caps.AddChrome(chromecaps) _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", port) @@ -76,14 +103,137 @@ func GetChromeDriver(port int) selenium.WebDriver { panic(err) } runtime.SetFinalizer(wd, func(obj interface{}) { - log.Println(obj) + if err := obj.(selenium.WebDriver).Close(); err != nil { log.Println(err) } + if err := obj.(selenium.WebDriver).Quit(); err != nil { + log.Println(err) + } + }) wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) if err != nil { panic(err) } + return wd } + +// PerfectShutdown 完美关闭程序 +type PerfectShutdown struct { + loop int32 +} + +// NewPerfectShutdown 创建完美关闭程序 +func NewPerfectShutdown() *PerfectShutdown { + ps := &PerfectShutdown{} + ps.loop = 1 + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&ps.loop, 0) + }() + + return ps +} + +// IsClose 判断是否要关闭 +func (ps *PerfectShutdown) IsClose() bool { + return atomic.LoadInt32(&ps.loop) == 0 +} + +type Counter struct { + dcount int + count int + maxLimit int + minLimit int + + minobj []interface{} + maxobj []interface{} + maxLimitToDo func(obj ...interface{}) error + minLimitToDo func(obj ...interface{}) error +} + +func NewCounter() *Counter { + c := &Counter{} + return c +} + +// SetDefault 设置默认值 +func (c *Counter) SetDefault(n int) { + c.dcount = n +} + +// Reset 最置count为defaultCount值 +func (c *Counter) Reset() { + c.count = c.dcount +} + +// SetCount 设置count到最大值的时候执行do函数 +func (c *Counter) SetCount(count int) { + c.count = count +} + +// GetCount 设置count到最大值的时候执行do函数 +func (c *Counter) GetCount() int { + return c.count +} + +// SetMinLimit 设置最小限制 +func (c *Counter) SetMinLimit(n int) { + c.minLimit = n +} + +// SetMaxLimit 设置最大限制 +func (c *Counter) SetMaxLimit(n int) { + c.maxLimit = n +} + +// SetMaxToDo 设置count到最大值的时候执行do函数 +func (c *Counter) SetMaxToDo(do func(obj ...interface{}) error, obj ...interface{}) { + c.maxLimitToDo = do + c.maxobj = obj +} + +// SetMinToDo 设置count到最小值的时候执行do函数 +func (c *Counter) SetMinToDo(do func(obj ...interface{}) error, obj ...interface{}) { + c.minLimitToDo = do + c.minobj = obj +} + +// AddWithReset 操作 count 默认值为0, 当触发限制时, 重置为默认值 +func (c *Counter) AddWithReset(n int) error { + c.count += n + if c.maxLimitToDo != nil { + if c.count >= c.maxLimit { + defer c.Reset() + return c.maxLimitToDo(c.maxobj...) + } + } + if c.minLimitToDo != nil { + if c.count <= c.minLimit { + defer c.Reset() + return c.minLimitToDo(c.minobj...) + } + } + return nil +} + +// Add 操作 count 默认值为0 +func (c *Counter) Add(n int) error { + c.count += n + if c.maxLimitToDo != nil { + if c.count >= c.maxLimit { + return c.maxLimitToDo(c.maxobj...) + } + } + if c.minLimitToDo != nil { + if c.count <= c.minLimit { + return c.minLimitToDo(c.minobj...) + } + } + return nil +} diff --git a/xvfb.sh b/xvfb.sh new file mode 100644 index 0000000..d5f6036 --- /dev/null +++ b/xvfb.sh @@ -0,0 +1 @@ +screen -dmS xvfb-99 Xvfb :99 -screen 0 1280x720x24 -ac -nolisten tcp -dpi 96 +extension RANDR -nolisten tcp