package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"git.nonolive.co/eson.hsm/databoard-collect/database"
	"github.com/google/uuid"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// collectCopyCountLiveAnchors incrementally copies live-anchor count points
// from the MongoDB CountLiveAnchors collection into the CountLiveAnchors
// table.
//
// Each round looks up the row with the greatest create_at in the table and
// asks MongoDB for documents whose create_at is strictly greater than it
// (or greater than "2020-05-30" when the table is empty). Every returned
// document is inserted as a new, not-yet-counted row. The loop runs until
// ps.IsClose() reports true; cxt.Done() is called on exit.
//
// NOTE(review): a successful round starts the next one immediately, with no
// pause; only failures back off for five seconds. Confirm that is intended.
// NOTE(review): the Find has no explicit sort, so the "latest create_at"
// watermark assumes documents are never observed out of order — verify.
func collectCopyCountLiveAnchors(cxt *WorkerContext) {
	defer cxt.Done()

	for !ps.IsClose() {
		liveanchor := &CountLiveAnchors{}
		ok, err := db.T.CountLiveAnchors.OrderBy("create_at desc").Limit(1).Get(liveanchor)
		if !ok && err != nil {
			// Without the watermark we cannot build a query; report the
			// failure and back off instead of spinning on a broken database.
			log.Println(err)
			ps.Wait(time.Second * 5)
			continue
		}

		// Default lower bound for an empty table; replaced by the newest
		// copied timestamp once at least one row exists.
		filter := bson.M{"create_at": bson.M{"$gt": "2020-05-30"}}
		if ok {
			last := liveanchor.CreateAt
			log.Println("last: ", last)
			filter = bson.M{"create_at": bson.M{"$gt": last}}
		}

		cur, err := mdb.C.CountLiveAnchors.Find(context.TODO(), filter)
		if err != nil {
			log.Println(err)
			ps.Wait(time.Second * 5)
			continue
		}

		if err := copyCountLiveAnchorsFromCursor(cur); err != nil {
			log.Println(err)
			ps.Wait(time.Second * 5)
		}
	}
}

// copyCountLiveAnchorsFromCursor inserts one CountLiveAnchors row per
// document remaining in cur and always closes the cursor before returning.
// It returns the error, if any, that stopped the iteration early.
//
// A document that cannot be decoded or whose LiveAnchorDict cannot be
// marshalled to JSON panics (the deferred Close still runs); a failed insert
// is logged and the remaining documents are still processed.
func copyCountLiveAnchorsFromCursor(cur *mongo.Cursor) error {
	defer func() {
		// Release the server-side cursor even on early exit or panic.
		if err := cur.Close(context.TODO()); err != nil {
			log.Println(err)
		}
	}()

	for cur.Next(context.TODO()) {
		la := &database.LiveAnchorsCountPoint{}
		if err := cur.Decode(la); err != nil {
			panic(err)
		}

		data, err := json.Marshal(la.LiveAnchorDict)
		if err != nil {
			panic(err)
		}

		c := &CountLiveAnchors{}
		c.UID = uuid.New().String()
		c.IsCounted = 0
		c.CreateAt = la.CreateAt
		c.CountMap = string(data)

		if _, err := db.T.CountLiveAnchors.Insert(c); err != nil {
			log.Println(err)
		}
	}

	// Next returns false both at end of results and on failure; Err tells
	// the two apart.
	return cur.Err()
}