hrtools/server/queue.go

82 lines
1.4 KiB
Go
Raw Permalink Normal View History

2021-04-28 06:11:24 +00:00
package main
import (
"container/list"
"sync"
"time"
2021-04-28 06:11:24 +00:00
"github.com/474420502/focus/compare"
pqueuekey "github.com/474420502/focus/priority_queuekey"
)
// Task describes a single unit of work that can be stored in a Queue.
// All fields are serialized to JSON under the names given in their tags.
type Task struct {
	Name     string      `json:"name"`      // Task name.
	Type     int64       `json:"type"`      // Task type.
	Priority int64       `json:"priority"`  // Priority; defaults to 0.
	CreateAt time.Time   `json:"create_at"` // Time the task was created.
	Info     interface{} `json:"info"`      // Arbitrary task payload.
}
// Queue is a mutex-guarded task queue. Tasks are kept in a keyed priority
// queue, and consumers that asked for a task while none was stored are kept
// in a separate list of waiting channels.
type Queue struct {
// lock is acquired by Push, Pop and popBlock before they touch the fields below.
lock sync.Mutex
// count is a running counter; Push increments it and uses the new value as
// the key when inserting a task into queue.
count uint64
// waitChans holds the `chan *Task` values registered by popBlock when no
// task was stored; Push delivers to the front element first.
waitChans *list.List
// queue stores the pending tasks, keyed by the count value at insertion.
queue *pqueuekey.PriorityQueue
}
// NewQueue returns an empty Queue with its waiter list initialized and its
// priority queue set up to compare uint64 keys.
func NewQueue() *Queue {
	q := new(Queue)
	q.waitChans = list.New()
	q.queue = pqueuekey.New(compare.UInt64)
	return q
}
// Push adds task to the queue.
//
// If a consumer is already waiting (registered through popBlock), the task
// is sent directly to the consumer at the front of the waiter list and that
// waiter is removed; the task is never stored. Otherwise the task is stored
// in the priority queue under the next counter value.
//
// The hand-off send happens while q.lock is held, so Push does not return
// until the waiting consumer has accepted the task (or the channel buffers it).
func (q *Queue) Push(task *Task) {
	q.lock.Lock()
	defer q.lock.Unlock()

	front := q.waitChans.Front()
	if front == nil {
		// Nobody is waiting: store the task under a fresh sequence key.
		q.count++
		q.queue.Push(q.count, task)
		return
	}

	// Hand the task straight to the earliest-registered waiter.
	waiter := front.Value.(chan *Task)
	waiter <- task
	q.waitChans.Remove(front)
}
// Pop removes and returns the next task from the priority queue without
// blocking. It returns nil when no task is stored.
//
// Pop does not modify q.count: the counter only sequences insertions made by
// Push, so removing a task has no reason to advance it.
func (q *Queue) Pop() (task *Task) {
	q.lock.Lock()
	defer q.lock.Unlock()

	itask, ok := q.queue.Pop()
	if !ok {
		return nil
	}
	return itask.(*Task)
}
// popBlock arranges for exactly one task to be delivered on the task channel.
//
// When no task is stored, the channel is appended to the waiter list so that
// a later Push can send to it. When a task is stored, it is popped here and
// sent from a separate goroutine, so this method can release q.lock and
// return without waiting for the receiver.
func (q *Queue) popBlock(task chan *Task) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.queue.Size() == 0 {
		// Nothing stored yet: register as a waiter for a future Push.
		q.waitChans.PushBack(task)
		return
	}

	itask, _ := q.queue.Pop()
	go func() {
		task <- itask.(*Task)
	}()
}
// PopBlock returns the next task, blocking until one is delivered. It creates
// an unbuffered channel, registers it via popBlock, and waits to receive the
// task on it.
func (q *Queue) PopBlock() *Task {
	delivery := make(chan *Task)
	q.popBlock(delivery)

	received := <-delivery
	return received
}