diff --git a/parser.go b/parser.go index 2683564..705eef6 100644 --- a/parser.go +++ b/parser.go @@ -166,49 +166,46 @@ func (p *Parser) GetQueue() *Queue { // ADParserServer 主入口循环 func ADParserServer(adp IParser) { + adresponse := adp.GetLogDB().ADParserSelect(adp.GetSpiderID()) // select from db + adrChan := make(chan logdb.ADResonse, 100) + wg := new(sync.WaitGroup) - for i := 0; i < 10; i++ { + for i := 0; i <= 10; i++ { wg.Add(1) - go func(n int) { - defer wg.Done() - for _ = range data { - } - }(i) + go parserAndSendMQ(adp, adrChan, wg) } - for i := 0; i < 10000; i++ { - data <- i - } - close(data) - wg.Wait() - - adresponse := adp.GetLogDB().ADParserSelect(adp.GetSpiderID()) // select from db - adresponseList := make(chan logdb.ADResonse, len(adresponse)) - for _, adr := range adresponse { - parserAndSendMQ(&adr, adp) + adrChan <- adr } + + close(adrChan) + wg.Wait() } -func parserAndSendMQ(adr *logdb.ADResonse, adp IParser) { - pjson, err := adp.ToDoParser(adr.Response) - if err != nil { - log.Println("uid:", adr.UID, "err:", err) - adp.GetLogDB().ADError(adr.UID, err.Error()) - return - } - // send pjson to mq - // update UID status finish - que := adp.GetQueue() - err = que.Push([]byte(pjson)) - if err != nil { - log.Println("uid:", adr.UID, "err:", err) - adp.GetLogDB().ADError(adr.UID, err.Error()) - return - } +func parserAndSendMQ(adp IParser, adrChan chan logdb.ADResonse, wg *sync.WaitGroup) { + defer wg.Done() - adp.GetLogDB().ADParserSuccess(adr.UID, pjson) + for adr := range adrChan { + + pjson, err := adp.ToDoParser(adr.Response) + if err != nil { + log.Println("uid:", adr.UID, "err:", err) + adp.GetLogDB().ADError(adr.UID, err.Error()) + continue + } + // send pjson to mq + // update UID status finish + que := adp.GetQueue() + err = que.Push([]byte(pjson)) + if err != nil { + log.Println("uid:", adr.UID, "err:", err) + adp.GetLogDB().ADError(adr.UID, err.Error()) + continue + } + adp.GetLogDB().ADParserSuccess(adr.UID, pjson) + } } // NewADParser 创建一个ADParser的类, 包含很多传到终端的所有结构 diff --git a/require.sh b/require.sh index b0d6b00..b46c6c0 100644 --- a/require.sh +++ b/require.sh @@ -1,2 +1,3 @@ https_proxy=474420502.top:7070 -go get -u github.com/streadway/amqp \ No newline at end of file +go get -u github.com/streadway/amqp +go get -u -insecure 474420502.top/test/logdb