package main import ( "context" "encoding/json" "log" "time" "git.nonolive.co/eson.hsm/databoard-collect/database" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) // collectCopyCountLiveAnchors 从mongodb里复制需要增量的值 func collectCopyCountLiveAnchors(cxt *WorkerContext) { defer cxt.Done() var err error var ok bool for !ps.IsClose() { liveanchor := &CountLiveAnchors{} ok, err = db.T.CountLiveAnchors.OrderBy("create_at desc").Limit(1).Get(liveanchor) if ok || (ok == false && err == nil) { var cur *mongo.Cursor var last time.Time if ok == false { last, err = time.ParseInLocation("2006-01-02", "2020-05-30", time.Local) if err != nil { panic(err) } cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) } else { last = liveanchor.CreateAt log.Println("last: ", last) cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) } if err != nil { log.Println(err) ps.Wait(time.Second * 5) continue } for cur.Next(context.TODO()) { // la := &database.LiveAnchorsCountPoint{} la := &database.LiveAnchorsCountPointObjectID{} err = cur.Decode(la) if err != nil { panic(err) } c := &CountLiveAnchors{} c.UID = la.ObjectID.String() c.IsCounted = 0 c.CreateAt = la.CreateAt data, err := json.Marshal(la.LiveAnchorDict) if err != nil { panic(err) } c.CountMap = string(data) _, err = db.T.CountLiveAnchors.Insert(c) if err != nil { log.Println(err) ps.Wait(time.Second * 2) } } } } }