databoard-transform/collect.go
2020-12-11 14:40:31 +08:00

87 lines
1.8 KiB
Go

package main
import (
"context"
"encoding/json"
"log"
"time"
mongodb "git.nonolive.co/eson.hsm/databoard-collect/database"
myrocks "git.nonolive.co/eson.hsm/databoard-database-myrocks"
"github.com/go-sql-driver/mysql"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// collectCopyCountLiveAnchors 从mongodb里复制需要增量的值
func collectCopyCountLiveAnchors(cxt *WorkerContext) {
var err error
var lastuid string
for !ps.IsClose() {
var isNoData bool
db.Do(func(T *myrocks.TableManager) {
liveanchor := &CountLiveAnchors{}
if err = T.CountLiveAnchors.Order("create_at desc").Limit(1).Find(liveanchor).Error; err == nil {
var cur *mongo.Cursor
var last time.Time
last = liveanchor.CreateAt
if lastuid == liveanchor.UID {
isNoData = true
return
}
log.Println("last: ", last, liveanchor.UID)
cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}})
if err != nil {
log.Println(err)
ps.Wait(time.Second * 5)
return
}
for cur.Next(context.TODO()) && !ps.IsClose() {
la := &mongodb.LiveAnchorsCountPointObjectID{}
err = cur.Decode(la)
if err != nil {
panic(err)
}
uid := la.ObjectID.Hex()
c := &CountLiveAnchors{}
c.UID = uid
c.IsCounted = 0
c.CreateAt = la.CreateAt
data, err := json.Marshal(la.LiveAnchorDict)
if err != nil {
panic(err)
}
c.CountMap = string(data)
err = T.CountLiveAnchors.FirstOrCreate(c).Error
if err != nil {
switch err.(*mysql.MySQLError).Number {
case 1062: // duplicate
isNoData = true
default:
log.Println(err.(*mysql.MySQLError).Number, err)
}
} else {
lastuid = uid
}
}
}
})
if isNoData {
ps.Wait(time.Second * 2)
}
}
}