2020-07-07 10:39:24 +00:00
package intimate
2020-07-06 08:33:35 +00:00
import (
"database/sql"
2020-07-06 09:58:24 +00:00
"log"
2020-07-20 10:13:54 +00:00
"time"
2020-07-06 08:33:35 +00:00
_ "github.com/go-sql-driver/mysql"
)
2020-07-08 07:02:55 +00:00
// OperatorFlag is the processing-state marker stored in a row's operator
// column.
type OperatorFlag int32

// Known operator values.
const (
	// OperatorOK is described by the original note as "waiting to be
	// processed".
	// NOTE(review): that wording is identical to OperatorWait's; confirm the
	// intended meaning of this value.
	OperatorOK OperatorFlag = 100

	// OperatorExtractorOK marks that data extraction has completed.
	OperatorExtractorOK OperatorFlag = 200

	// OperatorWait is the value Pop writes to a row it has just handed out.
	OperatorWait OperatorFlag = 1000

	// OperatorError is the error marker; UpdateError adds it to the record's
	// current operator value.
	OperatorError OperatorFlag = 10000
)
2020-07-16 07:25:55 +00:00
// ISet is implemented by records that can store a value under a field name.
type ISet interface {
	// Set stores value under the field called key.
	Set(key string, value interface{})
}
2020-07-17 10:21:38 +00:00
// IGet is implemented by records that can return a value by field name.
type IGet interface {
	// Get returns the value held by the field called key.
	Get(key string) interface{}
}
type IGetSet interface {
ISet
IGet
}
2020-07-10 04:05:33 +00:00
// StoreSource is a store bound to a single table of the source database.
type StoreSource struct {
	// table is the name of the table every query of this store targets.
	table string
	// db is the database handle the queries run on.
	db *sql.DB

	// popCount is incremented by Pop.
	popCount int

	// errorCount and errorLimit are the bookkeeping used by errorAlarm.
	errorCount, errorLimit int
}
2020-07-22 12:00:02 +00:00
// PopCount returns the current value of the store's pop counter.
func (store *StoreSource) PopCount() int {
	count := store.popCount
	return count
}
// Close closes the underlying database handle and returns its error, if any.
func (store *StoreSource) Close() error {
	closeErr := store.db.Close()
	return closeErr
}
2020-07-10 04:05:33 +00:00
// NewSourceStore 创建一个存储实例
2020-07-20 10:13:54 +00:00
func NewStoreSource ( table string ) * StoreSource {
2020-07-10 04:05:33 +00:00
db , err := sql . Open ( "mysql" , InitConfig . Database . SourceURI )
2020-07-06 08:33:35 +00:00
if err != nil {
panic ( err )
}
2020-07-20 10:13:54 +00:00
return & StoreSource { table : table , db : db }
2020-07-06 08:33:35 +00:00
}
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) errorAlarm ( err error ) {
2020-07-06 09:58:24 +00:00
if err != nil {
2020-07-09 03:38:51 +00:00
log . Println ( "store error: " , err )
// 报警. 如果数据插入有问题
store . errorCount ++
if store . errorCount >= store . errorLimit {
// 数据库频繁操作初问题 报警, 减少没意义的请求
}
} else {
if store . errorCount > 0 {
store . errorCount --
}
2020-07-06 09:58:24 +00:00
}
2020-07-06 08:33:35 +00:00
}
2020-07-08 07:02:55 +00:00
2020-07-10 04:05:33 +00:00
// Insert 插入数据
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) Insert ( isource IGet ) {
2020-07-23 10:29:56 +00:00
_ , err := store . db . Exec ( "insert into " + store . table + "(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)" , isource . Get ( "Url" ) , isource . Get ( "Target" ) , isource . Get ( "Source" ) , isource . Get ( "Ext" ) , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "StreamerId" ) )
2020-07-17 10:21:38 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-09 03:38:51 +00:00
}
2020-07-24 10:48:33 +00:00
// Deduplicate 去重
func ( store * StoreSource ) Deduplicate ( target Target , field string ) {
2020-07-26 16:35:41 +00:00
sql := ` DELETE FROM ` + store . table + ` WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, ` + field + ` FROM ` + store . table + ` force index(target_type_idx) WHERE target_type = " ` + string ( target ) + ` " ) s GROUP BY s. ` + string ( field ) + ` ) ; `
_ , err := store . db . Exec ( sql )
2020-07-24 10:48:33 +00:00
if err != nil {
panic ( err )
}
}
2020-07-10 04:05:33 +00:00
// Update 更新数据
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) Update ( isource IGet ) {
2020-07-17 10:21:38 +00:00
_ , err := store . db . Exec ( "update " + store . table + " set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?" , isource . Get ( "Ext" ) , isource . Get ( "PassGob" ) , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "Uid" ) )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-09 03:38:51 +00:00
}
2020-07-16 08:22:14 +00:00
// UpdateOperator 更新数据操作标志位
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) UpdateOperator ( isource IGet ) {
2020-07-17 10:21:38 +00:00
_ , err := store . db . Exec ( "update " + store . table + " set operator = ?, error_msg = ? where uid = ?" , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "Uid" ) )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-16 08:22:14 +00:00
}
2020-07-13 10:10:48 +00:00
// UpdateError 更新错误数据
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) UpdateError ( isource IGetSet , err error ) {
isource . Set ( "Operator" , int32 ( OperatorError ) + isource . Get ( "Operator" ) . ( int32 ) )
2020-07-17 10:21:38 +00:00
isource . Set ( "ErrorMsg" , sql . NullString { String : err . Error ( ) , Valid : true } )
_ , dberr := store . db . Exec ( "update " + store . table + " set operator = ?, error_msg = ? where uid = ?" , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "Uid" ) )
if dberr != nil {
2020-07-20 10:13:54 +00:00
// email tell owner to deal with
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-13 10:10:48 +00:00
}
2020-07-10 04:05:33 +00:00
// Restore 恢复Operator数据状态
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) Restore ( isource IGet ) {
_ , dberr := store . db . Exec ( "update " + store . table + " set operator = ? where uid = ?" , isource . Get ( "LastOperator" ) , isource . Get ( "Uid" ) )
if dberr != nil {
// email tell owner to deal with
panic ( dberr )
2020-07-17 10:21:38 +00:00
}
2020-07-10 04:05:33 +00:00
}
// Pop 弹出一条未处理的数据
2020-07-23 10:29:56 +00:00
func ( store * StoreSource ) Pop ( targetType Target , operators ... int32 ) ( * Source , error ) {
2020-07-08 07:02:55 +00:00
tx , err := store . db . Begin ( )
if err != nil {
return nil , err
}
2020-07-23 10:29:56 +00:00
var args = [ ] interface { } { string ( targetType ) }
2020-07-21 07:05:56 +00:00
selectSQL := ` select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store . table + ` where target_type = ? `
2020-07-08 07:02:55 +00:00
if len ( operators ) == 0 {
selectSQL += " and operator = ?"
args = append ( args , 0 )
} else {
for _ , operator := range operators {
selectSQL += " and operator = ?"
args = append ( args , operator )
}
}
2020-07-08 10:57:57 +00:00
// log.Println(selectSQL + ` limit 1 for update`)
row := tx . QueryRow ( selectSQL + ` limit 1 for update ` , args ... )
2020-07-08 07:02:55 +00:00
defer func ( ) {
err := tx . Commit ( )
if err != nil {
log . Println ( err )
err = tx . Rollback ( )
if err != nil {
log . Println ( err )
}
}
2020-07-22 12:00:02 +00:00
store . popCount ++
2020-07-08 07:02:55 +00:00
} ( )
2020-07-16 03:02:30 +00:00
s := & Source { }
// uid, url, target_type, source, ext, operator
2020-07-23 10:29:56 +00:00
err = row . Scan ( & s . Uid , & s . Url , & s . Target , & s . Source , & s . Ext , & s . Operator , & s . UpdateTime , & s . StreamerId )
2020-07-16 03:02:30 +00:00
if err != nil {
return nil , err
2020-07-08 07:02:55 +00:00
}
2020-07-22 12:00:02 +00:00
2020-07-17 10:21:38 +00:00
s . Set ( "LastOperator" , s . Operator )
2020-07-16 03:02:30 +00:00
_ , err = tx . Exec ( "update " + store . table + " set operator = ? where uid = ?" , OperatorWait , s . Uid )
return s , nil
2020-07-08 07:02:55 +00:00
}
2020-07-10 04:05:33 +00:00
2020-07-17 10:21:38 +00:00
// Table names used by the extractor store.
const (
	// StreamerTable is the name of the streamer table.
	StreamerTable string = "streamer"

	// CollectLogTable is the name of the collect-log table.
	CollectLogTable string = "collect_log"
)
2020-07-20 10:13:54 +00:00
// StoreExtractor is the store for the extractor database; its methods work on
// StreamerTable and CollectLogTable.
type StoreExtractor struct {
	// db is the database handle the queries run on.
	db *sql.DB

	// popCount is incremented by Pop.
	popCount int

	// errorCount and errorLimit are the bookkeeping used by errorAlarm.
	errorCount, errorLimit int
}
2020-07-22 12:00:02 +00:00
// PopCount returns the current value of the store's pop counter.
func (store *StoreExtractor) PopCount() int {
	count := store.popCount
	return count
}
// Close closes the underlying database handle and returns its error, if any.
func (store *StoreExtractor) Close() error {
	closeErr := store.db.Close()
	return closeErr
}
2020-07-20 10:13:54 +00:00
func ( store * StoreExtractor ) errorAlarm ( err error ) {
2020-07-10 04:05:33 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
log . Println ( "store error: " , err )
2020-07-10 08:13:08 +00:00
// 报警. 如果数据插入有问题
store . errorCount ++
if store . errorCount >= store . errorLimit {
// 数据库频繁操作初问题 报警, 减少没意义的请求
}
} else {
if store . errorCount > 0 {
store . errorCount --
}
2020-07-10 04:05:33 +00:00
}
2020-07-10 08:13:08 +00:00
}
2020-07-20 10:13:54 +00:00
// NewStoreExtractor 生成一个extractor库的相关链接
func NewStoreExtractor ( ) * StoreExtractor {
2020-07-10 10:31:17 +00:00
db , err := sql . Open ( "mysql" , InitConfig . Database . ExtractorURI )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-10 10:31:17 +00:00
}
2020-07-20 10:13:54 +00:00
return & StoreExtractor { db : db }
2020-07-10 10:31:17 +00:00
}
2020-07-20 10:13:54 +00:00
// Pop 弹出一条未处理的数据
2020-07-23 10:29:56 +00:00
func ( store * StoreExtractor ) Pop ( platform Platform , operators ... int32 ) ( * Streamer , error ) {
2020-07-20 10:13:54 +00:00
tx , err := store . db . Begin ( )
2020-07-17 10:21:38 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
return nil , err
2020-07-17 10:21:38 +00:00
}
2020-07-23 10:29:56 +00:00
var args = [ ] interface { } { string ( platform ) }
2020-07-21 07:05:56 +00:00
selectSQL := ` select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval `
2020-07-20 10:13:54 +00:00
if len ( operators ) == 0 {
selectSQL += " and operator = ?"
args = append ( args , 0 )
} else {
for _ , operator := range operators {
selectSQL += " and operator = ?"
args = append ( args , operator )
}
}
defer func ( ) {
err := tx . Commit ( )
if err != nil {
log . Println ( err )
err = tx . Rollback ( )
if err != nil {
log . Println ( err )
}
}
2020-07-22 12:00:02 +00:00
store . popCount ++
2020-07-20 10:13:54 +00:00
} ( )
// log.Println(selectSQL + ` limit 1 for update`)
row := tx . QueryRow ( selectSQL + ` limit 1 for update ` , args ... )
s := & Streamer { }
// uid, url, target_type, source, ext, operator
2020-07-21 07:05:56 +00:00
err = row . Scan ( & s . Uid , & s . UpdateTime , & s . UserId , & s . UpdateUrl , & s . IsUpdateStreamer )
2020-07-20 10:13:54 +00:00
if err != nil {
return nil , err
}
s . Set ( "LastOperator" , s . Operator )
_ , err = tx . Exec ( "update " + StreamerTable + " set operator = ? where uid = ?" , OperatorWait , s . Uid )
return s , nil
2020-07-17 10:21:38 +00:00
}
// InsertStreamer Streamer表, 插入数据
2020-07-20 10:13:54 +00:00
func ( store * StoreExtractor ) InsertStreamer ( streamer IGet ) ( isExists bool ) {
2020-07-17 11:20:08 +00:00
// select uid from table where platform = ? and user_id = ?
2020-07-20 10:13:54 +00:00
selectSQL := "SELECT is_update_url FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?"
2020-07-15 10:22:40 +00:00
tx , err := store . db . Begin ( )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-15 10:22:40 +00:00
}
2020-07-16 10:31:13 +00:00
2020-07-17 11:20:08 +00:00
defer func ( ) {
err = tx . Commit ( )
if err != nil {
2020-07-20 10:13:54 +00:00
rerr := tx . Rollback ( )
if rerr != nil {
log . Println ( rerr )
2020-07-17 11:20:08 +00:00
}
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 11:20:08 +00:00
}
} ( )
2020-07-20 10:13:54 +00:00
row := tx . QueryRow ( selectSQL + ` LIMIT 1 FOR UPDATE ` , streamer . Get ( "Platform" ) , streamer . Get ( "UserId" ) )
2020-07-21 07:05:56 +00:00
var isUpdateUrl bool
2020-07-20 10:13:54 +00:00
if err = row . Scan ( & isUpdateUrl ) ; err == nil {
2020-07-21 07:05:56 +00:00
if isUpdateUrl {
2020-07-20 10:13:54 +00:00
tx . Exec ( "UPDATE " + StreamerTable + " SET update_url = ?" , streamer . Get ( "UpdateUrl" ) )
}
return true
2020-07-15 10:22:40 +00:00
}
2020-07-20 10:13:54 +00:00
_ , err = tx . Exec ( "INSERT INTO " + StreamerTable + "(platform, user_id, update_url, update_time) VALUES(?,?,?,?);" , streamer . Get ( "Platform" ) , streamer . Get ( "UserId" ) , streamer . Get ( "UpdateUrl" ) , time . Now ( ) . Add ( - time . Minute * 30 ) )
if err != nil {
panic ( err )
}
return false
}
2020-07-15 10:22:40 +00:00
2020-07-20 10:13:54 +00:00
// UpdateError marks a streamer record as failed and persists that state.
//
// It adds OperatorError to the record's current "Operator" value, stores
// err's message as "ErrorMsg", and writes both to the StreamerTable row whose
// uid equals isource's "Uid".
//
// isource's "Operator" must hold an int32 and err must be non-nil; otherwise
// the type assertion or err.Error() panics. If the database update itself
// fails, the function panics with that database error.
func (store *StoreExtractor) UpdateError(isource IGetSet, err error) {
	isource.Set("Operator", int32(OperatorError)+isource.Get("Operator").(int32))
	isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true})

	_, dberr := store.db.Exec("update "+StreamerTable+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid"))
	if dberr != nil {
		// TODO: email the owner to deal with it.
		// Report the database failure, not the error that was being
		// recorded: dberr is what actually went wrong here.
		panic(dberr)
	}
}
// UpdateStreamerLog 只更新Streamer的关联日志和时间戳
func ( store * StoreExtractor ) UpdateStreamerLog ( latestUid int64 , streamerUid int64 ) {
_ , err := store . db . Exec ( "UPDATE " + StreamerTable + " SET latest_log_uid = ?, update_time = CURRENT_TIMESTAMP() WHERE uid = ?" , latestUid , streamerUid )
2020-07-15 10:22:40 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-15 10:22:40 +00:00
}
2020-07-20 10:13:54 +00:00
}
2020-07-15 10:22:40 +00:00
2020-07-20 10:13:54 +00:00
// UpdateOperator rewrites the operator flag and error_msg of the
// StreamerTable row whose uid equals isource's "Uid".
//
// The new values are read from isource under "Operator" and "ErrorMsg". It
// panics if the statement fails.
func (store *StoreExtractor) UpdateOperator(isource IGet) {
	query := "update " + StreamerTable + " set operator = ?, error_msg = ? where uid = ?"
	values := []interface{}{
		isource.Get("Operator"),
		isource.Get("ErrorMsg"),
		isource.Get("Uid"),
	}

	if _, execErr := store.db.Exec(query, values...); execErr != nil {
		panic(execErr)
	}
}
// UpdateStreamer Streamer表, 插入数据
func ( store * StoreExtractor ) UpdateStreamer ( isource IGet ) {
2020-07-21 07:05:56 +00:00
_ , err := store . db . Exec ( "UPDATE " + StreamerTable + " SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?, operator = ?, update_time = ? WHERE uid = ?;" ,
isource . Get ( "UserName" ) , isource . Get ( "LiveUrl" ) , isource . Get ( "Channel" ) , isource . Get ( "LatestLogUid" ) , isource . Get ( "Ext" ) , isource . Get ( "Operator" ) , isource . Get ( "UpdateTime" ) , isource . Get ( "Uid" ) )
2020-07-20 10:13:54 +00:00
if err != nil {
panic ( err )
}
2020-07-10 08:13:08 +00:00
}
// InsertCollectLog CollectLog表插入数据
2020-07-21 07:05:56 +00:00
func ( store * StoreExtractor ) InsertCollectLog ( isource IGet ) int64 {
2020-07-17 10:21:38 +00:00
tx , err := store . db . Begin ( )
defer func ( ) {
if err := recover ( ) ; err != nil {
2020-07-21 07:05:56 +00:00
tx . Rollback ( )
log . Panic ( err )
2020-07-17 10:21:38 +00:00
}
} ( )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
result , err := tx . Exec ( "insert into " + CollectLogTable + "(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" ,
isource . Get ( "StreamerUid" ) , isource . Get ( "Platform" ) , isource . Get ( "UserId" ) , isource . Get ( "IsLiveStreaming" ) , isource . Get ( "IsError" ) , isource . Get ( "Followers" ) , isource . Get ( "Views" ) , isource . Get ( "Giver" ) , isource . Get ( "Gratuity" ) , isource . Get ( "LiveTitle" ) , isource . Get ( "LiveStartTime" ) , isource . Get ( "LiveEndTime" ) , isource . Get ( "UpdateTime" ) , isource . Get ( "Tags" ) , isource . Get ( "Ext" ) , isource . Get ( "ErrorMsg" ) ,
2020-07-10 10:31:17 +00:00
)
2020-07-17 10:21:38 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
logUid , err := result . LastInsertId ( )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
_ , err = tx . Exec ( "update " + StreamerTable + " set latest_log_uid = ? where uid = ?" , logUid , isource . Get ( "StreamerUid" ) )
if err = tx . Commit ( ) ; err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-21 07:05:56 +00:00
return logUid
2020-07-10 04:05:33 +00:00
}