package main import ( "container/list" "sync" "time" "github.com/474420502/focus/compare" pqueuekey "github.com/474420502/focus/priority_queuekey" ) type Task struct { Name string `json:"name"` // 任务名字 Type int64 `json:"type"` // 类型 Priority int64 `json:"priority"` // 优先级 默认0 CreateAt time.Time `json:"create_at"` // 创建任务时间 Info interface{} `json:"info"` // 任务的信息内容 } type Queue struct { lock sync.Mutex count uint64 waitChans *list.List queue *pqueuekey.PriorityQueue } func NewQueue() *Queue { return &Queue{ waitChans: list.New(), queue: pqueuekey.New(compare.UInt64), } } func (q *Queue) Push(task *Task) { q.lock.Lock() defer q.lock.Unlock() if q.waitChans.Len() != 0 { element := q.waitChans.Front() ctask := element.Value.(chan *Task) ctask <- task q.waitChans.Remove(element) return } q.count++ q.queue.Push(q.count, task) } func (q *Queue) Pop() (task *Task) { q.lock.Lock() defer q.lock.Unlock() q.count++ if itask, ok := q.queue.Pop(); ok { return itask.(*Task) } return nil } func (q *Queue) popBlock(task chan *Task) { q.lock.Lock() defer q.lock.Unlock() if q.queue.Size() != 0 { itask, _ := q.queue.Pop() go func() { task <- itask.(*Task) }() return } q.waitChans.PushBack(task) } func (q *Queue) PopBlock() *Task { task := make(chan *Task) q.popBlock(task) return (<-task) }