实现请求去重
# 去重原理
# 学习目标
- 掌握去重的方法
- 完成代码的重构,实现去重
# 1 去重的理解
其实就只是对以往数据进行一个比对,判断是否已经存在
可大致分为:对原始数据比对、对利用原始数据生成的特征值进行比对两种方式
原始数据比对很好理解,就是比对的时候参照值就是原始数据;而利用特征值比对,比如最典型的就是利用原始数据生成一个指纹,比对的参照值就是这个指纹,不是原始数据本身,主要应用于单个原始数据比较大的情况,另外一种常用就是布隆过滤器,这种方式原始利用一种"特征值",应用场景是海量数据的去重(但具有一定几率的误判)。
# 2 爬虫请求去重原理和实现
根据请求的url、请求方法、请求参数、请求体进行唯一标识,进行比对,由于这四个数据加到一起,内容较长,因此使用求指纹的方式来进行去重判断。
指纹计算方法,最常用的就是md5、sha1等hash加密算法,来求指纹
在scrapy_plus/core/scheduler.py中完整代码实现如下:
# coding:utf-8
from six.moves.queue import Queue # 兼容py2py3 修改requirements.txt文件
# from queue import Queue # py3
import six
import w3lib.url # 修改requirements.txt文件
from hashlib import sha1
from scrapy_plus.utils.log import logger
class Scheduler():
"""
缓存请求对象(Request),并为下载器提供请求对象,实现请求的调度:
对请求对象进行去重判断:实现去重方法_filter_request,该方法对内提供,因此设置为私有方法
"""
def __init__(self):
self.queue = Queue()
self.repeat_request_num = 0 # 统计重复的数量
# 在engine中阻塞的位置判断程序结束的条件:成功的响应数+重复的数量>=总的请求数量
self._filter_container = set() # 去重容器,是一个集合,存储已经发过的请求的特征 url
def add_request(self, request):
# 添加请求对象,前提是指纹没有重复
if self._filter_request(request):
self.queue.put(request)
def get_request(self):
# 获取请求对象
# return self.queue.get()
try:
request = self.queue.get(False) # 从队列获取请求对象设置为非阻塞,否则会造成程序无法退出
except:
return None
else:
return request
def _filter_request(self, request):
# 去重方法
request.fp = self._gen_fp(request) # 给request对象增加一个fp指纹属性
if request.fp not in self._filter_container:
self._filter_container.add(request.fp) # 向指纹容器集合添加一个指纹
return True
else:
self.repeat_request_num += 1
logger.info("发现重复的请求:<{} {}>".format(request.method, request.url))
return False
def _gen_fp(self, request):
"""生成并返回request对象的指纹
用来判断请求是否重复的属性:url,method,params(在url中),data
为保持唯一性,需要对他们按照同样的排序规则进行排序
"""
# 1. url排序:借助w3lib.url模块中的canonicalize_url方法
url = w3lib.url.canonicalize_url(request.url)
# 2. method不需要排序,只要保持大小写一致就可以 upper()方法全部转换为大写
method = request.method.upper()
# 3. data排序:如果有提供则是一个字典,如果没有则是空字典
data = request.data if request.data is not None else {}
data = sorted(data.items(), key=lambda x: x[0]) # 用sorted()方法 按data字典的key进行排序
# items()返回元祖 key参数表示按什么进行排序 x表示data.items() x[0]表示元祖第一个值,也就是data的键
# 4. 利用sha1计算获取指纹
s1 = sha1()
s1.update(self._to_bytes(url)) # sha1计算的对象必须是字节类型
s1.update(self._to_bytes(method))
s1.update(self._to_bytes(str(data)))
fp = s1.hexdigest()
return fp
def _to_bytes(self, string):
"""为了兼容py2和py3,利用_to_bytes方法,把所有的字符串转化为字节类型"""
if six.PY2:
if isinstance(string, str):
return string
else: # 如果是python2的unicode类型,转化为字节类型
return string.encode('utf-8')
elif six.PY3:
if isinstance(string, str): # 如果是python3的str类型,转化为字节类型
return string.encode("utf-8")
else:
return string
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# 3 修改engine模块
现在统计了总的重复数量,所以,在engine中阻塞的位置判断程序结束的条件:成功的响应数+重复的数量>=总的请求数量程序结束
#scrapy_plus/core/engine.py
.....
def _execute_request_response_item(self):
......
# parse方法
parse = getattr(spider, request.parse) # getattr(类, 类中方法名的字符串) = 类方法对象
# 此处修改该
#5. 调用爬虫的parse方法,处理响应
results = parse(response)
# 增加一个判断!
if results is not None: # 如果项目中爬虫的解析函数不返回可迭代对象就会报错
for result in results:
#6.判断结果的类型,如果是request,重新调用调度器的add_request方法
if isinstance(result,Request):
......
def _start_engine(self):
'''
具体的实现引擎的细节
:return:
'''
self._start_request()
while True:
time.sleep(0.001)
self._execute_request_response_item()
# 此处修改
#成功的响应数+重复的数量>=总的请求数量 程序结束
if self.total_response_nums+self.scheduler.repeat_request_num >= self.total_request_nums:
break
.....
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 小结
- 掌握sha1等加密方法
- 理解去重过程中警醒url排序,字符串的处理,字典的排序的原因
# 本小结涉及修改的完整代码
scrapy_plus/core/engine.py
'''引擎组件'''
from scrapy_plus.http.request import Request # 导入Request对象
from .scheduler import Scheduler
from .downloader import Downloader
from .pipeline import Pipeline
from .spider import Spider
from scrapy_plus.middlewares.spider_middlewares import SpiderMiddleware
from scrapy_plus.middlewares.downloader_middlewares import DownloaderMiddleware
import time
from datetime import datetime
from scrapy_plus.utils.log import logger # 导入logger
import importlib
from scrapy_plus.conf.settings import SPIDERS, PIPELINES, SPIDER_MIDDLEWARES, DOWNLOADER_MIDDLEWARES
class Engine(object):
'''
a. 对外提供整个的程序的入口
b. 依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)
'''
def __init__(self):
self.spiders = self._auto_import_instances(SPIDERS,isspider=True) # 接收爬虫字典
self.scheduler = Scheduler() # 初始化调度器对象
self.downloader = Downloader() # 初始化下载器对象
self.pipelines = self._auto_import_instances(PIPELINES) # 管道
self.spider_mids = self._auto_import_instances(SPIDER_MIDDLEWARES) # 爬虫中间件
self.downloader_mids = self._auto_import_instances(DOWNLOADER_MIDDLEWARES) # 下载中间件
self.total_request_nums = 0
self.total_response_nums = 0
def _auto_import_instances(self, path=[], isspider=False):
'''通过配置文件,动态导入类并实例化
path: 表示配置文件中配置的导入类的路径
isspider: 由于爬虫需要返回的是一个字典,因此对其做对应的判断和处理
'''
instances = {} if isspider else []
for p in path:
module_name = p.rsplit(".", 1)[0] # 取出模块名称
cls_name = p.rsplit(".", 1)[1] # 取出类名称
ret = importlib.import_module(module_name) # 动态导入爬虫模块
cls = getattr(ret, cls_name) # 根据类名称获取类对象
if isspider:
instances[cls.name] = cls() # 组装成爬虫字典{spider_name:spider(),}
else:
instances.append(cls()) # 实例化类对象
# 把管道中间件分别组装成 管道列表=[管道类1(),管道类2()] / 中间件列表 = [中间件类1(),中间件类2()]
return instances # 返回类对象字典或列表
def start(self):
'''启动整个引擎'''
start_time = datetime.now() # 起始时间
logger.info("开始运行时间:%s" % start_time) # 使用日志记录起始运行时间
self._start_engine()
end_time = datetime.now()
logger.info("爬虫结束:{}".format(end_time))
logger.info("爬虫一共运行:{}秒".format((end_time-start_time).total_seconds()))
logger.info("总的请求数量:{}".format(self.total_request_nums))
logger.info("总的响应数量:{}".format(self.total_response_nums))
def _start_request(self):
for spider_name, spider in self.spiders.items():
for start_request in spider.start_requests():
#1. 对start_request进过爬虫中间件进行处理
for spider_mid in self.spider_mids:
start_request = spider_mid.process_request(start_request)
# 为请求对象绑定它所属的爬虫的名称
start_request.spider_name = spider_name
#2. 调用调度器的add_request方法,添加request对象到调度器中
self.scheduler.add_request(start_request)
#请求数+1
self.total_request_nums += 1
def _execute_request_response_item(self):
'''根据请求、发起请求获取响应、解析响应、处理响应结果'''
#3. 调用调度器的get_request方法,获取request对象
request = self.scheduler.get_request()
if request is None: #如果没有获取到请求对象,直接返回
return
#request对象经过下载器中间件的process_request进行处理
for downloader_mid in self.downloader_mids:
request = downloader_mid.process_request(request)
#4. 调用下载器的get_response方法,获取响应
response = self.downloader.get_response(request)
response.meta = request.meta
#response对象经过下载器中间件的process_response进行处理
for downloader_mid in self.downloader_mids:
response = downloader_mid.process_response(response)
#response对象经过下爬虫中间件的process_response进行处理
for spider_mid in self.spider_mids:
response = spider_mid.process_response(response)
# 根据request的spider_name属性,获取对应的爬虫对象
spider = self.spiders[request.spider_name]
# parse方法
parse = getattr(spider, request.parse) # getattr(类, 类中方法名的字符串) = 类方法对象
#5. 调用爬虫的parse方法,处理响应
results = parse(response)
# 增加一个判断!
if results is not None: # 如果项目中爬虫的解析函数不返回可迭代对象就会报错
for result in results:
#6.判断结果的类型,如果是request,重新调用调度器的add_request方法
if isinstance(result,Request):
#在解析函数得到request对象之后,使用process_request进行处理
for spider_mid in self.spider_mids:
result = spider_mid.process_request(result)
# 给request对象增加一个spider_name属性
result.spider_name = request.spider_name
self.scheduler.add_request(result)
self.total_request_nums += 1
#7如果不是,调用pipeline的process_item方法处理结果
else:
# 就通过process_item()传递数据给管道
for pipeline in self.pipelines:
pipeline.process_item(result, spider)
self.total_response_nums += 1
def _start_engine(self):
'''
具体的实现引擎的细节
:return:
'''
self._start_request()
while True:
time.sleep(0.001)
self._execute_request_response_item()
#成功的响应数+重复的数量>=总的请求数量 程序结束
if self.total_response_nums+self.scheduler.repeat_request_num >= self.total_request_nums:
break
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
scrapy_plus/core/scheduler.py
见课件正文
1
编辑 (opens new window)