chromeworker_client/client.go

297 lines
6.6 KiB
Go

package cwclient
import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"strconv"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
func init() {
log.SetFlags(log.Llongfile | log.LstdFlags)
}
// CallbackContext Callback上下文
type CallbackContext struct {
TaskID string
Content string
Error error
Carry interface{} // 传递的参数.
}
// Callback 发送代理连接获取内容后的回调函数
type Callback struct {
label string
hash string
Do func(cxt *CallbackContext)
}
// Client 客户端
type Client struct {
chromeProxyAddr string
carrayCache sync.Map
register sync.Map
host string
port string
server *http.Server
listener net.Listener
lock sync.Mutex
}
// Label 区分不同任务类型
type Label struct {
label string
conditionJS string
retry string
waitcapture string
configlock sync.Mutex
cli *Client
}
// GetWaitime Get return waitime int
func (l *Label) GetWaitime() int {
r, _ := strconv.Atoi(l.waitcapture)
return r
}
// SetWaitime Set waitime int
func (l *Label) SetWaitime(waitime int) {
l.waitcapture = strconv.Itoa(waitime)
}
// GetRetry Get return retry int
func (l *Label) GetRetry() int {
r, _ := strconv.Atoi(l.retry)
return r
}
// SetRetry Set retry int
func (l *Label) SetRetry(retry int) {
l.retry = strconv.Itoa(retry)
}
// GetHash 根据label获取hash路径
func (l *Label) GetHash(label string) string {
if cb, ok := l.cli.register.Load(label); ok {
return cb.(Callback).hash
}
return ""
}
// GetLabel 根据hash获取路径label
func (l *Label) GetLabel(hash string) string {
if cb, ok := l.cli.register.Load(hash); ok {
return cb.(Callback).label
}
return ""
}
// Open 缓存了Label值. 每次调用少了label传参. carray每次都会给一次请求回调传入参数.
func (l *Label) Open(urlstr string, carray interface{}) (bodyRes string, ok bool) {
return l.cli.open(l, urlstr, carray)
}
// SetContentCondition 设置识别到的内容条件. js代码. 必须是一个函数. 命名可以随意. 返回bool
func (l *Label) SetContentCondition(jsScript string) {
l.configlock.Lock()
defer l.configlock.Unlock()
l.conditionJS = jsScript
}
// SetContentConditionFromFile 设置识别到的内容条件. js代码. 从js文件 必须是一个函数. 命名可以随意. 返回bool
func (l *Label) SetContentConditionFromFile(jsScriptFile string) {
f, err := os.Open(jsScriptFile)
if err != nil {
panic(err)
}
data, err := ioutil.ReadAll(f)
if err != nil {
panic(err)
}
l.configlock.Lock()
defer l.configlock.Unlock()
// log.Println(string(data))
l.conditionJS = string(data)
}
// GetPort Get return port string. default random.
func (cli *Client) GetPort() string {
return cli.port
}
// SetPort Set port string. before client call connect method
func (cli *Client) SetPort(port string) {
cli.port = port
}
// GetHost Get return host string
func (cli *Client) GetHost() string {
return cli.host
}
// SetHost Set host string. default http://127.0.0.1
func (cli *Client) SetHost(host string) {
cli.host = host
}
// Register 注册基础信息
func (cli *Client) Register(label string, callback func(cxt *CallbackContext)) *Label {
cb := Callback{Do: callback, hash: uuid.New().String()}
if _, ok := cli.register.Load(label); ok {
log.Panic("label: ", label, " is exists")
}
cli.register.Store(label, cb)
cli.register.Store(cb.hash, cb)
l := &Label{label: label, cli: cli, waitcapture: "6000", retry: "1"}
return l
}
// UnRegister 卸载注册基础信息
func (cli *Client) UnRegister(label string) {
if cb, ok := cli.register.Load(label); ok {
cli.register.Delete(label)
cli.register.Delete(cb.(Callback).hash)
}
}
// Connect 连接初始化回调端口
func (cli *Client) Connect() {
cli.lock.Lock()
defer cli.lock.Unlock()
if cli.server == nil {
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
cli.port = fmt.Sprintf("%d", listener.Addr().(*net.TCPAddr).Port)
router := gin.Default()
router.POST("/:label", func(c *gin.Context) {
label := c.Param("label")
if f, ok := cli.register.Load(label); ok {
callback := f.(Callback)
if tid, ok := c.GetPostForm("taskid"); ok {
content := c.PostForm("content")
errorStr := c.PostForm("error")
carrayhash := c.PostForm("carrayhash")
var carray interface{}
if icarray, ok := cli.carrayCache.Load(carrayhash); ok {
carray = icarray.(*Carray).data
}
var err error = nil
if errorStr != "" {
err = fmt.Errorf(errorStr)
}
cxt := &CallbackContext{
TaskID: tid,
Content: content,
Error: err,
Carry: carray,
}
callback.Do(cxt)
}
}
})
cli.server = &http.Server{
Addr: cli.host + ":" + cli.port,
Handler: router,
}
go func() {
err := cli.server.Serve(listener)
if err != nil {
panic(err)
}
}()
} else {
log.Println("client had connected.")
}
// panic(http.Serve(listener, nil))
}
// Disconnect 断开连接
func (cli *Client) Disconnect() {
err := cli.server.Shutdown(context.Background())
if err != nil {
panic(err)
}
}
// open 请求完url后 调用不同label注册的回调函数. bodyRes 请求后服务器返回的基础信息. 如果不需要debug一般不需要使用.
func (cli *Client) open(label *Label, urlstr string, carray interface{}) (bodyRes string, ok bool) {
// urlstr = "https://playerduo.com/api/playerDuo-service-v2/rip113?lang=en&deviceType=browser"
if cli.server == nil {
panic("client is not connect. Client.Connect() ? ")
}
if callback, ok := cli.register.Load(label.label); ok {
data := url.Values{}
data["url"] = []string{urlstr}
data["callback"] = []string{cli.host + ":" + cli.port + "/" + callback.(Callback).hash}
data["label"] = []string{label.label}
if carray != nil {
carrayhash := uuid.New().String()
c := &Carray{hash: carrayhash, data: carray, expire: time.Now().Add(time.Minute * 2)}
label.cli.carrayCache.Store(carrayhash, c)
data["carrayhash"] = []string{carrayhash}
}
func() {
label.configlock.Lock()
defer label.configlock.Unlock()
data["content_condition"] = []string{label.conditionJS}
data["waitcapture"] = []string{label.waitcapture}
data["retry"] = []string{label.retry}
}()
resp, err := http.DefaultClient.PostForm(cli.chromeProxyAddr+"/task/put", data)
if err != nil {
panic(err)
}
bodyRes, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
return string(bodyRes), true
}
log.Printf("label: %s is not exists", label.label)
return "", false
}
// New 创建Client并初始化
func New(addr string) *Client {
cli := &Client{}
cli.chromeProxyAddr = addr
cli.host = "http://127.0.0.1"
return cli
}