TODO: twitch_task2 fix errors

eson 2020-09-08 18:24:51 +08:00
parent 28319bf02a
commit fb07d61353
14 changed files with 495 additions and 433 deletions

View File

@ -28,6 +28,7 @@ type Store struct {
db *sql.DB
}

// Table represents a database table
type Table struct {
store *Store
name string
@ -38,8 +39,6 @@ type Table struct {
insertsql string
}
// const updatesql = "UPDATE %s SET %s WHERE %s = ?"
func NewStore(uri string) *Store {
db, err := sql.Open("mysql", uri)
if err != nil {
@ -56,19 +55,26 @@ func (store *Store) Table(name string) *Table {
table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)`
table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?`
// table.selectsql = `FROM ` + table.name + `WHERE operator`
table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s `
return table
}

// Queue mysql queue structure
type Queue struct {
table *Table
obj reflect.Type
fieldIndex []int
selected string

cond CondWhere

uidname string
uididx int
}

type CondWhere struct {
Condition string
CondArgs []interface{}
}
// OperatorType the type of the Operator flag field
@ -83,12 +89,21 @@ const (
OpERROR OperatorType = "10000"
)
// ConditionDefault the default condition
func ConditionDefault(platform Platform) CondWhere {
return CondWhere{
Condition: "platform = ? and operator = 0 and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval",
CondArgs: []interface{}{string(platform)},
}
}
// Queue builds a queue from the Table. Each Pop yields one item; obj is the struct type to process and whereCondition is the custom condition.
func (t *Table) Queue(obj interface{}, whereCondition CondWhere) *Queue {
q := &Queue{}
q.cond = whereCondition
q.obj = reflect.TypeOf(obj)
q.table = t
q.fieldIndex = []int{} // index positions of the struct fields paired with the selected columns
for i := 0; i < q.obj.NumField(); i++ {
field := q.obj.Field(i)
@ -98,6 +113,7 @@ func (t *Table) Queue(obj interface{}, whereCondition string) *Queue {
q.uididx = i
q.uidname = fname
}
q.fieldIndex = append(q.fieldIndex, i)
}
}
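For orientation, a minimal sketch of how the reworked queue is meant to be driven (struct, table and uri names are illustrative; Pop builds `SELECT <fields> FROM <table> WHERE <Condition> limit 1 for update` and binds CondArgs to the placeholders):

queue := NewStore(uri).Table("streamer").Queue(Streamer{}, CondWhere{
Condition: "platform = ? and operator = 0",
CondArgs: []interface{}{"openrec"},
})
item, err := queue.Pop()
if err == nil {
s := item.(*Streamer) // Pop returns a pointer to a freshly filled struct
_ = s
}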
@ -123,14 +139,14 @@ func (queue *Queue) Pop() (result interface{}, err error) {
}
}()
selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.cond.Condition + " limit 1 for update"
rows, err := tx.Query(selectsql, queue.cond.CondArgs...)
if err != nil {
return nil, fmt.Errorf("table: %s queue is empty", queue.table.name)
}
var fields = make([]interface{}, len(queue.fieldIndex))
for i := range fields {
var iv interface{}
fields[i] = &iv
@ -159,15 +175,15 @@ func (queue *Queue) Pop() (result interface{}, err error) {
}
obj := reflect.New(queue.obj).Elem()
for i, idx := range queue.fieldIndex {
field := obj.Field(idx)
convert(*fields[i].(*interface{}), field, columntypes[i])
}
return obj.Addr().Interface(), err
}

// Insert nil values are not inserted. Nesting is not supported. obj must be a Ptr type.
func (t *Table) Insert(obj interface{}) error {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
@ -210,6 +226,52 @@ func (t *Table) Insert(obj interface{}) error {
return err
}
// InsertRetAutoID nil values are not inserted. Nesting is not supported. Returns the auto-increment uid.
func (t *Table) InsertRetAutoID(obj interface{}) (int64, error) {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
fieldsql := ""
argssql := ""
var args []interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
if flag, ok := ftype.Tag.Lookup("uid"); ok {
if flag == "auto" {
continue
}
}
k := ftype.Type.Kind()
if k == reflect.Ptr || k == reflect.Interface {
if !field.IsNil() {
felem := field.Elem()
args = append(args, felem.Interface())
fieldsql += fname + ","
argssql += "?,"
}
} else {
args = append(args, field.Interface())
fieldsql += fname + ","
argssql += "?,"
}
}
}
ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1])
result, err := t.store.db.Exec(ssql, args...)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
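A rough usage sketch of InsertRetAutoID under the tag conventions used elsewhere in this commit (struct, table and values here are illustrative): the field tagged uid:"auto" is skipped on insert and its value comes back from LastInsertId.

type DemoRow struct {
Uid int64 `field:"uid" uid:"auto"`
Name *sql.NullString `field:"name"`
}

row := &DemoRow{Name: &sql.NullString{String: "demo", Valid: true}}
uid, err := store.Table("demo").InsertRetAutoID(row) // INSERT INTO demo(name) values(?), then LastInsertId
if err != nil {
log.Println(err)
}
log.Println("new auto uid:", uid)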
// Update updates a row from the struct fields
func (t *Table) Update(obj interface{}) error {
@ -261,7 +323,40 @@ func (t *Table) Update(obj interface{}) error {
return err
}
// UpdateError records error data for the row
func (t *Table) UpdateError(obj interface{}, err error) {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
var uidname string
var uidvalue interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
if _, ok := ftype.Tag.Lookup("uid"); ok {
if uidvalue != nil {
panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname))
}
uidname = fname
uidvalue = field.Interface()
break
}
}
}
// note: a column name cannot be a bound parameter, so the uid column is spliced into the statement
_, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where "+uidname+" = ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidvalue)
if dberr != nil {
// email tell owner to deal with
panic(err)
}
}
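The call pattern the task binaries in this commit use for UpdateError (fetchPage is a stand-in for whatever step produced the error):

if err := fetchPage(streamer); err != nil { // fetchPage is illustrative
intimate.TStreamer.UpdateError(streamer, err) // sets operator = 10000 (OpERROR) and error_msg on the row keyed by the uid-tagged field
continue
}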
func assign(field reflect.Value, src interface{}) (bool, error) {
switch field.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
s := asString(src)

View File

@ -13,7 +13,7 @@ func estAutoStore(t *testing.T) {
uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci"
store := NewStore(uri) store := NewStore(uri)
queue := store.Table("streamer").Queue(TSreamer{}, "operator = 0") queue := store.Table("streamer").Queue(TSreamer{}, CondWhere{Condition: "operator = 0"})
re, _ := queue.Pop() re, _ := queue.Pop()
pstreamer := re.(*TSreamer) pstreamer := re.(*TSreamer)

View File

@ -9,12 +9,11 @@ import (
"time" "time"
"github.com/474420502/extractor" "github.com/474420502/extractor"
"github.com/474420502/gcurl"
"github.com/474420502/requests"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
var estore = intimate.NewStoreExtractor()
var sstore = intimate.NewStoreSource(string(intimate.STOpenrec))
//UserInfo the struct holding the extracted information
type UserInfo struct {
UserName string `exp:"//p[ contains(@class, 'c-global__user__profile__list__name__text')]"`
@ -34,135 +33,230 @@ type UserLive struct {
func Execute() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Popenrec))

var lasterr error = nil
for !ps.IsClose() {
istreamer, err := squeue.Pop()
// streamer, err := estore.Pop(intimate.Popenrec) // pop one streamer row from the queue and parse it
if istreamer == nil || err != nil {
if err != lasterr {
log.Println(err, lasterr)
lasterr = err
}
time.Sleep(time.Second * 2)
continue
}

streamer := istreamer.(*intimate.Streamer)
userId := *streamer.UserId

var updateUrl map[string]string
err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // deserialize update_url, which stores the urls that need to be collected
if err != nil {
log.Println(err)
continue
}

userUrl := updateUrl["user"]
log.Println(userUrl)
tp := ses.Get(userUrl) // fetch the user url page
resp, err := tp.Execute()
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}

cookies := ses.GetCookies(tp.GetParsedURL())

scurl := updateUrl["supporters"] // fetch the supporters (tippers) data
curl := gcurl.Parse(scurl)
supportersSession := curl.CreateSession()
temporary := curl.CreateTemporary(supportersSession)
supportersSession.SetCookies(temporary.GetParsedURL(), cookies)
var supporters []string
for { // the supporters data requires login info; below the uuid, token and random cookie values are copied into the supporters query
supportersQuery := temporary.GetQuery()
for _, cookie := range cookies {
if cookie.Name == "uuid" {
supportersQuery.Set("Uuid", cookie.Value)
continue
}
if cookie.Name == "token" {
supportersQuery.Set("Token", cookie.Value)
continue
}
if cookie.Name == "random" {
supportersQuery.Set("Random", cookie.Value)
continue
}
}
supportersQuery.Set("identify_id", userId)
temporary.SetQuery(supportersQuery)

resp, err := temporary.Execute()
if err != nil {
log.Println(err)
}
supporterjson := gjson.ParseBytes(resp.Content())
supporterdata := supporterjson.Get("data") // parse the json returned by the supporters request
if supporterdata.Type == gjson.Null {
break
}
supporters = append(supporters, string(resp.Content()))
temporary.QueryParam("page_number").IntAdd(1)
}

// cookies := cxt.Session().GetCookies(wf.GetParsedURL())
// ext := make(map[string]interface{})
jsonSupporters := supporters
htmlUser := string(resp.Content())

liveUrl := updateUrl["live"]
tp = ses.Get(liveUrl)
resp, err = tp.Execute()
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}
htmlLive := string(resp.Content())
// ext["var_user_id"] = userId

// streamer.Platform = intimate.Popenrec
streamer.UpdateInterval = 120
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
streamer.Operator = 0

Extractor(streamer, userId, htmlUser, htmlLive, jsonSupporters)
}
}
func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive string, jsonSupporters []string) {
// sdata := source.Ext.([]byte)
// datamap := gjson.ParseBytes(sdata).Map()
// userId := datamap["var_user_id"].String()
// streamer := &intimate.Streamer{}
// streamer.UserId = &userId
// streamer.Platform = intimate.Popenrec // field does not need updating
// htmlUser := datamap["html_user"]
userEtor := extractor.ExtractHtmlString(htmlUser)
ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo)
// htmlLive := datamap["html_live"]
liveEtor := extractor.ExtractHtmlString(htmlLive)
ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive)
// jsonSupporters := datamap["json_supporters"]
clog := &intimate.CollectLog{}
if ok1 {
clog.Followers = &sql.NullInt64{Int64: ui.Followers, Valid: true}
clog.Views = &sql.NullInt64{Int64: ui.Views, Valid: true}
if ui.Views != 0 {
clog.IsLiveStreaming = true
}
streamer.UserName = &sql.NullString{String: ui.UserName, Valid: true}
// giverjson := jsonSupporters
var givers []interface{}
var gratuity int64 = 0
for _, v := range jsonSupporters {
giverSource := gjson.Parse(v)
for _, item := range giverSource.Get("data.items").Array() {
givers = append(givers, item.Map())
gratuity += item.Get("total_yells").Int()
}
}
giversbytes, err := json.Marshal(givers)
if err != nil {
log.Println(err)
clog.ErrorMsg = &sql.NullString{String: err.Error(), Valid: true}
} else {
clog.Giver = giversbytes
}
clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true}
} else {
log.Println("UserInfo may be not exists")
intimate.TStreamer.UpdateError(streamer, errors.New("UserInfo may be not exists"))
return
}
//log.Println(ul)
if ok2 {
clog.LiveTitle = &sql.NullString{String: ul.Title, Valid: true}
startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local)
if err != nil {
log.Println(err)
} else {
clog.LiveStartTime = &sql.NullTime{Time: startTime.Local(), Valid: true}
duration, err := intimate.ParseDuration(ul.LiveEndTime)
if err != nil {
log.Println(err)
} else {
endTime := startTime.Add(duration)
clog.LiveEndTime = &sql.NullTime{Time: endTime.Local(), Valid: true}
}
}
if tags, err := json.Marshal(ul.Tags); err == nil {
clog.Tags = tags
} else {
log.Println("json error", ul.Tags, clog.Tags)
}
}
// streamer.Uid = source.StreamerId.Int64
// streamer.UpdateTime = &source.UpdateTime
if clog.Tags != nil {
streamer.Tags = clog.Tags
}
clog.Platform = intimate.Popenrec
clog.UserId = userId
clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
clog.StreamerUid = streamer.Uid
logUid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
log.Println(err)
return
}
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = &sql.NullString{String: LiveUrl, Valid: true}
streamer.LatestLogUid = logUid
// streamer.Operator = 0
// log.Println(*streamer.UserId)
intimate.TStreamer.Update(streamer)
// source.Operator = int32(intimate.OperatorExtractorOK)
// sstore.UpdateOperator(source)
}

View File

@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"intimate" "intimate"
"log" "log"
"os"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -34,26 +33,35 @@ func main() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
streamerQueue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitcasting))

for !ps.IsClose() {
// streamer, err := estore.Pop(intimate.Ptwitcasting)
isteamer, err := streamerQueue.Pop()
if err != nil {
log.Println(err, isteamer)
continue
}
streamer := isteamer.(*intimate.Streamer)
streamer.LiveUrl = &sql.NullString{String: "https://twitcasting.tv/" + *streamer.UserId, Valid: true}

resp, err := ses.Get(streamer.LiveUrl.String).Execute()
if err != nil {
intimate.TStreamer.UpdateError(streamer, err)
log.Println(err, *streamer.UserId)
continue
}

var ldata *LiveData
// f, _ := os.OpenFile("./twistcasting.html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
// f.Write(resp.Content())
etor := extractor.ExtractHtml(resp.Content())
ildata := etor.GetObjectByTag(LiveData{})
if ildata == nil {
log.Println(streamer.LiveUrl.String)
continue
}
ldata = ildata.(*LiveData)

// ldata.MaxViews = regexp.MustCompile("\\d+").FindString(ldata.MaxViews)
coincount := 0
@ -62,14 +70,14 @@ func main() {
giverurl := streamer.LiveUrl.String + "/backers/" + strconv.Itoa(i)
resp, err = ses.Get(giverurl).Execute()
if err != nil {
intimate.TStreamer.UpdateError(streamer, err)
log.Panic(err)
}

etor := extractor.ExtractHtml(resp.Content())
xp, err := etor.XPaths("//td[@class='tw-memorial-table-recent-point']")
if err != nil {
intimate.TStreamer.UpdateError(streamer, err)
log.Panic(err)
}
@ -100,20 +108,20 @@ func main() {
}

streamer.Platform = intimate.Ptwitcasting
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
streamer.UserName = &sql.NullString{String: ldata.UserName, Valid: true}
streamer.Operator = 0
streamer.Tags = tags
// streamer.UpdateInterval = 60

clog := &intimate.CollectLog{}
clog.UserId = *streamer.UserId
clog.Gratuity = &sql.NullInt64{Int64: int64(coincount), Valid: true}
clog.Platform = streamer.Platform
clog.UpdateTime = streamer.UpdateTime
clog.LiveTitle = &sql.NullString{String: ldata.LiveTitle, Valid: true}
clog.Tags = tags
clog.Followers = &sql.NullInt64{Int64: int64(ldata.Follower), Valid: true}

switch {
case ldata.Follower <= 100:
streamer.UpdateInterval = 720
@ -125,12 +133,12 @@ func main() {
streamer.UpdateInterval = 120
}

clog.Views = &sql.NullInt64{Int64: ldata.MaxViews, Valid: true}
if ldata.LiveStart != "" {
st, err := time.Parse("Mon, 02 Jan 2006 15:04:05 -0700", ldata.LiveStart)
if err == nil {
startTime := st
clog.LiveStartTime = &sql.NullTime{Time: startTime, Valid: true}
dt, err := strconv.Atoi(ldata.LiveDuration)
liveduration := time.Now().Sub(startTime)
@ -149,7 +157,7 @@ func main() {
if err == nil {
endTime := startTime.Add((time.Duration)(dt) * time.Millisecond)
clog.LiveEndTime = &sql.NullTime{Time: endTime, Valid: true}
} else {
log.Println(err, streamer.UserId)
}
@ -158,8 +166,16 @@ func main() {
}
}

clog.StreamerUid = streamer.Uid
uid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
log.Println(err)
continue
}
streamer.LatestLogUid = uid
intimate.TStreamer.Update(streamer)
// estore.UpdateStreamer(streamer)
log.Println(*streamer.UserId)
}
}

View File

@ -3,26 +3,25 @@ package intimate
import (
"database/sql"
"reflect"
"time"
)

type GetSet struct {
}

type StreamerList struct {
UrlHash string `field:"urlhash" ` //
Platform string `field:"platform" ` //
Url string `field:"url" ` //
Label *sql.NullString `field:"label" ` //
Serialize interface{} `field:"serialize" `
UpdateInterval int32 `field:"update_interval" `
UpdateTime *sql.NullTime `field:"update_time" ` //
ErrorMsg *sql.NullString `field:"error_msg" ` //
Operator int32 `field:"operator" `
LastOperator int32
}
@ -51,7 +50,7 @@ type Streamer struct {
IsUpdateStreamer bool // whether to update the fields above
IsUpdateUrl bool
UpdateInterval int32 `field:"update_interval"`
UpdateUrl interface{} `field:"update_url"` // TODO: nil
LatestLogUid int64 `field:"latest_log_uid"`
UpdateTime *sql.NullTime `field:"update_time"` //
@ -72,24 +71,24 @@ func (ai *Streamer) Set(field string, value interface{}) {
}

type CollectLog struct {
LogUid int64 `field:"log_uid"` // log id
StreamerUid int64 `field:"streamer_uid"` // uid of the row in the streamer table
Platform Platform `field:"platform"` //
UserId string `field:"user_id"` // the platform's UserId
IsLiveStreaming bool `field:"is_live_streaming"` //
IsError bool `field:"is_error"` //
Followers *sql.NullInt64 `field:"followers"` //
Views *sql.NullInt64 `field:"views"` //
Giver interface{} `field:"giver"` //
Gratuity *sql.NullInt64 `field:"gratuity"` //
LiveTitle *sql.NullString `field:"live_title"` //
LiveStartTime *sql.NullTime `field:"live_start_time"` //
LiveEndTime *sql.NullTime `field:"live_end_time"` //
UpdateTime *sql.NullTime `field:"update_time"` //
Tags interface{} `field:"tags"`
Ext interface{} `field:"ext"` //
ErrorMsg *sql.NullString `field:"error_msg"` //
}
// Get Simple Value

View File

@ -70,13 +70,13 @@ func Execute() {
if userid := room.Get("id").String(); userid != "" { if userid := room.Get("id").String(); userid != "" {
streamer.UserId = userid streamer.UserId = &userid
streamer.LiveUrl = sql.NullString{String: "https://www.nimo.tv/live/" + userid, Valid: true} streamer.LiveUrl = &sql.NullString{String: "https://www.nimo.tv/live/" + userid, Valid: true}
channel := room.Get("roomTypeName").String() channel := room.Get("roomTypeName").String()
streamer.Channel = sql.NullString{String: channel, Valid: channel != ""} streamer.Channel = &sql.NullString{String: channel, Valid: channel != ""}
username := room.Get("anchorName").String() username := room.Get("anchorName").String()
streamer.UserName = sql.NullString{String: username, Valid: username != ""} streamer.UserName = &sql.NullString{String: username, Valid: username != ""}
if rtags := room.Get("anchorLabels"); rtags.IsArray() { if rtags := room.Get("anchorLabels"); rtags.IsArray() {

View File

@ -11,11 +11,11 @@ import (
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql // // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) // var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec))
// estore 解析存储连接实例 // // estore 解析存储连接实例
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// Execute 执行方法 // Execute 执行方法
func Execute() { func Execute() {
@ -71,7 +71,7 @@ func Execute() {
userid := User.Get("channel.id").String() userid := User.Get("channel.id").String()
streamer := &intimate.Streamer{} streamer := &intimate.Streamer{}
streamer.UserId = userid streamer.UserId = &userid
streamer.Platform = intimate.Popenrec streamer.Platform = intimate.Popenrec
updateUrl := make(map[string]interface{}) updateUrl := make(map[string]interface{})
@ -83,15 +83,16 @@ func Execute() {
updateUrlBytes, err := json.Marshal(updateUrl)
if err != nil {
intimate.TStreamer.UpdateError(streamer, err)
continue
}
streamer.UpdateUrl = updateUrlBytes
intimate.TStreamer.Insert(streamer)
}
}

log.Println("streamer count:", len(result.Array()), tp.ParsedURL.String())
// increment the page query parameter of the url to walk every page
tp.QueryParam("page").IntAdd(1)
time.Sleep(time.Second * 1)

View File

@ -1,2 +0,0 @@
openrec_task2
log

View File

@ -1,5 +0,0 @@
package main
func main() {
Execute()
}

View File

@ -1,154 +0,0 @@
package main
import (
"database/sql"
"encoding/json"
"intimate"
"log"
"time"
"github.com/474420502/gcurl"
"github.com/474420502/requests"
"github.com/tidwall/gjson"
)
// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec))
// estore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_extractor.sql
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
func init() {
}
// Execute 执行方法
func Execute() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
var lasterr error = nil
for !ps.IsClose() {
streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 进行解析
if streamer == nil || err != nil {
if err != lasterr {
log.Println(err, lasterr)
lasterr = err
}
time.Sleep(time.Second * 2)
continue
}
userId := streamer.UserId
var updateUrl map[string]string
err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url
if err != nil {
log.Println(err)
continue
}
// Check Userid
userUrl := updateUrl["user"]
log.Println(userUrl)
tp := ses.Get(userUrl) // 获取user url页面数据
resp, err := tp.Execute()
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
continue
}
cookies := ses.GetCookies(tp.GetParsedURL())
scurl := updateUrl["supporters"] //获取打赏者的数据
curl := gcurl.Parse(scurl)
supportersSession := curl.CreateSession()
temporary := curl.CreateTemporary(supportersSession)
supportersSession.SetCookies(temporary.GetParsedURL(), cookies)
var supporters []string
for { // supporters 数据需要登录信息. 下面为赋值 supporters链接获取的uid token random码
supportersQuery := temporary.GetQuery()
for _, cookie := range cookies {
if cookie.Name == "uuid" {
supportersQuery.Set("Uuid", cookie.Value)
continue
}
if cookie.Name == "token" {
supportersQuery.Set("Token", cookie.Value)
continue
}
if cookie.Name == "random" {
supportersQuery.Set("Random", cookie.Value)
continue
}
}
supportersQuery.Set("identify_id", userId)
temporary.SetQuery(supportersQuery)
resp, err := temporary.Execute()
if err != nil {
log.Println(err)
}
supporterjson := gjson.ParseBytes(resp.Content())
supporterdata := supporterjson.Get("data") //解析supporters获取的json数据
if supporterdata.Type == gjson.Null {
break
}
supporters = append(supporters, string(resp.Content()))
temporary.QueryParam("page_number").IntAdd(1)
}
// cookies := cxt.Session().GetCookies(wf.GetParsedURL())
ext := make(map[string]interface{})
ext["json_supporters"] = supporters
ext["html_user"] = string(resp.Content())
liveUrl := updateUrl["live"]
tp = ses.Get(liveUrl)
resp, err = tp.Execute()
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
continue
}
ext["html_live"] = string(resp.Content())
ext["var_user_id"] = userId
extJsonBytes, err := json.Marshal(ext)
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
continue
}
// streamer.Platform = intimate.Popenrec
streamer.UpdateInterval = 120
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
streamer.Operator = 0
source := &intimate.Source{}
source.Target = intimate.TOpenrecUser
source.Ext = string(extJsonBytes)
source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true}
sstore.Insert(source)
estore.UpdateStreamer(streamer)
}
}

View File

@ -1,9 +0,0 @@
package main
import (
"testing"
)
func TestMain(t *testing.T) {
main()
}

View File

@ -75,8 +75,10 @@ func Execute() {
sl.Operator = 0
sl.UpdateInterval = 120
sl.UpdateTime = time.Now()
sl.UrlHash = intimate.GetUrlHash(sl.Url)
intimate.TStreamerList.Insert(sl)
// estore.InsertStreamerList(sl)

queue.Put(wurl)
queuedict[wurl] = true
@ -107,7 +109,8 @@ func Execute() {
sl.Operator = 0
sl.UpdateInterval = 120
sl.UpdateTime = time.Now()
sl.UrlHash = intimate.GetUrlHash(sl.Url)
intimate.TStreamerList.Insert(sl)

queue.Put(wurl)
queuedict[wurl] = true

View File

@ -1,7 +1,6 @@
package main

import (
"database/sql"
"intimate"
"log"
"time"
@ -9,100 +8,120 @@ import (
"github.com/tebeka/selenium" "github.com/tebeka/selenium"
) )
// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql // // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) // var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// estore 解析存储连接实例 // // estore 解析存储连接实例
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// 获取类型的所有频道链接 // 获取类型的所有频道链接
// Execute 执行任务 // Execute 执行任务
func Execute() {
ps := intimate.NewPerfectShutdown()

for !ps.IsClose() {
var err error
wd := intimate.GetChromeDriver(3030)

weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT"
err = wd.Get(weburl)
if err != nil {
panic(err)
}

cardCondition := func(wd selenium.WebDriver) (bool, error) {
elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]")
if err != nil {
return false, err
}
return len(elements) > 0, nil
}
wd.WaitWithTimeout(cardCondition, time.Second*15)
time.Sleep(time.Second)

e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
if err != nil {
panic(err)
}
e.Click()

var lasthreflen = 0
var hrefs map[string]bool = make(map[string]bool)
var delayerror = 5
for i := 0; i <= 200; i++ {
cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]")
if err != nil {
log.Println(err)
break
}

if len(hrefs) == lasthreflen {
delayerror--
if delayerror <= 0 {
break
}
} else {
delayerror = 7
}
lasthreflen = len(hrefs)

for ii := 0; ii < 10; ii++ {
for _, card := range cards {
href, err := card.GetAttribute("href")
if err != nil {
log.Println(href, err)
continue
} else {
hrefs[href] = true
}
}
break
}

if ps.IsClose() {
break
}

if len(cards) > 10 {
log.Println(len(cards))
wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null);
for (var i = 0; i < items.snapshotLength - 10; i++) { item = items.snapshotItem(i); item.remove() ;};`, nil)
}

time.Sleep(time.Millisecond * 200)
wd.KeyDown(selenium.EndKey)
time.Sleep(time.Millisecond * 200)
wd.KeyUp(selenium.EndKey)
time.Sleep(time.Millisecond * 2500)
}

for href := range hrefs {
sl := &intimate.StreamerList{}
sl.Url = href
sl.UrlHash = intimate.GetUrlHash(sl.Url)
sl.Platform = string(intimate.Ptwitch)
sl.UpdateTime = intimate.GetUpdateTimeNow()
err := intimate.TStreamerList.Insert(sl)
if err != nil {
log.Println(err)
}
// TODO: Save href
// source := &intimate.Source{}
// source.Source = sql.NullString{String: href, Valid: true}
// source.Operator = 0
// source.Target = intimate.TTwitchChannel
// source.Url = weburl
// sstore.Insert(source)
}
log.Println("hrefs len:", len(hrefs))
// sstore.Deduplicate(intimate.TTwitchChannel, "source")

wd.Close()
wd.Quit()
time.Sleep(time.Minute * 30)
}
}

View File

@ -1,6 +1,7 @@
package intimate

import (
"crypto/md5"
"database/sql"
"fmt"
"log"
@ -34,6 +35,10 @@ func GetUpdateTimeNow() *sql.NullTime {
return &sql.NullTime{Time: time.Now().Add(-time.Hour * 100000), Valid: true}
}
func GetUrlHash(urlstr string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(urlstr)))
}
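GetUrlHash just hex-encodes the MD5 of the url string (32 lowercase hex characters), feeding the new urlhash field on StreamerList, presumably so list rows can be keyed by url. A minimal sketch of the call made by the task code above (the url is illustrative):

sl := &StreamerList{}
sl.Url = "https://www.twitch.tv/somechannel" // illustrative url
sl.UrlHash = GetUrlHash(sl.Url) // 32-char hex md5 stored in the urlhash column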
// ParseNumber strips commas and parses the number
func ParseNumber(num string) (int64, error) {
num = strings.Trim(num, " ")