fix: ParseDuration Method
add: store UpdateOperator
This commit is contained in:
parent
7e3b36c7d0
commit
fe25921f9a
1
extractor/openrec/.gitignore
vendored
1
extractor/openrec/.gitignore
vendored
|
@ -1,2 +1,3 @@
|
|||
*.html
|
||||
screenlog.*
|
||||
openrec
|
|
@ -11,5 +11,6 @@ package main
|
|||
*/
|
||||
|
||||
func main() {
|
||||
|
||||
oe := &OpenrecExtractor{}
|
||||
oe.Execute()
|
||||
}
|
||||
|
|
|
@ -136,7 +136,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) {
|
|||
|
||||
func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) {
|
||||
var tags []string
|
||||
matheslist := regexp.MustCompile(`TagButton__Button[^>]+>(.{1,100})</a`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1)
|
||||
matheslist := regexp.MustCompile(`<a.+TagButton__Button[^>]+>(.{1,100})</a>`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1)
|
||||
for _, m := range matheslist {
|
||||
tags = append(tags, m[1])
|
||||
}
|
||||
|
@ -150,16 +150,19 @@ func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) {
|
|||
}
|
||||
|
||||
func (oe *OpenrecExtractor) Execute() {
|
||||
|
||||
collect := intimate.NewExtractorStore()
|
||||
store := intimate.NewSourceStore("source_openrec")
|
||||
|
||||
for {
|
||||
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorError))
|
||||
anchorId := source.GetSource().String
|
||||
|
||||
ai := &intimate.AnchorInfo{}
|
||||
|
@ -167,12 +170,10 @@ func (oe *OpenrecExtractor) Execute() {
|
|||
ai.SetPlatform(string(intimate.Popenrec))
|
||||
|
||||
sdata := source.GetExt().([]byte)
|
||||
|
||||
if gjson.ValidBytes(sdata) {
|
||||
result := gjson.ParseBytes(sdata)
|
||||
datamap := result.Map()
|
||||
|
||||
oe := &OpenrecExtractor{}
|
||||
oe.user = intimate.NewExtractorSource(datamap["user"])
|
||||
oe.user.CreateExtractor()
|
||||
|
||||
|
@ -198,6 +199,8 @@ func (oe *OpenrecExtractor) Execute() {
|
|||
Uid, err := collect.InsertAnchorInfo(ai)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -206,7 +209,14 @@ func (oe *OpenrecExtractor) Execute() {
|
|||
clog.Set("AnchorId", anchorId)
|
||||
clog.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
collect.InsertCollectLog(clog)
|
||||
if err = collect.InsertCollectLog(clog); err != nil {
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorExtractorOK))
|
||||
store.UpdateOperator(source)
|
||||
} else {
|
||||
log.Println("data is not json:\n", string(sdata))
|
||||
}
|
||||
|
|
|
@ -100,11 +100,13 @@ func TestExtractor(t *testing.T) {
|
|||
|
||||
for {
|
||||
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorError))
|
||||
anchorId := source.GetSource().String
|
||||
|
||||
ai := &intimate.AnchorInfo{}
|
||||
|
@ -112,7 +114,6 @@ func TestExtractor(t *testing.T) {
|
|||
ai.SetPlatform(string(intimate.Popenrec))
|
||||
|
||||
sdata := source.GetExt().([]byte)
|
||||
|
||||
if gjson.ValidBytes(sdata) {
|
||||
result := gjson.ParseBytes(sdata)
|
||||
datamap := result.Map()
|
||||
|
@ -143,6 +144,8 @@ func TestExtractor(t *testing.T) {
|
|||
Uid, err := collect.InsertAnchorInfo(ai)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -151,7 +154,14 @@ func TestExtractor(t *testing.T) {
|
|||
clog.Set("AnchorId", anchorId)
|
||||
clog.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
collect.InsertCollectLog(clog)
|
||||
if err = collect.InsertCollectLog(clog); err != nil {
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorExtractorOK))
|
||||
store.UpdateOperator(source)
|
||||
} else {
|
||||
t.Error("data is not json:\n", string(sdata))
|
||||
}
|
||||
|
|
8
store.go
8
store.go
|
@ -13,6 +13,8 @@ type OperatorFlag int32
|
|||
const (
|
||||
// OperatorOK 等待被处理
|
||||
OperatorOK OperatorFlag = 100
|
||||
// OperatorExtractorOK 提取数据完成
|
||||
OperatorExtractorOK OperatorFlag = 200
|
||||
// OperatorWait 等待被处理
|
||||
OperatorWait OperatorFlag = 1000
|
||||
// OperatorError 错误标志
|
||||
|
@ -67,6 +69,12 @@ func (store *SourceStore) Update(isource IUpdateSource) {
|
|||
store.errorAlarm(err)
|
||||
}
|
||||
|
||||
// UpdateOperator 更新数据操作标志位
|
||||
func (store *SourceStore) UpdateOperator(isource IUpdateSource) {
|
||||
_, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid())
|
||||
store.errorAlarm(err)
|
||||
}
|
||||
|
||||
// UpdateError 更新错误数据
|
||||
func (store *SourceStore) UpdateError(isource IUpdateSource, err error) {
|
||||
isource.SetOperator(int32(OperatorError))
|
||||
|
|
18
utils.go
18
utils.go
|
@ -19,7 +19,23 @@ func init() {
|
|||
|
||||
// ParseDuration time to duration eg: 1:40:00 -> time.Duration
|
||||
func ParseDuration(dt string) (time.Duration, error) {
|
||||
tdt, err := time.Parse("15:04:05", dt)
|
||||
|
||||
var parse []byte = []byte("00:00:00")
|
||||
|
||||
j := len(parse) - 1
|
||||
for i := len(dt) - 1; i >= 0; i-- {
|
||||
c := dt[i]
|
||||
if c != ':' {
|
||||
parse[j] = dt[i]
|
||||
} else {
|
||||
for parse[j] != ':' {
|
||||
j--
|
||||
}
|
||||
}
|
||||
j--
|
||||
}
|
||||
|
||||
tdt, err := time.Parse("15:04:05", string(parse))
|
||||
if err != nil {
|
||||
|
||||
return time.Duration(0), err
|
||||
|
|
Loading…
Reference in New Issue
Block a user