支持空表

This commit is contained in:
eson 2020-12-09 18:11:52 +08:00
parent 3e7812c305
commit 685f934fd7
2 changed files with 15 additions and 11 deletions

View File

@ -9,6 +9,7 @@ import (
"git.nonolive.co/eson.hsm/databoard-collect/database" "git.nonolive.co/eson.hsm/databoard-collect/database"
"github.com/google/uuid" "github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
) )
// collectCopyCountLiveAnchors 从mongodb里复制需要增量的值 // collectCopyCountLiveAnchors 从mongodb里复制需要增量的值
@ -18,10 +19,17 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) {
for !ps.IsClose() { for !ps.IsClose() {
liveanchor := &CountLiveAnchors{} liveanchor := &CountLiveAnchors{}
if ok, err := db.T.CountLiveAnchors.OrderBy("create_at desc").Limit(1).Get(liveanchor); ok { ok, err := db.T.CountLiveAnchors.OrderBy("create_at desc").Limit(1).Get(liveanchor)
last := liveanchor.CreateAt if ok || (ok == false && err == nil) {
log.Println("last: ", last)
cur, err := mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) var cur *mongo.Cursor
if liveanchor == nil {
cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": "2020-05-30"}})
} else {
last := liveanchor.CreateAt
log.Println("last: ", last)
cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}})
}
if err != nil { if err != nil {
log.Println(err) log.Println(err)
ps.Wait(time.Second * 5) ps.Wait(time.Second * 5)
@ -50,9 +58,6 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) {
log.Println(err) log.Println(err)
} }
} }
} else {
log.Println(err)
ps.Wait(time.Second * 5)
} }
} }
} }

View File

@ -17,26 +17,25 @@ func (cxt *WorkerContext) Done() {
// Worker 主进程 // Worker 主进程
type Worker struct { type Worker struct {
wg *sync.WaitGroup
cxt *WorkerContext cxt *WorkerContext
} }
var worker = func() *Worker { var worker = func() *Worker {
w := &Worker{} w := &Worker{}
w.cxt = &WorkerContext{} w.cxt = &WorkerContext{}
w.wg = &sync.WaitGroup{} w.cxt.wg = &sync.WaitGroup{}
return w return w
}() }()
// Handler 处理方法 // Handler 处理方法
func (w *Worker) Handler(handleFunc func(cxt *WorkerContext)) { func (w *Worker) Handler(handleFunc func(cxt *WorkerContext)) {
w.wg.Add(1) w.cxt.wg.Add(1)
go handleFunc(w.cxt) go handleFunc(w.cxt)
} }
// Run 运行 // Run 运行
func (w *Worker) Run() { func (w *Worker) Run() {
log.Println("worker running") log.Println("worker running")
w.wg.Wait() w.cxt.wg.Wait()
log.Println("worker stop") log.Println("worker stop")
} }