From b70b8491baacfc8b328d528c478896ee9eae1db1 Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Wed, 28 Apr 2021 14:11:24 +0800 Subject: [PATCH] feat(queue): add queue --- server/case_test.go | 13 ++------ server/queue.go | 76 +++++++++++++++++++++++++++++++++++++++++++++ server/task.go | 23 +------------- server/task_test.go | 67 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 147 insertions(+), 32 deletions(-) create mode 100644 server/queue.go create mode 100644 server/task_test.go diff --git a/server/case_test.go b/server/case_test.go index 33f1f78..c3ebca5 100644 --- a/server/case_test.go +++ b/server/case_test.go @@ -1,12 +1,5 @@ package main -import ( - "testing" - - "github.com/474420502/focus/compare" - pqueuekey "github.com/474420502/focus/priority_queuekey" -) - -func Benchmark123(b *testing.B) { - pqueuekey.New(compare.Int) -} +// func Benchmark123(b *testing.B) { +// pqueuekey.New(compare.Int) +// } diff --git a/server/queue.go b/server/queue.go new file mode 100644 index 0000000..29af6d3 --- /dev/null +++ b/server/queue.go @@ -0,0 +1,76 @@ +package main + +import ( + "container/list" + "sync" + + "github.com/474420502/focus/compare" + pqueuekey "github.com/474420502/focus/priority_queuekey" +) + +type Task struct { + Value int +} + +type Queue struct { + lock sync.Mutex + count uint64 + + waitChans *list.List + + queue *pqueuekey.PriorityQueue +} + +func NewQueue() *Queue { + return &Queue{ + waitChans: list.New(), + queue: pqueuekey.New(compare.UInt64), + } +} + +func (q *Queue) Push(task *Task) { + q.lock.Lock() + defer q.lock.Unlock() + + if q.waitChans.Len() != 0 { + element := q.waitChans.Front() + ctask := element.Value.(chan *Task) + ctask <- task + q.waitChans.Remove(element) + return + } + + q.count++ + q.queue.Push(q.count, task) +} + +func (q *Queue) Pop() (task *Task) { + q.lock.Lock() + defer q.lock.Unlock() + q.count++ + + if itask, ok := q.queue.Pop(); ok { + return itask.(*Task) + } + + return nil +} + +func (q *Queue) popBlock(task chan *Task) { + q.lock.Lock() + defer q.lock.Unlock() + + if q.queue.Size() != 0 { + itask, _ := q.queue.Pop() + go func() { task <- itask.(*Task) }() + return + } + + q.waitChans.PushBack(task) +} + +func (q *Queue) PopBlock() *Task { + task := make(chan *Task) + q.popBlock(task) + return (<-task) +} diff --git a/server/task.go b/server/task.go index 4f09e79..4404f42 100644 --- a/server/task.go +++ b/server/task.go @@ -1,27 +1,6 @@ package main -import ( - "sync" - - pqueuekey "github.com/474420502/focus/priority_queuekey" - "github.com/gin-gonic/gin" -) - -type Task struct { -} - -type Queue struct { - lock sync.Mutex - count uint64 - queue *pqueuekey.PriorityQueue -} - -func (q *Queue) Push(task *Task) { - q.lock.Lock() - defer q.lock.Unlock() - q.count++ - q.queue.Push(q.count, task) -} +import "github.com/gin-gonic/gin" func PushQueue(c *gin.Context) { diff --git a/server/task_test.go b/server/task_test.go new file mode 100644 index 0000000..bd0d734 --- /dev/null +++ b/server/task_test.go @@ -0,0 +1,67 @@ +package main + +import ( + "sync" + "testing" + "time" +) + +func TestTaskCasePushPop(t *testing.T) { + queue := NewQueue() + queue.Push(&Task{}) + queue.Push(&Task{}) + if queue.queue.Size() != 2 { + t.Error("size != 2") + } + + task := queue.Pop() + if task == nil { + t.Error(task == nil) + } + if queue.queue.Size() != 1 { + t.Error("size != 1") + } + + task = queue.PopBlock() + if task == nil { + t.Error(task == nil) + } + if queue.queue.Size() != 0 { + t.Error("size != 1") + } +} + +func TestTaskCaseBlock(t *testing.T) { + queue := NewQueue() + wait := &sync.WaitGroup{} + wait.Add(1) + + go func(wait *sync.WaitGroup, queue *Queue) { + defer wait.Done() + time.Sleep(time.Millisecond * 500) + queue.Push(&Task{}) + time.Sleep(time.Millisecond * 500) + queue.Push(&Task{}) + }(wait, queue) + + if queue.PopBlock() == nil { + t.Error("value is error") + } + if queue.queue.Size() != 0 { + t.Error("queue errro") + } + first := time.Now() + + if queue.PopBlock() == nil { + t.Error("value is error") + } + if queue.queue.Size() != 0 { + t.Error("queue errro") + } + + if d := time.Now().Sub(first).Milliseconds(); d < 500 { + t.Errorf("time duration is error %d", d) + } + + wait.Wait() +}