This commit is contained in:
huangsimin 2018-12-19 18:28:06 +08:00
parent 937ae3c546
commit b8f1940cac
4 changed files with 10 additions and 12 deletions

View File

@ -144,13 +144,11 @@ func (p *Parser) ConfigQueue(yamlpath string) {
panic(err) panic(err)
} }
log.Println(string(data))
err = yaml.Unmarshal(data, p.que) err = yaml.Unmarshal(data, p.que)
if err != nil { if err != nil {
panic(err) panic(err)
} }
log.Println(p.que)
if err := p.que.Connect(); err != nil { if err := p.que.Connect(); err != nil {
panic(err) panic(err)
} }

View File

@ -37,9 +37,9 @@ func (tt *Toutiao) ToDoParser(adstring string) (string, error) {
func TestParserToutiao(t *testing.T) { func TestParserToutiao(t *testing.T) {
tt := Toutiao{} tt := Toutiao{}
tt.ConfigLogDB("logdb.yaml") // tt.ConfigLogDB("logdb.yaml")
tt.ConfigQueue("queue.yaml") tt.ConfigQueue("queue.yaml")
ADParserServer(&tt) // ADParserServer(&tt)
t.Error("") t.Error("")
} }

View File

@ -11,9 +11,9 @@ import (
// Queue Youmi的队列 // Queue Youmi的队列
type Queue struct { type Queue struct {
url string `yaml:"url"` Url string `yaml:"url"`
exchange string `yaml:"exchange"` Exchange string `yaml:"exchange"`
routekey string `yaml:"routekey"` Routekey string `yaml:"routekey"`
mutex sync.Mutex mutex sync.Mutex
conn *amqp.Connection conn *amqp.Connection
@ -23,7 +23,7 @@ type Queue struct {
// NewQueue youmi queue队列 mq // NewQueue youmi queue队列 mq
func NewQueue(url string, exchange, routekey string) *Queue { func NewQueue(url string, exchange, routekey string) *Queue {
que := Queue{url: url, exchange: exchange, routekey: routekey} que := Queue{Url: url, Exchange: exchange, Routekey: routekey}
que.contentType = "application/json" que.contentType = "application/json"
if err := que.Connect(); err != nil { if err := que.Connect(); err != nil {
@ -45,7 +45,7 @@ func (que *Queue) Connect() error {
que.mutex.Lock() que.mutex.Lock()
defer que.mutex.Unlock() defer que.mutex.Unlock()
conn, err := amqp.Dial(que.url) conn, err := amqp.Dial(que.Url)
if err != nil { if err != nil {
return err return err
} }
@ -68,7 +68,7 @@ func (que *Queue) Push(data []byte) error {
panic(err) panic(err)
} }
} }
return ch.Publish(que.exchange, que.routekey, false, false, amqp.Publishing{ return ch.Publish(que.Exchange, que.Routekey, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent, DeliveryMode: amqp.Persistent,
Timestamp: time.Now(), Timestamp: time.Now(),
ContentType: que.contentType, ContentType: que.contentType,

View File

@ -1,4 +1,4 @@
# url: "amqp://spider:spider@172.16.6.109:5672/test_adspider"
url: "amqp://test_aso:guest@10.10.10.31:5672/test_spider_go" url: "amqp://test_aso:guest@10.10.10.31:5672/test_spider_go"
exchange: "ad_process" exchange: "ad_process"
routekey: "CN" routekey: "CN"
# url: "amqp://spider:spider@172.16.6.109:5672/test_adspider"