package intimate import ( "database/sql" "errors" "log" "time" _ "github.com/go-sql-driver/mysql" ) // IGetSource 源接口结构 type IGetSource interface { GetUrl() string // GetTargetType() string // GetSource() sql.NullString // GetExt() interface{} // GetUpdateTime() time.Time // GetOperator() int32 // GetErrorMsg() sql.NullString // } type IUpdateSource interface { IGetSource SetExt(ext interface{}) // SetUpdateTime(ut time.Time) // SetOperator(operator int32) // SetErrorMsg(emsg sql.NullString) // } // OperatorFlag 标志 type OperatorFlag int32 const ( // OperatorWait 等待被处理 OperatorWait OperatorFlag = 1000 // OperatorError 错误标志 OperatorError OperatorFlag = 10000 ) // Store 储存 type Store struct { table string db *sql.DB } // NewStore 创建一个存储实例 func NewStore(table string) *Store { db, err := sql.Open("mysql", InitConfig.Database.URI) if err != nil { panic(err) } return &Store{table: table, db: db} } // Save 储存数据 func (store *Store) Save(isource IGetSource) { _, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) if err != nil { log.Fatalln(err) } } // Pop 储存数据 func (store *Store) Pop(targetType string, operators ...int32) (IUpdateSource, error) { tx, err := store.db.Begin() if err != nil { log.Println(err, targetType) return nil, err } var args = []interface{}{targetType} selectSQL := `select uid, url, target_type, source, ext, operator from ` + store.table + ` where target_type = ?` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) } else { for _, operator := range operators { selectSQL += " and operator = ?" args = append(args, operator) } } log.Println(selectSQL + ` limit 1 for update`) row := tx.QueryRow(selectSQL, args...) defer func() { err := tx.Commit() if err != nil { log.Println(err) err = tx.Rollback() if err != nil { log.Println(err) } } }() if row != nil { s := &Source{} // uid, url, target_type, source, ext, operator err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator) if err != nil { log.Println(err, targetType) _, err = tx.Exec("update "+store.table+" set error_msg = ?, operator = ? where uid = ?", OperatorError, s.Uid) if err != nil { log.Println(err) } return nil, err } _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) return s, nil } return nil, errors.New("TaskQueue is nil") }