package intimate import ( "database/sql" "encoding/binary" "fmt" "log" "reflect" "strconv" "testing" ) type Store struct { db *sql.DB } type Table struct { store *Store name string setting interface{} updatesql string selectsql string insertsql string } // const updatesql = "UPDATE %s SET %s WHERE %s = ?" func NewStore(uri string) *Store { db, err := sql.Open("mysql", uri) if err != nil { panic(err) } s := &Store{db: db} return s } func (store *Store) Table(name string) *Table { table := &Table{store: store} table.name = name table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)` table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?` // table.selectsql = `FROM ` + table.name + `WHERE operator` return table } type Queue struct { table *Table obj reflect.Type selected string condition string uidname string uididx int } type OperatorType string const ( OP_OK OperatorType = "0" OP_WAIT OperatorType = "1000" OP_ERROR OperatorType = "10000" ) func (queue *Queue) Pop() (result interface{}, err error) { db := queue.table.store.db tx, err := db.Begin() if err != nil { return nil, err } defer func() { cerr := tx.Commit() if cerr != nil { log.Println(cerr) log.Println(tx.Rollback()) } }() selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update" rows, err := tx.Query(selectsql) // err = rows.Err() if err != nil { return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) } var fields = make([]interface{}, queue.obj.NumField()) for i := range fields { var iv interface{} fields[i] = &iv } // if !rows.Next() { // return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) // } if rows.Next() { err = rows.Scan(fields...) if err != nil { return nil, err } } columntypes, err := rows.ColumnTypes() if err != nil { return nil, err } if err = rows.Close(); err != nil { return nil, err } _, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OP_WAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx]) if err != nil { log.Println(err) return nil, err } obj := reflect.New(queue.obj).Elem() for i := 0; i < obj.NumField(); i++ { field := obj.Field(i) convert(*fields[i].(*interface{}), field, columntypes[i]) // if field.Type().Kind() == reflect.Ptr { // field.Elem().Set(reflect.ValueOf(*fields[i].(*interface{}))) // continue // } // field.Set(reflect.ValueOf(*fields[i].(*interface{}))) } return obj.Interface(), err } func TestAutoStore(t *testing.T) { uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" store := NewStore(uri) queue := store.Table("streamer").Queue(TSreamer{}, "operator = 0") t.Error(queue.Pop()) streamer := &TSreamer{} streamer.Uid = 2 streamer.UserID = &sql.NullString{String: "xixi", Valid: true} streamer.Name = "streamer" streamer.Operator = 0 streamer.Ext = &sql.NullString{String: "ext", Valid: true} err := store.Table("streamer").Insert(streamer) if err != nil { t.Error(err) } } func convert(dest interface{}, field reflect.Value, columntype *sql.ColumnType) error { log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field) if field.Kind() == reflect.Ptr { fn := field.Type().Elem().Name() // New 一个 field.Type().Elem() . 然后判断 columntype 转化 成 NullString Time field = field.Elem() // log.Println("type:", fn, ",kind:", field.Kind(), ",field:", field) } if field.Kind() == reflect.Interface { } // log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind()) switch fv := field.Kind(); fv { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: if dest == nil { return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) } log.Println(binary.Varint(dest.([]byte))) s := asString(dest) i64, err := strconv.ParseInt(s, 10, field.Type().Bits()) if err != nil { err = strconvErr(err) return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err) } field.SetInt(i64) return nil case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: if dest == nil { return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) } s := asString(dest) u64, err := strconv.ParseUint(s, 10, field.Type().Bits()) if err != nil { err = strconvErr(err) return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err) } field.SetUint(u64) return nil case reflect.Float32, reflect.Float64: if dest == nil { return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) } s := asString(dest) f64, err := strconv.ParseFloat(s, field.Type().Bits()) if err != nil { err = strconvErr(err) return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err) } field.SetFloat(f64) return nil case reflect.String: if dest == nil { return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) } switch v := dest.(type) { case string: field.SetString(v) return nil case []byte: field.SetString(string(v)) return nil } default: // log.Println(fv, columntype.ScanType().Kind()) } return nil } func (t *Table) Queue(obj interface{}, whereCondition string) *Queue { q := &Queue{} q.condition = whereCondition q.obj = reflect.TypeOf(obj) q.table = t for i := 0; i < q.obj.NumField(); i++ { field := q.obj.Field(i) if fname, ok := field.Tag.Lookup("field"); ok { q.selected += fname + "," if _, ok := field.Tag.Lookup("uid"); ok { q.uididx = i q.uidname = fname } } } q.selected = q.selected[:len(q.selected)-1] return q } func (t *Table) Insert(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) fieldsql := "" argssql := "" var args []interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { if flag, ok := ftype.Tag.Lookup("uid"); ok { if flag == "auto" { continue } } k := ftype.Type.Kind() if k == reflect.Ptr || k == reflect.Interface { if !field.IsNil() { felem := field.Elem() args = append(args, felem.Interface()) fieldsql += fname + "," argssql += "?," } } else { args = append(args, field.Interface()) fieldsql += fname + "," argssql += "?," } } } ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1]) _, err := t.store.db.Exec(ssql, args...) return err } func (t *Table) Update(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) fieldsql := "" var uidname string var uidvalue interface{} var args []interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { if _, ok := ftype.Tag.Lookup("uid"); ok { if uidvalue != nil { panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname)) } uidname = fname uidvalue = field.Interface() continue } k := ftype.Type.Kind() if k == reflect.Ptr || k == reflect.Interface { if !field.IsNil() { felem := field.Elem() args = append(args, felem.Interface()) fieldsql += fname + " = ?," } } else { args = append(args, field.Interface()) fieldsql += fname + " = ?," } } } if uidvalue == nil { panic(fmt.Errorf("update must contain `uid` tag")) } usql := fmt.Sprintf(t.updatesql, fieldsql[:len(fieldsql)-1], uidname) args = append(args, uidvalue) _, err := t.store.db.Exec(usql, args...) return err } type TSreamer struct { Uid int `field:"uid" uid:"auto"` Name interface{} `field:"name"` UserID *sql.NullString `field:"userid"` Ext *sql.NullString `field:"ext"` Iface interface{} `field:"tag"` Operator int `field:"operator"` }