chromeworker/proxyserver/router.go

173 lines
4.3 KiB
Go

package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/bwmarrin/snowflake"
"github.com/gin-gonic/gin"
)
// Package-level state shared by the HTTP handlers in this file.
var (
	// taskQueue holds tasks added by PutTask until GetTask pops them.
	taskQueue = NewQueue()
	// waitQueue holds tasks handed out by GetTask until ContentTask or
	// ErrorTask removes them.
	waitQueue = NewQueue()
	// readyQueue holds tasks whose content was submitted via ContentTask.
	readyQueue = NewQueue()
	// errorQueue holds tasks reported as failed via ErrorTask.
	errorQueue = NewQueue()
	// snowNode generates task ids; it is set by initSnowflake.
	snowNode *snowflake.Node
)
// initSnowflake creates the package-level snowflake node (node number 1) if
// it has not been set yet. It panics when the node cannot be created.
func initSnowflake() {
	if snowNode != nil {
		// Already initialized; keep the existing node.
		return
	}
	created, err := snowflake.NewNode(1)
	if err != nil {
		panic(err)
	}
	snowNode = created
}
// init sets the log flags, calls initOplog and initSnowflake, and registers
// the home page and the /task routes on the package-level engine.
func init() {
	log.SetFlags(log.LstdFlags | log.Llongfile)
	initOplog()
	initSnowflake()

	// Home page.
	engine.GET("/", func(ctx *gin.Context) {
		ctx.JSON(200, Response{Code: 200, Message: "Home Page"})
	})

	// Task endpoints, all grouped under /task.
	taskRoutes := engine.Group("/task")
	taskRoutes.GET("/get", GetTask)
	taskRoutes.POST("/put", PutTask)
	taskRoutes.POST("/content", ContentTask)
	taskRoutes.POST("/error", ErrorTask)
	taskRoutes.GET("/ready", ReadyTask)
	taskRoutes.POST("/ack", AckTask)
}
// GetTask pops one task from taskQueue, pushes it onto waitQueue keyed by its
// "taskid" value, and returns it with code 200. When taskQueue has nothing to
// pop it replies with code 204 and the message "No Task".
func GetTask(c *gin.Context) {
	next, found := taskQueue.Pop()
	if !found {
		c.JSON(http.StatusOK, Response{Code: 204, Message: "No Task"})
		return
	}

	pending := next.(*Task)
	// Park the task in waitQueue until ContentTask or ErrorTask removes it.
	waitQueue.Push(pending.data["taskid"], pending)
	c.JSON(http.StatusOK, Response{Code: 200, Message: "", Data: pending})
}
// PutTask builds a new task from the posted form and pushes it onto taskQueue.
//
// Form fields read:
//   - "url": required; an empty value is rejected with code 400.
//   - "label" and "content_condition": stored as given (possibly empty).
//   - "callback": stored only when non-empty.
//
// The task also receives a freshly generated "taskid" and a "ts" timestamp in
// Unix nanoseconds. On success the task is written to oplog and returned with
// code 200.
func PutTask(c *gin.Context) {
	u := c.PostForm("url")
	if u == "" {
		// The message means "url does not exist"; the original text contained
		// a typo ("不错在").
		c.JSON(http.StatusOK, Response{Code: 400, Message: "url 不存在"})
		return
	}

	data := NewTask()
	now := time.Now()
	tid := snowNode.Generate().Base64()

	data.Store("taskid", tid)
	data.Store("url", u)
	data.Store("ts", now.UnixNano())
	data.Store("label", c.PostForm("label"))
	data.Store("content_condition", c.PostForm("content_condition"))
	if callback := c.PostForm("callback"); callback != "" {
		data.Store("callback", callback)
	}

	taskQueue.Push(tid, data)
	oplog.Write(data)
	c.JSON(http.StatusOK, Response{Code: 200, Message: "ok", Data: data})
}
// ContentTask attaches the posted content to a task that GetTask handed out.
//
// Form fields read: "taskid" and "content"; both must be present. On success
// the task is removed from waitQueue, gets "content" and status "ready"
// stored on it, is pushed onto readyQueue, and is returned with code 200.
// CallbackServer is then started for the task in a new goroutine.
//
// Every failure replies with code 404 and a message naming the cause: missing
// taskid, missing content, or a taskid that is not in waitQueue. Exactly one
// response is written per request.
func ContentTask(c *gin.Context) {
	tid, ok := c.GetPostForm("taskid")
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "taskid is not set"})
		return
	}

	// Validate the content before touching waitQueue, so a request without
	// content does not remove (and thereby lose) the waiting task.
	content, ok := c.GetPostForm("content")
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "content is not set"})
		return
	}

	iv, ok := waitQueue.Remove(tid)
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
		return
	}

	task := iv.(*Task)
	task.Store("content", content)
	task.Store("status", "ready")
	// Enter the callback dispatch queue. TODO: persist the content.
	readyQueue.Push(tid, task)
	c.JSON(http.StatusOK, Response{Code: 200, Data: task})

	if label, ok := task.Load("label"); ok {
		log.Println(label.(string), tid)
	}
	go CallbackServer(task)
}
// AckTask confirms that the whole flow for a task is finished.
//
// It reads the "taskid" form field and looks the task up in readyQueue. A
// task whose status is "readying" is switched to "readied"; any other status
// is reported as "not readying". Both of those replies use code 200. An empty
// taskid, an unknown taskid, or a task without a "status" entry yields code
// 404.
func AckTask(c *gin.Context) {
	tid := c.PostForm("taskid")
	if tid == "" {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "query taskid params must exist"})
		return
	}

	// Shared reply for both "not in readyQueue" and "no status stored".
	replyNotFound := func() {
		c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
	}

	entry, found := readyQueue.Get(tid)
	if !found {
		replyNotFound()
		return
	}

	acked := entry.(*Task)
	status, hasStatus := acked.Load("status")
	if !hasStatus {
		replyNotFound()
		return
	}

	if status.(string) != "readying" {
		c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s is not readying", tid)})
		return
	}

	acked.Store("status", "readied")
	c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s readied", tid)})
}
// ReadyTask returns a finished task and marks it as being read.
//
// It reads the "taskid" query parameter and looks the task up in readyQueue.
// A task that is found has its status set to "readying" and is returned with
// code 200. An empty or unknown taskid yields code 404.
func ReadyTask(c *gin.Context) {
	tid := c.Query("taskid")
	if tid == "" {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "query taskid params must exist"})
		return
	}

	entry, found := readyQueue.Get(tid)
	if !found {
		c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
		return
	}

	finished := entry.(*Task)
	finished.Store("status", "readying")
	c.JSON(http.StatusOK, Response{Code: 200, Data: finished})
}
// ErrorTask records that a task failed and cannot be completed.
//
// It reads the "taskid" and "error" form fields. A task removed from
// waitQueue gets the error text stored under "error" and is pushed onto
// errorQueue, with a code 200 reply. A taskid that is not in waitQueue yields
// code 404.
func ErrorTask(c *gin.Context) {
	tid := c.PostForm("taskid")
	reason := c.PostForm("error")

	removed, found := waitQueue.Remove(tid)
	if !found {
		c.JSON(http.StatusOK, Response{Code: 404})
		return
	}

	failed := removed.(*Task)
	failed.Store("error", reason)
	// Keep the failed task in errorQueue. TODO: persist the content.
	errorQueue.Push(tid, failed)
	c.JSON(http.StatusOK, Response{Code: 200})
}