diff --git a/config.go b/config.go index 4b6ddb1..9007be3 100644 --- a/config.go +++ b/config.go @@ -3,6 +3,7 @@ package intimate import ( "errors" "io/ioutil" + "log" "os" "gopkg.in/yaml.v2" @@ -20,20 +21,26 @@ func init() { // Config 配置 type Config struct { Database struct { - URI string `yaml:"uri"` // "user:password@/dbname" + SourceURI string `yaml:"source_uri"` // "user:password@/dbname" + ExtractorURI string `yaml:"extractor_uri"` } `yaml:"database"` } // Load 加载yaml/yml配置 func (conifg *Config) Load() { - configfile := "./config.yaml" - if _, err := os.Stat(configfile); os.IsNotExist(err) { - configfile = "./config.yml" - if _, err := os.Stat(configfile); os.IsNotExist(err) { - panic(errors.New("config.yaml or config.yml is not exists")) + var configfile string + configlist := []string{"./config.yaml", "./config.yml", "../../config.yml", "../../config.yaml"} + for _, configfile = range configlist { + if _, err := os.Stat(configfile); err == nil { + log.Println("find config: ", configfile) + break } } + if len(configfile) <= 4 { + log.Panic(errors.New("can't find config.yaml/config.yml")) + } + f, err := os.Open(configfile) if err != nil { panic(err) diff --git a/config.yaml b/config.yaml index 43d8752..e55d371 100644 --- a/config.yaml +++ b/config.yaml @@ -1,2 +1,3 @@ database: - uri: "root:@tcp(127.0.0.1:4000)/intimate_source" \ No newline at end of file + source_uri: "root:@tcp(127.0.0.1:4000)/intimate_source" + extractor_uri: "root:@tcp(127.0.0.1:4000)/intimate_extractor" \ No newline at end of file diff --git a/config_test.go b/config_test.go index aaffef3..3ef714e 100644 --- a/config_test.go +++ b/config_test.go @@ -6,7 +6,7 @@ func TestConfig(t *testing.T) { config := &Config{} config.Load() - if config.Database.URI != "root:@tcp(127.0.0.1:4000)/intimate_source" { + if config.Database.SourceURI != "root:@tcp(127.0.0.1:4000)/intimate_source" { t.Error("error yaml loaded, ", config) } } diff --git a/extractor/openrec/main.go b/extractor/openrec/main.go 
new file mode 100644 index 0000000..7905807 --- /dev/null +++ b/extractor/openrec/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + +} diff --git a/extractor/openrec/openrec_test.go b/extractor/openrec/openrec_test.go new file mode 100644 index 0000000..f930b91 --- /dev/null +++ b/extractor/openrec/openrec_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "intimate" + "os" + "testing" + + "github.com/tidwall/gjson" +) + +// TestExtractor pops one "openrec_user" row from the source store and writes each top-level JSON field of its ext payload to ./openrec_<key>.html. +func TestExtractor(t *testing.T) { + store := intimate.NewSourceStore("source_openrec") + source, err := store.Pop("openrec_user", 100) + if source != nil { + defer store.Restore(source) + } + if err != nil { + // source may be nil when Pop fails; stop before it is dereferenced below. + t.Fatal(err) + } + sdata := source.GetExt().([]byte) + + if gjson.ValidBytes(sdata) { + result := gjson.ParseBytes(sdata) + m := result.Map() + for key := range m { + t.Log(key) + f, err := os.OpenFile("./openrec_"+key+".html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) + if err != nil { + t.Fatal(err) + } + _, err = f.WriteString(m[key].String()) + f.Close() + if err != nil { + t.Fatal(err) + } + } + } else { + t.Error("data is not json:\n", string(sdata)) + } + +} diff --git a/source.go b/source.go index 2939c22..f4aaf1e 100644 --- a/source.go +++ b/source.go @@ -15,6 +15,18 @@ type Source struct { UpdateTime time.Time // Operator int32 // ErrorMsg sql.NullString // + + lastOperator int32 +} + +// GetLastOperator Get return lastOperator int32 +func (so *Source) GetLastOperator() int32 { + return so.lastOperator +} + +// SetLastOperator Set lastOperator int32 +func (so *Source) SetLastOperator(lastOperator int32) { + so.lastOperator = lastOperator } // GetErrorMsg Get return ErrorMsg sql.NullString diff --git a/store.go b/store.go index cab54e6..268a43d 100644 --- a/store.go +++ b/store.go @@ -24,6 +24,8 @@ type IGetSource interface { type IUpdateSource interface { IGetSource + GetLastOperator() int32 + SetExt(ext interface{}) // SetUpdateTime(ut time.Time) // SetOperator(operator int32) // @@ -42,24 +44,24 @@ const ( OperatorError OperatorFlag = 10000 ) -// Store 储存 -type Store 
struct { +// SourceStore 储存 +type SourceStore struct { table string db *sql.DB errorCount int errorLimit int } -// NewStore 创建一个存储实例 -func NewStore(table string) *Store { - db, err := sql.Open("mysql", InitConfig.Database.URI) +// NewSourceStore 创建一个存储实例 +func NewSourceStore(table string) *SourceStore { + db, err := sql.Open("mysql", InitConfig.Database.SourceURI) if err != nil { panic(err) } - return &Store{table: table, db: db} + return &SourceStore{table: table, db: db} } -func (store *Store) errorAlarm(err error) { +func (store *SourceStore) errorAlarm(err error) { if err != nil { log.Println("store error: ", err) // 报警. 如果数据插入有问题 @@ -74,20 +76,26 @@ func (store *Store) errorAlarm(err error) { } } -// Insert 储存数据 -func (store *Store) Insert(isource IGetSource) { +// Insert 插入数据 +func (store *SourceStore) Insert(isource IGetSource) { _, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) store.errorAlarm(err) } -// Update 储存数据 -func (store *Store) Update(isource IUpdateSource) { +// Update 更新数据 +func (store *SourceStore) Update(isource IUpdateSource) { _, err := store.db.Exec("update "+store.table+" set ext = ?, operator = ?, error_msg = ? where uid = ?", isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) store.errorAlarm(err) } -// Pop 储存数据 -func (store *Store) Pop(targetType string, operators ...int32) (IUpdateSource, error) { +// Restore 恢复Operator数据状态 +func (store *SourceStore) Restore(isource IUpdateSource) { + _, err := store.db.Exec("update "+store.table+" set operator = ? 
where uid = ?", isource.GetLastOperator(), isource.GetUid()) + store.errorAlarm(err) +} + +// Pop 弹出一条未处理的数据 +func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSource, error) { tx, err := store.db.Begin() if err != nil { @@ -122,8 +130,11 @@ func (store *Store) Pop(targetType string, operators ...int32) (IUpdateSource, e if row != nil { s := &Source{} + // uid, url, target_type, source, ext, operator err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator) + s.SetLastOperator(s.Operator) + if err != nil { log.Println(err, targetType) _, err = tx.Exec("update "+store.table+" set error_msg = ?, operator = ? where uid = ?", OperatorError, s.Uid) @@ -138,3 +149,12 @@ func (store *Store) Pop(targetType string, operators ...int32) (IUpdateSource, e return nil, errors.New("TaskQueue is nil") } + +// NewExtractorStore 创建一个存储实例 +func NewExtractorStore(table string) *SourceStore { + db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) + if err != nil { + panic(err) + } + return &SourceStore{table: table, db: db} +} diff --git a/store_test.go b/store_test.go index 5ec7e00..e5264ce 100644 --- a/store_test.go +++ b/store_test.go @@ -26,8 +26,8 @@ func TestStoreInsertCase1(t *testing.T) { } func TestStorePopCase1(t *testing.T) { - store := NewStore("source_openrec") - source, err := store.Pop("openrec_ranking") + store := NewSourceStore("source_openrec") + source, err := store.Pop(string(TTOpenrecRanking)) if err != nil { t.Error(err) } diff --git a/table_list.go b/table_list.go new file mode 100644 index 0000000..2a9fa5b --- /dev/null +++ b/table_list.go @@ -0,0 +1,10 @@ +package intimate + +// SourceTable 源的table列表 +type SourceTable string + +const ( + // STOpenrec openrec源table名称 + STOpenrec SourceTable = "source_openrec" +) + diff --git a/target_type_list.go b/target_type_list.go new file mode 100644 index 0000000..fcd013c --- /dev/null +++ b/target_type_list.go @@ -0,0 +1,12 @@ +package intimate + +// TargetType 源的 
目标类型 列表 +type TargetType string + +const ( + // TTOpenrecRanking is the TargetType name for openrec ranking sources. + TTOpenrecRanking TargetType = "openrec_ranking" + + // TTOpenrecUser is the TargetType name for openrec user sources. + TTOpenrecUser TargetType = "openrec_user" +) diff --git a/tasks/openrec/openrec_task1/config.yaml b/tasks/openrec/openrec_task1/config.yaml deleted file mode 120000 index e416226..0000000 --- a/tasks/openrec/openrec_task1/config.yaml +++ /dev/null @@ -1 +0,0 @@ -../../../config.yaml \ No newline at end of file diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index 36cffba..caf8c81 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -11,12 +11,10 @@ import ( "github.com/tidwall/gjson" ) -var targetTypeRanking = "openrec_ranking" -var targetTypeUser = "openrec_user" var openrecRanking *OpenrecRanking // store 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var store *intimate.Store = intimate.NewStore("source_openrec") +var store *intimate.SourceStore = intimate.NewSourceStore(string(intimate.STOpenrec)) func init() { @@ -67,7 +65,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { data.SetSource(sql.NullString{String: userid, Valid: len(userid) > 0}) data.SetUrl(wf.GetRawURL()) - data.SetTargetType(targetTypeUser) + data.SetTargetType(string(intimate.TTOpenrecUser)) store.Insert(data) } } diff --git a/tasks/openrec/openrec_task2/config.yaml b/tasks/openrec/openrec_task2/config.yaml deleted file mode 100644 index 43d8752..0000000 --- a/tasks/openrec/openrec_task2/config.yaml +++ /dev/null @@ -1,2 +0,0 @@ -database: - uri: "root:@tcp(127.0.0.1:4000)/intimate_source" \ No newline at end of file diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index b74f62b..54931be 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -10,12 +10,10 @@ import ( 
"github.com/474420502/hunter" ) -var targetTypeUser = "openrec_user" -var targetTypeRanking = "openrec_ranking" var oer *OpenrecExtratorRanking // store 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var store *intimate.Store = intimate.NewStore("source_openrec") +var store *intimate.SourceStore = intimate.NewSourceStore(string(intimate.STOpenrec)) func init() { oer = &OpenrecExtratorRanking{} @@ -31,7 +29,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for { - source, err := store.Pop(targetTypeUser) + source, err := store.Pop(string(intimate.TTOpenrecUser)) if err != nil { log.Println(err) return diff --git a/testfile/openrec_user.html b/testfile/openrec_user.html new file mode 100755 index 0000000..6579f80 --- /dev/null +++ b/testfile/openrec_user.html @@ -0,0 +1,1659 @@ + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +ピューロ
+APEX LEGENDS
+FORTNITE
+VALORANT
+あつまれ どうぶつの森
++ Send
++ Send
+手越ちゃんねる
+ + +Subs
++ + Subs management > +
+ +Videos
+5
+Views
+732,990
+Followers
+64,978
+Supporter
+281
+ピューロ
+APEX LEGENDS
+FORTNITE
+VALORANT
+あつまれ どうぶつの森
++ Send
++ Send
+手越ちゃんねる
+ + +Subs
++ + Subs management > +
+ +Videos
+5
+Views
+732,990
+Followers
+64,978
+Supporter
+281
+💗みみ💗
++ @miyu_tego1111
++ 68,800 Yell
+かわさま
++ @kwsknzm1054
++ 40,000 Yell
+ゆかちゃん
++ @yytg316
++ 16,400 Yell
+🍿ジャム🍿
++ @tego10504
++ 16,000 Yell
+A R
++ @shion718911
++ 14,000 Yell
+らぁちゃん
++ @Rrchqn
++ 12,000 Yell
+みー
++ @a2388yuya
++ 11,200 Yell
+KAGUYA
++ @yuya11Miina
++ 10,400 Yell
+かなこ
++ @pen__56
++ 9,600 Yell
+かえでちゃんまん
++ @kaede__rmd
++ 9,440 Yell
+かにびーむ
++ @5nkr2
++ 9,200 Yell
+reiccc
++ @reiccc
++ 8,800 Yell
++
+