package main import ( "context" "database/sql" "encoding/json" "fmt" "log" "time" mongodb "git.nonolive.co/eson.hsm/databoard-collect/database" "github.com/go-sql-driver/mysql" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) // collectCopyCountLiveAnchors 从mongodb里复制需要增量的值 func collectCopyCountLiveAnchors(cxt *WorkerContext) { var err error for !ps.IsClose() { db.Do(func(db *sql.DB) { liveanchor := &CountLiveAnchors{} // T.CountLiveAnchors.Order("create_at desc").Limit(1).Find(liveanchor) selectsql := fmt.Sprintf("select uid, create_at from %s order by create_at desc limit 1", Tables.CountLiveAnchors) row := db.QueryRow(selectsql) if row.Err() == nil { row.Scan(&liveanchor.UID, &liveanchor.CreateAt) log.Println("last: ", liveanchor.CreateAt, liveanchor.UID) var cur *mongo.Cursor cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": liveanchor.CreateAt}}) if err != nil { log.Println(err) ps.Wait(time.Second * 5) return } for cur.Next(context.TODO()) && !ps.IsClose() { la := &mongodb.LiveAnchorsCountPointObjectID{} err = cur.Decode(la) if err != nil { panic(err) } uid := la.ObjectID.Hex() c := &CountLiveAnchors{} c.UID = uid c.IsCounted = 0 c.CreateAt = la.CreateAt data, err := json.Marshal(la.LiveAnchorDict) if err != nil { panic(err) } c.CountMap = string(data) insertsql := fmt.Sprintf("insert ignore into %s(uid, is_counted, count_map, create_at) values(?,?,?,?)", Tables.CountLiveAnchors) _, err = db.Exec(insertsql, c.UID, c.IsCounted, c.CountMap, c.CreateAt) if err != nil { switch merr := err.(*mysql.MySQLError); merr.Number { case 1062: default: log.Println(merr) } } } } }) ps.Wait(time.Second * 5) } }