博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
爬虫连接mongodb、多线程多进程的使用
阅读量:6692 次
发布时间:2019-06-25

本文共 6661 字,大约阅读时间需要 22 分钟。

一、连接mongodb

 

1、            设置数据库 client=pymongo.MongoClient(‘localhost’)

2、            db=client[‘lagou’]设置连接的数据库名称

POSITION_NAME=’’ 、PAGE_SUM 、PAGE_SIZE 等为你设置的变量名称。
3、DATA_NAME=’dataposition’   # # 指定数据库的名字
4、设置保存在mongo数据库中的数据: def save_to_mongo(data):     if db[DATA_NAME].update({'positionId': data['positionId']}, {'$set': data}, True):         print('Saved to Mongo', data['positionId'])     else:         print('Saved to Mongo Failed', data['positionId'])
这是以positionId为唯一标识,如果数据库里面已经存在有positionId,说明数据已经爬过了,不再更新。
 
二、多进程设置和使用:
1、导入多进程:from multiprocessing import Pool
导入时间  import time
2、start_time = time.time() pool = Pool()  # pool()参数:进程个数:默认的是电脑cpu的核的个数,如果要指定进程个数,这个进程个数要小于等于cpu的核数 # 第一个参数是一个函数体,不需要加括号,也不需指定参数。。 #  第二个参数是一个列表,列表中的每个参数都会传给那个函数体 pool.map(to_mongo_pool,[i for i in range(PAGE_SUM)]) # close它只是把进程池关闭 pool.close() # join起到一个阻塞的作用,主进程要等待子进程运行完,才能接着往下运行 pool.join() end_time = time.time() print("总耗费时间%.2f秒" % (end_time - start_time))
 

to_mongo_pool:这个函数要设计好,就一个参数就够了,然后把它的参数放在列表里面,通过map高阶函数一次传给to_mongo_pool

 

多线程的使用:

多线程要配合队列使用:

 

# coding=utf-8 import requests from lxml import etree  import threading 导入线程 from queue import Queue  导入队列
# https://docs.python.org/3/library/queue.html#module-queue # 队列使用方法简介 # q.qsize() 返回队列的大小 # q.empty() 如果队列为空,返回True,反之False # q.full() 如果队列满了,返回True,反之False # q.full 与 maxsize 大小对应 # q.get([block[, timeout]]) 获取队列,timeout等待时间 # q.get_nowait() 相当q.get(False) # q.put(item) 写入队列,timeout等待时间 # q.put_nowait(item) 相当q.put(item, False) # q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 # q.join() 实际上意味着等到队列为空,再执行别的操作 class Lianjia:     def __init__(self):         self.url_temp = url = "https://gz.lianjia.com/ershoufang/pg{}/"         self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}         self.url_queue = Queue()         self.html_queue = Queue()         self.content_queue = Queue()     def get_url_list(self):         # return [self.url_temp.format(i) for i in range(1,14)]         for i in range(1, 14):             # 把13个索引页面的Url放进url_queue队列里             self.url_queue.put(self.url_temp.format(i))

 

   定义运行函数

def run(self):  # 实现主要逻辑     thread_list = []     # 1.url_list     # threading.Thread不需要传参数,参数都是从队列里面取得     t_url = threading.Thread(target=self.get_url_list)     thread_list.append(t_url)     # 2.遍历,发送请求,获取响应     for i in range(20):  # 添加20个线程         t_parse = threading.Thread(target=self.parse_url)         thread_list.append(t_parse)     # 3.提取数据     for i in range(2):  # 添加2个线程         t_html = threading.Thread(target=self.get_content_list)         thread_list.append(t_html)     # 4.保存     t_save = threading.Thread(target=self.save_content_list)     thread_list.append(t_save)     for t in thread_list:         t.setDaemon(True)  # 把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)         t.start()     for q in [self.url_queue, self.html_queue, self.content_queue]:         q.join()  # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程     print("主线程结束")

 代码如下:

