package main import ( "database/sql" "encoding/json" "intimate" "log" "time" "github.com/474420502/gcurl" "github.com/474420502/requests" "github.com/tidwall/gjson" ) // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) // estore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_extractor.sql var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func init() { } // Execute 执行方法 func Execute() { ps := intimate.NewPerfectShutdown() ses := requests.NewSession() var lasterr error = nil for !ps.IsClose() { streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 进行解析 if streamer == nil || err != nil { if err != lasterr { log.Println(err, lasterr) lasterr = err } time.Sleep(time.Second * 2) continue } userId := streamer.UserId var updateUrl map[string]string err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url if err != nil { log.Println(err) continue } // Check Userid userUrl := updateUrl["user"] log.Println(userUrl) tp := ses.Get(userUrl) // 获取user url页面数据 resp, err := tp.Execute() streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} if err != nil { log.Println(err) estore.UpdateError(streamer, err) continue } cookies := ses.GetCookies(tp.GetParsedURL()) scurl := updateUrl["supporters"] //获取打赏者的数据 curl := gcurl.ParseRawCURL(scurl) supportersSession := curl.CreateSession() temporary := curl.CreateTemporary(supportersSession) supportersSession.SetCookies(temporary.GetParsedURL(), cookies) var supporters []string for { // supporters 数据需要登录信息. 下面为赋值 supporters链接获取的uid token random码 supportersQuery := temporary.GetQuery() for _, cookie := range cookies { if cookie.Name == "uuid" { supportersQuery.Set("Uuid", cookie.Value) continue } if cookie.Name == "token" { supportersQuery.Set("Token", cookie.Value) continue } if cookie.Name == "random" { supportersQuery.Set("Random", cookie.Value) continue } } supportersQuery.Set("identify_id", userId) temporary.SetQuery(supportersQuery) resp, err := temporary.Execute() if err != nil { log.Println(err) } supporterjson := gjson.ParseBytes(resp.Content()) supporterdata := supporterjson.Get("data") //解析supporters获取的json数据 if supporterdata.Type == gjson.Null { break } supporters = append(supporters, string(resp.Content())) temporary.QueryParam("page_number").IntAdd(1) // page := supportersQuery.Get("page_number") // page_number 加1 // pageint, err := strconv.Atoi(page) // if err != nil { // log.Println(err) // break // } // pageint++ // page = strconv.Itoa(pageint) // supportersQuery.Set("page_number", page) // temporary.SetQuery(supportersQuery) } // cookies := cxt.Session().GetCookies(wf.GetParsedURL()) ext := make(map[string]interface{}) ext["json_supporters"] = supporters ext["html_user"] = string(resp.Content()) liveUrl := updateUrl["live"] tp = ses.Get(liveUrl) resp, err = tp.Execute() if err != nil { log.Println(err) estore.UpdateError(streamer, err) continue } ext["html_live"] = string(resp.Content()) ext["var_user_id"] = userId extJsonBytes, err := json.Marshal(ext) if err != nil { log.Println(err) estore.UpdateError(streamer, err) continue } // streamer.Platform = intimate.Popenrec streamer.UpdateInterval = 120 streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} streamer.Operator = 0 source := &intimate.Source{} source.Target = intimate.TOpenrecUser source.Ext = string(extJsonBytes) source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} sstore.Insert(source) estore.UpdateStreamer(streamer) } }