TODO: workflow add method change proxy
This commit is contained in:
parent
addc6bfa6b
commit
4083abe141
26
config.go
26
config.go
|
@ -8,6 +8,8 @@ import (
|
|||
"reflect"
|
||||
"strings"
|
||||
|
||||
"474420502.top/eson/structure/circular_linked"
|
||||
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
|
@ -55,7 +57,7 @@ func (curls *YamlCurls) MarshalYAML() (interface{}, error) {
|
|||
}
|
||||
|
||||
// YamlProxies 为了自定义序列化函数
|
||||
type YamlProxies []string
|
||||
type YamlProxies clinked.CircularLinked
|
||||
|
||||
// UnmarshalYAML YamlProxies反序列化函数
|
||||
func (proxies *YamlProxies) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
|
@ -68,10 +70,12 @@ func (proxies *YamlProxies) UnmarshalYAML(unmarshal func(interface{}) error) err
|
|||
|
||||
switch tbuf := buf.(type) {
|
||||
case string:
|
||||
*proxies = append(*proxies, tbuf)
|
||||
p := (*clinked.CircularLinked)(proxies)
|
||||
p.Append(tbuf)
|
||||
case []interface{}:
|
||||
p := (*clinked.CircularLinked)(proxies)
|
||||
for _, ifa := range tbuf {
|
||||
*proxies = append(*proxies, ifa.(string))
|
||||
p.Append(ifa.(string))
|
||||
}
|
||||
default:
|
||||
return errors.New("read curls is error, " + reflect.TypeOf(buf).String())
|
||||
|
@ -84,8 +88,10 @@ func (proxies *YamlProxies) UnmarshalYAML(unmarshal func(interface{}) error) err
|
|||
// MarshalYAML YamlProxies 序列化函数
|
||||
func (proxies *YamlProxies) MarshalYAML() (interface{}, error) {
|
||||
content := "["
|
||||
for _, curl := range []string(*proxies) {
|
||||
content += "\"" + curl + "\"" + ", "
|
||||
p := (*clinked.CircularLinked)(proxies)
|
||||
|
||||
for _, cnode := range p.GetLoopValues() {
|
||||
content += "\"" + cnode.GetValue().(string) + "\"" + ", "
|
||||
}
|
||||
content = strings.TrimRight(content, ", ")
|
||||
content += "]"
|
||||
|
@ -95,11 +101,11 @@ func (proxies *YamlProxies) MarshalYAML() (interface{}, error) {
|
|||
// Config 任务加载的默认配置
|
||||
type Config struct {
|
||||
// Session int `yaml:"session"`
|
||||
Mode int `yaml:"mode"`
|
||||
Proxies YamlProxies `yaml:"proxies"`
|
||||
Retry int `yaml:"retry"`
|
||||
Priority int `yaml:"priority"`
|
||||
Curls YamlCurls `yaml:"curls"`
|
||||
Mode int `yaml:"mode"`
|
||||
Proxies *YamlProxies `yaml:"proxies"`
|
||||
Retry int `yaml:"retry"`
|
||||
Priority int `yaml:"priority"`
|
||||
Curls YamlCurls `yaml:"curls"`
|
||||
|
||||
Crontab string `yaml:"crontab"`
|
||||
ITask string `yaml:"task"`
|
||||
|
|
29
person.go
29
person.go
|
@ -6,6 +6,8 @@ import (
|
|||
"reflect"
|
||||
"time"
|
||||
|
||||
"474420502.top/eson/structure/circular_linked"
|
||||
|
||||
"474420502.top/eson/curl2info"
|
||||
)
|
||||
|
||||
|
@ -22,8 +24,8 @@ type ITask interface {
|
|||
SetCurl(Curl *curl2info.CURL)
|
||||
GetCurl() *curl2info.CURL
|
||||
|
||||
GetProxies() []string
|
||||
AppendProxies(proxies ...string)
|
||||
GetProxies() *clinked.CircularLinked
|
||||
AddProxies(proxy string)
|
||||
|
||||
TimeUp() bool
|
||||
NextTime() time.Time
|
||||
|
@ -62,8 +64,8 @@ func NewPerson() *Person {
|
|||
return person
|
||||
}
|
||||
|
||||
// LoadConfig 加载配置
|
||||
func (person *Person) LoadConfig(conf string) {
|
||||
// Config 加载配置
|
||||
func (person *Person) Config(conf string) {
|
||||
person.Conf = NewConfig(conf)
|
||||
person.Tasks = splitTasks(person.Conf)
|
||||
}
|
||||
|
@ -71,13 +73,14 @@ func (person *Person) LoadConfig(conf string) {
|
|||
// NewPersonWithConfig 创建一个person
|
||||
func NewPersonWithConfig(conf string) *Person {
|
||||
person := NewPerson()
|
||||
person.LoadConfig(conf)
|
||||
person.Config(conf)
|
||||
return person
|
||||
}
|
||||
|
||||
// SplitTasks 拆开出需求的任务
|
||||
func splitTasks(conf *Config) []ITask {
|
||||
var tasks []ITask
|
||||
proxies := (*clinked.CircularLinked)(conf.Proxies)
|
||||
|
||||
for _, scurl := range conf.Curls {
|
||||
curl, err := curl2info.ParseRawCURL(scurl)
|
||||
|
@ -89,16 +92,20 @@ func splitTasks(conf *Config) []ITask {
|
|||
curl.ITask = conf.ITask
|
||||
}
|
||||
task := makeRegisterType(curl.ITask)
|
||||
|
||||
switch conf.Mode {
|
||||
case 0:
|
||||
|
||||
initTask(conf, task, curl)
|
||||
task.AppendProxies(conf.Proxies...)
|
||||
tasks = append(tasks, task)
|
||||
for _, cnode := range proxies.GetLoopValues() {
|
||||
proxy := cnode.GetValue().(string)
|
||||
task.AddProxies(proxy)
|
||||
}
|
||||
|
||||
tasks = append(tasks, task)
|
||||
case 1:
|
||||
for _, proxy := range conf.Proxies {
|
||||
|
||||
for _, cnode := range proxies.GetLoopValues() {
|
||||
proxy := cnode.GetValue().(string)
|
||||
|
||||
ncurl, err := curl2info.ParseRawCURL(scurl)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -110,7 +117,7 @@ func splitTasks(conf *Config) []ITask {
|
|||
ptask := makeRegisterType(ncurl.ITask).(ITask)
|
||||
initTask(conf, ptask, ncurl)
|
||||
|
||||
ptask.AppendProxies(proxy)
|
||||
ptask.AddProxies(proxy)
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
}
|
||||
|
|
34
task.go
34
task.go
|
@ -3,6 +3,8 @@ package imitater
|
|||
import (
|
||||
"time"
|
||||
|
||||
"474420502.top/eson/structure/circular_linked"
|
||||
|
||||
"474420502.top/eson/crontabex"
|
||||
"474420502.top/eson/curl2info"
|
||||
"474420502.top/eson/requests"
|
||||
|
@ -17,7 +19,7 @@ type Task struct {
|
|||
curl *curl2info.CURL
|
||||
workflow *requests.Workflow
|
||||
session *requests.Session
|
||||
proxies YamlProxies
|
||||
proxies clinked.CircularLinked
|
||||
}
|
||||
|
||||
// SetName 任务的名字
|
||||
|
@ -40,16 +42,17 @@ func (task *Task) GetCurl() *curl2info.CURL {
|
|||
return task.curl
|
||||
}
|
||||
|
||||
// AppendProxies 添加代理集合
|
||||
func (task *Task) AppendProxies(proxies ...string) {
|
||||
for _, proxy := range proxies {
|
||||
task.proxies = append(task.proxies, proxy)
|
||||
// AddProxies 添加代理集合
|
||||
func (task *Task) AddProxies(proxy string) {
|
||||
for _, cnode := range task.proxies.GetLoopValues() {
|
||||
proxy := cnode.GetValue().(string)
|
||||
task.proxies.Append(proxy)
|
||||
}
|
||||
}
|
||||
|
||||
// GetProxies 获取代理的字符串
|
||||
func (task *Task) GetProxies() []string {
|
||||
return task.proxies
|
||||
func (task *Task) GetProxies() *clinked.CircularLinked {
|
||||
return &task.proxies
|
||||
}
|
||||
|
||||
// SetCrontab 设置crontab的控制规则字符串
|
||||
|
@ -74,10 +77,8 @@ func (task *Task) NextTime() time.Time {
|
|||
|
||||
// Workflow 根据persistent 是否返回现有session(or 新建 session),创建Workflow的信息, 便于设置或者更改参数, Session持久化
|
||||
func (task *Task) Workflow(persistent bool) *requests.Workflow {
|
||||
if task.workflow == nil {
|
||||
task.workflow = task.curl.CreateWorkflow(task.Session())
|
||||
}
|
||||
|
||||
task.workflow = task.curl.CreateWorkflow(task.Session())
|
||||
if persistent {
|
||||
return task.workflow
|
||||
}
|
||||
|
@ -87,7 +88,13 @@ func (task *Task) Workflow(persistent bool) *requests.Workflow {
|
|||
|
||||
// Request 根据curl信息执行,没持久化
|
||||
func (task *Task) Request() (*requests.Response, error) {
|
||||
return task.curl.CreateWorkflow(nil).Execute()
|
||||
proxies := task.GetProxies()
|
||||
ses := task.curl.CreateSession()
|
||||
if proxies.Size() > 0 {
|
||||
ses.SetConfig(requests.CProxy, proxies.Cursor().GetValue().(string))
|
||||
proxies.MoveNext()
|
||||
}
|
||||
return task.curl.CreateWorkflow(ses).Execute()
|
||||
}
|
||||
|
||||
// Session 获取Session的信息 只保留session的数据和url参数.
|
||||
|
@ -95,5 +102,10 @@ func (task *Task) Session() *requests.Session {
|
|||
if task.session == nil {
|
||||
task.session = task.curl.CreateSession()
|
||||
}
|
||||
if task.proxies.Size() > 0 {
|
||||
task.session.SetConfig(requests.CProxy, task.proxies.Cursor().GetValue().(string))
|
||||
task.proxies.MoveNext()
|
||||
}
|
||||
|
||||
return task.session
|
||||
}
|
||||
|
|
|
@ -34,10 +34,10 @@ func (tt *Toutiao) Execute(data interface{}) ITask {
|
|||
}
|
||||
|
||||
func TestExecutePlan(t *testing.T) {
|
||||
|
||||
Register("toutiao", &Toutiao{})
|
||||
|
||||
person := NewPerson()
|
||||
person.LoadConfig("test.yaml")
|
||||
person.Config("test.yaml")
|
||||
person.Execute()
|
||||
|
||||
t.Error("")
|
||||
|
|
Loading…
Reference in New Issue
Block a user