package intimate import ( "database/sql" "encoding/json" "fmt" "log" "reflect" "testing" "time" "github.com/davecgh/go-spew/spew" ) func (queue *Queue) Pop() (result interface{}, err error) { db := queue.table.store.db tx, err := db.Begin() if err != nil { return nil, err } defer func() { cerr := tx.Commit() if cerr != nil { log.Println(cerr) log.Println(tx.Rollback()) } }() selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update" rows, err := tx.Query(selectsql) // err = rows.Err() if err != nil { return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) } var fields = make([]interface{}, queue.obj.NumField()) for i := range fields { var iv interface{} fields[i] = &iv } // if !rows.Next() { // return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) // } if rows.Next() { err = rows.Scan(fields...) if err != nil { return nil, err } } columntypes, err := rows.ColumnTypes() if err != nil { return nil, err } if err = rows.Close(); err != nil { return nil, err } _, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OpWAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx]) if err != nil { log.Println(err) return nil, err } obj := reflect.New(queue.obj).Elem() for i := 0; i < obj.NumField(); i++ { field := obj.Field(i) convert(*fields[i].(*interface{}), field, columntypes[i]) } return obj.Addr().Interface(), err } func TestAutoStore(t *testing.T) { uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" store := NewStore(uri) queue := store.Table("streamer").Queue(TSreamer{}, "operator = 0") re, _ := queue.Pop() pstreamer := re.(*TSreamer) m := make(map[string]interface{}) json.Unmarshal(pstreamer.Iface.([]byte), &m) spew.Println(re.(*TSreamer), m) streamer := &TSreamer{} streamer.Uid = 2 streamer.UserID = &sql.NullString{String: "hehe", Valid: true} streamer.Name = "streamer" streamer.Operator = 0 streamer.Bit = 0b11 // streamer.Ext = &sql.NullString{String: "ext", Valid: true} tag := make(map[string]interface{}) tag["json"] = true tag["name"] = "test" btag, err := json.Marshal(tag) if err != nil { t.Error(err) } streamer.Iface = btag now := time.Now() streamer.UpdateTime = &now err = store.Table("streamer").Insert(streamer) if err != nil { t.Error(err) } } type TSreamer struct { Uid int `field:"uid" uid:"auto"` Name interface{} `field:"name"` UserID *sql.NullString `field:"userid"` Ext *sql.NullString `field:"ext"` Iface interface{} `field:"tag"` Bit uint64 `field:"bit"` Operator int `field:"operator"` UpdateTime *time.Time `field:"update_time"` }