feat(queue): add queue

This commit is contained in:
eson 2021-04-28 14:11:24 +08:00
parent 6d1b058de8
commit b70b8491ba
4 changed files with 147 additions and 32 deletions

View File

@ -1,12 +1,5 @@
package main
import (
"testing"
"github.com/474420502/focus/compare"
pqueuekey "github.com/474420502/focus/priority_queuekey"
)
func Benchmark123(b *testing.B) {
pqueuekey.New(compare.Int)
}
// func Benchmark123(b *testing.B) {
// pqueuekey.New(compare.Int)
// }

76
server/queue.go Normal file
View File

@ -0,0 +1,76 @@
package main
import (
"container/list"
"sync"
"github.com/474420502/focus/compare"
pqueuekey "github.com/474420502/focus/priority_queuekey"
)
type Task struct {
Value int
}
type Queue struct {
lock sync.Mutex
count uint64
waitChans *list.List
queue *pqueuekey.PriorityQueue
}
func NewQueue() *Queue {
return &Queue{
waitChans: list.New(),
queue: pqueuekey.New(compare.UInt64),
}
}
func (q *Queue) Push(task *Task) {
q.lock.Lock()
defer q.lock.Unlock()
if q.waitChans.Len() != 0 {
element := q.waitChans.Front()
ctask := element.Value.(chan *Task)
ctask <- task
q.waitChans.Remove(element)
return
}
q.count++
q.queue.Push(q.count, task)
}
func (q *Queue) Pop() (task *Task) {
q.lock.Lock()
defer q.lock.Unlock()
q.count++
if itask, ok := q.queue.Pop(); ok {
return itask.(*Task)
}
return nil
}
func (q *Queue) popBlock(task chan *Task) {
q.lock.Lock()
defer q.lock.Unlock()
if q.queue.Size() != 0 {
itask, _ := q.queue.Pop()
go func() { task <- itask.(*Task) }()
return
}
q.waitChans.PushBack(task)
}
func (q *Queue) PopBlock() *Task {
task := make(chan *Task)
q.popBlock(task)
return (<-task)
}

View File

@ -1,27 +1,6 @@
package main
import (
"sync"
pqueuekey "github.com/474420502/focus/priority_queuekey"
"github.com/gin-gonic/gin"
)
type Task struct {
}
type Queue struct {
lock sync.Mutex
count uint64
queue *pqueuekey.PriorityQueue
}
func (q *Queue) Push(task *Task) {
q.lock.Lock()
defer q.lock.Unlock()
q.count++
q.queue.Push(q.count, task)
}
import "github.com/gin-gonic/gin"
func PushQueue(c *gin.Context) {

67
server/task_test.go Normal file
View File

@ -0,0 +1,67 @@
package main
import (
"sync"
"testing"
"time"
)
func TestTaskCasePushPop(t *testing.T) {
queue := NewQueue()
queue.Push(&Task{})
queue.Push(&Task{})
if queue.queue.Size() != 2 {
t.Error("size != 2")
}
task := queue.Pop()
if task == nil {
t.Error(task == nil)
}
if queue.queue.Size() != 1 {
t.Error("size != 1")
}
task = queue.PopBlock()
if task == nil {
t.Error(task == nil)
}
if queue.queue.Size() != 0 {
t.Error("size != 1")
}
}
func TestTaskCaseBlock(t *testing.T) {
queue := NewQueue()
wait := &sync.WaitGroup{}
wait.Add(1)
go func(wait *sync.WaitGroup, queue *Queue) {
defer wait.Done()
time.Sleep(time.Millisecond * 500)
queue.Push(&Task{})
time.Sleep(time.Millisecond * 500)
queue.Push(&Task{})
}(wait, queue)
if queue.PopBlock() == nil {
t.Error("value is error")
}
if queue.queue.Size() != 0 {
t.Error("queue errro")
}
first := time.Now()
if queue.PopBlock() == nil {
t.Error("value is error")
}
if queue.queue.Size() != 0 {
t.Error("queue errro")
}
if d := time.Now().Sub(first).Milliseconds(); d < 500 {
t.Errorf("time duration is error %d", d)
}
wait.Wait()
}