diff --git a/extractor/openrec/.gitignore b/extractor/openrec/.gitignore index 589ccd4..f09112e 100644 --- a/extractor/openrec/.gitignore +++ b/extractor/openrec/.gitignore @@ -1,2 +1,3 @@ *.html -screenlog.* \ No newline at end of file +screenlog.* +openrec \ No newline at end of file diff --git a/extractor/openrec/main.go b/extractor/openrec/main.go index 86abea9..c0deaf0 100644 --- a/extractor/openrec/main.go +++ b/extractor/openrec/main.go @@ -11,5 +11,6 @@ package main */ func main() { - + oe := &OpenrecExtractor{} + oe.Execute() } diff --git a/extractor/openrec/openrec_extractor.go b/extractor/openrec/openrec_extractor.go index 7e846ba..de14fe8 100644 --- a/extractor/openrec/openrec_extractor.go +++ b/extractor/openrec/openrec_extractor.go @@ -136,7 +136,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) { func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) { var tags []string - matheslist := regexp.MustCompile(`TagButton__Button[^>]+>(.{1,100})]+>(.{1,100})`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1) for _, m := range matheslist { tags = append(tags, m[1]) } @@ -150,16 +150,19 @@ func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) { } func (oe *OpenrecExtractor) Execute() { + collect := intimate.NewExtractorStore() store := intimate.NewSourceStore("source_openrec") for { source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) + if err != nil { log.Println(err) return } + source.SetOperator(int32(intimate.OperatorError)) anchorId := source.GetSource().String ai := &intimate.AnchorInfo{} @@ -167,12 +170,10 @@ func (oe *OpenrecExtractor) Execute() { ai.SetPlatform(string(intimate.Popenrec)) sdata := source.GetExt().([]byte) - if gjson.ValidBytes(sdata) { result := gjson.ParseBytes(sdata) datamap := result.Map() - oe := &OpenrecExtractor{} oe.user = intimate.NewExtractorSource(datamap["user"]) oe.user.CreateExtractor() @@ -198,6 +199,8 @@ func (oe *OpenrecExtractor) Execute() { Uid, err := collect.InsertAnchorInfo(ai) if err != nil { log.Println(err) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + store.UpdateOperator(source) return } @@ -206,7 +209,14 @@ func (oe *OpenrecExtractor) Execute() { clog.Set("AnchorId", anchorId) clog.Set("UpdateTime", source.GetUpdateTime()) - collect.InsertCollectLog(clog) + if err = collect.InsertCollectLog(clog); err != nil { + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + store.UpdateOperator(source) + return + } + + source.SetOperator(int32(intimate.OperatorExtractorOK)) + store.UpdateOperator(source) } else { log.Println("data is not json:\n", string(sdata)) } diff --git a/extractor/openrec/openrec_test.go b/extractor/openrec/openrec_test.go index 0480a65..8190607 100644 --- a/extractor/openrec/openrec_test.go +++ b/extractor/openrec/openrec_test.go @@ -100,11 +100,13 @@ func TestExtractor(t *testing.T) { for { source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) + if err != nil { log.Println(err) return } + source.SetOperator(int32(intimate.OperatorError)) anchorId := source.GetSource().String ai := &intimate.AnchorInfo{} @@ -112,7 +114,6 @@ func TestExtractor(t *testing.T) { ai.SetPlatform(string(intimate.Popenrec)) sdata := source.GetExt().([]byte) - if gjson.ValidBytes(sdata) { result := gjson.ParseBytes(sdata) datamap := result.Map() @@ -143,6 +144,8 @@ func TestExtractor(t *testing.T) { Uid, err := collect.InsertAnchorInfo(ai) if err != nil { t.Error(err) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + store.UpdateOperator(source) return } @@ -151,7 +154,14 @@ func TestExtractor(t *testing.T) { clog.Set("AnchorId", anchorId) clog.Set("UpdateTime", source.GetUpdateTime()) - collect.InsertCollectLog(clog) + if err = collect.InsertCollectLog(clog); err != nil { + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + store.UpdateOperator(source) + return + } + + source.SetOperator(int32(intimate.OperatorExtractorOK)) + store.UpdateOperator(source) } else { t.Error("data is not json:\n", string(sdata)) } diff --git a/store.go b/store.go index 9b96fdf..900975f 100644 --- a/store.go +++ b/store.go @@ -13,6 +13,8 @@ type OperatorFlag int32 const ( // OperatorOK 等待被处理 OperatorOK OperatorFlag = 100 + // OperatorExtractorOK 提取数据完成 + OperatorExtractorOK OperatorFlag = 200 // OperatorWait 等待被处理 OperatorWait OperatorFlag = 1000 // OperatorError 错误标志 @@ -67,6 +69,12 @@ func (store *SourceStore) Update(isource IUpdateSource) { store.errorAlarm(err) } +// UpdateOperator 更新数据操作标志位 +func (store *SourceStore) UpdateOperator(isource IUpdateSource) { + _, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) + store.errorAlarm(err) +} + // UpdateError 更新错误数据 func (store *SourceStore) UpdateError(isource IUpdateSource, err error) { isource.SetOperator(int32(OperatorError)) diff --git a/utils.go b/utils.go index f922bf5..9df8fbb 100644 --- a/utils.go +++ b/utils.go @@ -19,7 +19,23 @@ func init() { // ParseDuration time to duration eg: 1:40:00 -> time.Duration func ParseDuration(dt string) (time.Duration, error) { - tdt, err := time.Parse("15:04:05", dt) + + var parse []byte = []byte("00:00:00") + + j := len(parse) - 1 + for i := len(dt) - 1; i >= 0; i-- { + c := dt[i] + if c != ':' { + parse[j] = dt[i] + } else { + for parse[j] != ':' { + j-- + } + } + j-- + } + + tdt, err := time.Parse("15:04:05", string(parse)) if err != nil { return time.Duration(0), err