diff --git a/collect.go b/collect.go index 5b35971..5e1f7e2 100644 --- a/collect.go +++ b/collect.go @@ -7,16 +7,17 @@ import ( "time" "git.nonolive.co/eson.hsm/databoard-collect/database" + "github.com/go-sql-driver/mysql" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) // collectCopyCountLiveAnchors 从mongodb里复制需要增量的值 func collectCopyCountLiveAnchors(cxt *WorkerContext) { - defer cxt.Done() var err error var ok bool + var lastLiveAnchor *CountLiveAnchors for !ps.IsClose() { liveanchor := &CountLiveAnchors{} @@ -32,8 +33,17 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) { } cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) } else { + + if lastLiveAnchor != nil { + if liveanchor.UID == liveanchor.UID { + ps.Wait(time.Second * 2) + continue + } + } + last = liveanchor.CreateAt - log.Println("last: ", last) + log.Println("last: ", last, liveanchor.UID) + lastLiveAnchor = liveanchor cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) } @@ -43,8 +53,9 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) { continue } + var isNoData bool for cur.Next(context.TODO()) && !ps.IsClose() { - // la := &database.LiveAnchorsCountPoint{} + la := &database.LiveAnchorsCountPointObjectID{} err = cur.Decode(la) if err != nil { @@ -63,10 +74,18 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) { c.CountMap = string(data) _, err = db.T.CountLiveAnchors.Insert(c) if err != nil { - log.Println(err) - ps.Wait(time.Second * 2) + switch err.(*mysql.MySQLError).Number { + case 1062: // duplicate + isNoData = true + default: + log.Println(err.(*mysql.MySQLError).Number, err) + } } } + + if isNoData { + ps.Wait(time.Second * 2) + } } } } diff --git a/go.mod b/go.mod index d031f49..0e51b4a 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,5 @@ require ( github.com/474420502/perfectshutdown v0.1.0 github.com/go-sql-driver/mysql v1.5.0 github.com/go-xorm/xorm v0.7.9 - github.com/google/uuid v1.1.2 go.mongodb.org/mongo-driver v1.4.3 ) diff --git a/worker.go b/worker.go index d3f24e2..6dbdc0a 100644 --- a/worker.go +++ b/worker.go @@ -10,11 +10,6 @@ type WorkerContext struct { wg *sync.WaitGroup } -// Done 必须在Handler里结束时处理. 默认defer cxt.Done() -func (cxt *WorkerContext) Done() { - cxt.wg.Done() -} - // Worker 主进程 type Worker struct { cxt *WorkerContext @@ -30,7 +25,10 @@ var worker = func() *Worker { // Handler 处理方法 func (w *Worker) Handler(handleFunc func(cxt *WorkerContext)) { w.cxt.wg.Add(1) - go handleFunc(w.cxt) + go func() { + defer w.cxt.wg.Done() + handleFunc(w.cxt) + }() } // Run 运行