# coding=utf-8import requestsfrom lxml import etreeimport threadingfrom queue import Queue# https://docs.python.org/3/library/queue.html#module-queue# 队列使用方法简介# q.qsize() 返回队列的大小# q.empty() 如果队列为空,返回True,反之False# q.full() 如果队列满了,返回True,反之False# q.full 与 maxsize 大小对应# q.get([block[, timeout]]) 获取队列,timeout等待时间# q.get_nowait() 相当q.get(False)# q.put(item) 写入队列,timeout等待时间# q.put_nowait(item) 相当q.put(item, False)# q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号# q.join() 实际上意味着等到队列为空,再执行别的操作class Lianjia:    def __init__(self):        self.url_temp = url = "https://gz.lianjia.com/ershoufang/pg{}/"        self.headers = {            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}        self.url_queue = Queue()        self.html_queue = Queue()        self.content_queue = Queue()    def get_url_list(self):        # return [self.url_temp.format(i) for i in range(1,14)]        for i in range(1, 14):            # 把13个索引页面的Url放进url_queue队列里            self.url_queue.put(self.url_temp.format(i))    def parse_url(self):        while True:            # get方法和task_done搭配使用            # 在put是队列+1,get和task_done一起使用时队列才会-1            url = self.url_queue.get()            print(url)            response = requests.get(url, headers=self.headers)            # 然后把索引页的响应页面放进html_queue队列里            self.html_queue.put(response.content.decode())            self.url_queue.task_done()    def get_content_list(self):  # 提取数据        while True:            # 先从索引页响应页面html_queue队列里面取出索引页面            html_str = self.html_queue.get()            html = etree.HTML(html_str)            div_list = html.xpath('//li[@class="clear LOGCLICKDATA"]')  # 分组            content_list = []            for div in div_list:                item = {}                item['title'] = div.xpath('.//div[@class="title"]/a/text()')                item['href'] = div.xpath('.//div[@class="title"]/a/@href')                item['totalPrice'] = div.xpath('.//div[@class="totalPrice"]/span/text()')                item['houseInfo'] = div.xpath('.//div[@class="houseInfo"]/text()')                content_list.append(item)            # 把content_list放进content_queue里面            self.content_queue.put(content_list)            self.html_queue.task_done()    def save_content_list(self):  # 保存        while True:            content_list = self.content_queue.get()            for i in content_list:                print(i)                pass            self.content_queue.task_done()    def run(self):  # 实现主要逻辑        thread_list = []        # 1.url_list        # threading.Thread不需要传参数,参数都是从队列里面取得        t_url = threading.Thread(target=self.get_url_list)        thread_list.append(t_url)        # 2.遍历,发送请求,获取响应        for i in range(20):  # 添加20个线程            t_parse = threading.Thread(target=self.parse_url)            thread_list.append(t_parse)        # 3.提取数据        for i in range(2):  # 添加2个线程            t_html = threading.Thread(target=self.get_content_list)            thread_list.append(t_html)        # 4.保存        t_save = threading.Thread(target=self.save_content_list)        thread_list.append(t_save)        for t in thread_list:            t.setDaemon(True)  # 把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)            t.start()        for q in [self.url_queue, self.html_queue, self.content_queue]:            q.join()  # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程        print("主线程结束")if __name__ == '__main__':    qiubai =Lianjia()    qiubai.run()# 所没有tast_done方法,程序最终会卡着不动,无法终止

 

转载于:https://www.cnblogs.com/wwthuanyu/p/9999613.html

你可能感兴趣的文章
打印出占用空间大于一定值的目录
查看>>
研究微信小程序
查看>>
Oracle常量
查看>>
PE文件头
查看>>
VC获取物理网卡的MAC地址
查看>>
Web学习>>一些专业名词
查看>>
找出由‘1’组成的孤岛
查看>>
c#中获取路径方法
查看>>
Java中的数组和方法
查看>>
异步获取CMD命令行输出内容
查看>>
AIX/Linux/HP-UX查看CPU/内存/磁盘/存储命令
查看>>
第九章结构体与公用体
查看>>
筛法求素数的最优算法+解释
查看>>
.net使用sqllite实例
查看>>
七牛直播云-m3u8格式直播
查看>>
sqlserver2008 创建定时任务
查看>>
as3+php上传图片的三种方式
查看>>
sqlserver master恢复
查看>>
看这一篇就够了,css选择器知识汇总
查看>>
[译]科学计算可视化在andriod与ios实现的工具
查看>>