使用python asyncio实现了一个异步代理池,根据规则爬取代理网站上的免费代理,在验证其有效后存入redis中,定期扩展代理的数量并检验池中代理的有效性,移除失效的代理。同时用aiohttp实现了一个server,其他的程序可以通过访问相应的url来从代理池中获取代理。
源码
环境
因为代码中大量使用了asyncio的async和await语法,它们是在Python3.5中才提供的,所以最好使用Python3.5及以上的版本,我使用的是Python3.6。
依赖
selenium包主要是用来操作PhantomJS的。
下面来对代码进行说明。
1. 爬虫部分
核心代码
async def start(self):
for rule in self._rules:
parser = asyncio.ensure_future(self._parse_page(rule)) # 根据规则解析页面来获取代理
logger.debug('{0} crawler started'.format(rule.__rule_name__))
if not rule.use_phantomjs:
await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理网站的页面
else:
await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages,
rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取
await self._pages.join()
parser.cancel()
logger.debug('{0} crawler finished'.format(rule.__rule_name__))
上面的核心代码实际上是一个用asyncio.Queue实现的生产-消费者模型,下面是该模型的一个简单实现:
import asyncio
from random import random
async def produce(queue, n):
for x in range(1, n + 1):
print('produce ', x)
await asyncio.sleep(random())
await queue.put(x) # 向queue中放入item
async def consume(queue):
while 1:
item = await queue.get() # 等待从queue中获取item
print('consume ', item)
await asyncio.sleep(random())
queue.task_done() # 通知queue当前item处理完毕
async def run(n):
queue = asyncio.Queue()
consumer = asyncio.ensure_future(consume(queue))
await produce(queue, n) # 等待生产者结束
await queue.join() # 阻塞直到queue不为空
consumer.cancel() # 取消消费者任务,否则它会一直阻塞在get方法处
def aio_queue_run(n):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run(n)) # 持续运行event loop直到任务run(n)结束
finally:
loop.close()
if __name__ == '__main__':
aio_queue_run(5)
运行上面的代码,一种可能的输出如下:
produce 1
produce 2
consume 1
produce 3
produce 4
consume 2
produce 5
consume 3
consume 4
consume 5
爬取页面
async def page_download(urls, pages, flag):
url_generator = urls
async with aiohttp.ClientSession() as session:
for url in url_generator:
if flag.is_set():
break
await asyncio.sleep(uniform(delay - 0.5, delay + 1))
logger.debug('crawling proxy web page {0}'.format(url))
try:
async with session.get(url, headers=headers, timeout=10) as response:
page = await response.text()
parsed = html.fromstring(decode_html(page)) # 使用bs4来辅助lxml解码网页:http://lxml.de/elementsoup.html#Using only the encoding detection
await pages.put(parsed)
url_generator.send(parsed) # 根据当前页面来获取下一页的地址
except StopIteration:
break
except asyncio.TimeoutError:
logger.error('crawling {0} timeout'.format(url))
continue # TODO: use a proxy
except Exception as e:
logger.error(e)
使用aiohttp实现的网页爬取函数,大部分代理网站都可以使用上面的方法来爬取,对于使用js动态生成页面的网站可以使用selenium控制PhantomJS来爬取――本项目对爬虫的效率要求不高,代理网站的更新频率是有限的,不需要频繁的爬取,完全可以使用PhantomJS。
解析代理
最简单的莫过于用xpath来解析代理了,使用Chrome浏览器的话,直接通过右键就能获得选中的页面元素的xpath:
安装Chrome的扩展"XPath Helper"就可以直接在页面上运行和调试xpath,十分方便:
BeautifulSoup不支持xpath,使用lxml来解析页面,代码如下:
async def _parse_proxy(self, rule, page):
ips = page.xpath(rule.ip_xpath) # 根据xpath解析得到list类型的ip地址集合
ports = page.xpath(rule.port_xpath) # 根据xpath解析得到list类型的ip地址集合
if not ips or not ports:
logger.warning('{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network'.
format(len(ips), len(ports), rule.__rule_name__))
return
proxies = map(lambda x, y: '{0}:{1}'.format(x.text.strip(), y.text.strip()), ips, ports)
if rule.filters: # 根据过滤字段来过滤代理,如"高匿"、"透明"等
filters = []
for i, ft in enumerate(rule.filters_xpath):
field = page.xpath(ft)
if not field:
logger.warning('{1} crawler could not get {0} field, please check the filter xpath'.
format(rule.filters[i], rule.__rule_name__))
continue
filters.append(map(lambda x: x.text.strip(), field))
filters = zip(*filters)
selector = map(lambda x: x == rule.filters, filters)
proxies = compress(proxies, selector)
for proxy in proxies:
await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中
爬虫规则
网站爬取、代理解析、滤等等操作的规则都是由各个代理网站的规则类定义的,使用元类和基类来管理规则类。基类定义如下:
class CrawlerRuleBase(object, metaclass=CrawlerRuleMeta):
start_url = None
page_count = 0
urls_format = None
next_page_xpath = None
next_page_host = ''
use_phantomjs = False
phantomjs_load_flag = None
filters = ()
ip_xpath = None
port_xpath = None
filters_xpath = ()
各个参数的含义如下:
start_url
(必需)
爬虫的起始页面。
ip_xpath
(必需)
爬取IP的xpath规则。
port_xpath
(必需)
爬取端口号的xpath规则。
page_count
爬取的页面数量。
urls_format
页面地址的格式字符串,通过urls_format.format(start_url, n)来生成第n页的地址,这是比较常见的页面地址格式。
next_page_xpath
,next_page_host
由xpath规则来获取下一页的url(常见的是相对路径),结合host得到下一页的地址:next_page_host + url。
use_phantomjs
,phantomjs_load_flag
use_phantomjs用于标识爬取该网站是否需要使用PhantomJS,若使用,需定义phantomjs_load_flag(网页上的某个元素,str类型)作为PhantomJS页面加载完毕的标志。
filters
过滤字段集合,可迭代类型。用于过滤代理。
爬取各个过滤字段的xpath规则,与过滤字段按顺序一一对应。
元类CrawlerRuleMeta用于管理规则类的定义,如:如果定义use_phantomjs=True,则必须定义phantomjs_load_flag,否则会抛出异常,不在此赘述。
目前已经实现的规则有西刺代理、快代理、360代理、66代理和 秘密代理。新增规则类也很简单,通过继承CrawlerRuleBase来定义新的规则类YourRuleClass,放在proxypool/rules目录下,并在该目录下的init.py中添加from . import YourRuleClass(这样通过CrawlerRuleBase.subclasses()就可以获取全部的规则类了),重启正在运行的proxy pool即可应用新的规则。
2. 检验部分
免费的代理虽然多,但是可用的却不多,所以爬取到代理后需要对其进行检验,有效的代理才能放入代理池中,而代理也是有时效性的,还要定期对池中的代理进行检验,及时移除失效的代理。
这部分就很简单了,使用aiohttp通过代理来访问某个网站,若超时,则说明代理无效。
async def validate(self, proxies):
logger.debug('validator started')
while 1:
proxy = await proxies.get()
async with aiohttp.ClientSession() as session:
try:
real_proxy = 'http://' + proxy
async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp:
self._conn.put(proxy)
except Exception as e:
logger.error(e)
proxies.task_done()
3. server部分
使用aiohttp实现了一个web server,启动后,访问http://host:port即可显示主页:
因为主页是一个静态的html页面,为避免每来一个访问主页的请求都要打开、读取以及关闭该html文件的开销,将其缓存到了redis中,通过html文件的修改时间来判断其是否被修改过,如果修改时间与redis缓存的修改时间不同,则认为html文件被修改了,则重新读取文件,并更新缓存,否则从redis中获取主页的内容。
返回代理是通过aiohttp.web.Response(text=ip.decode('utf-8'))
实现的,text要求str类型,而从redis中获取到的是bytes类型,需要进行转换。返回的多个代理,使用eval即可转换为list类型。
返回主页则不同,是通过aiohttp.web.Response(body=main_page_cache, content_type='text/html')
,这里body要求的是bytes类型,直接将从redis获取的缓存返回即可,conten_type='text/html'
必不可少,否则无法通过浏览器加载主页,而是会将主页下载下来――在运行官方文档中的示例代码的时候也要注意这点,那些示例代码基本上都没有设置content_type。
这部分不复杂,注意上面提到的几点,而关于主页使用的静态资源文件的路径,可以参考之前的博客《aiohttp之添加静态资源路径》。
4. 运行
将整个代理池的功能分成了3个独立的部分:
proxypool
定期检查代理池容量,若低于下限则启动代理爬虫并对代理检验,通过检验的爬虫放入代理池,达到规定的数量则停止爬虫。
proxyvalidator
用于定期检验代理池中的代理,移除失效代理。
proxyserver
启动server。
这3个独立的任务通过3个进程来运行,在Linux下可以使用supervisod来=管理这些进程,下面是supervisord的配置文件示例:
; supervisord.conf
[unix_http_server]
file=/tmp/supervisor.sock
[inet_http_server]
port=127.0.0.1:9001
[supervisord]
logfile=/tmp/supervisord.log
logfile_maxbytes=5MB
logfile_backups=10
loglevel=debug
pidfile=/tmp/supervisord.pid
nodaemon=false
minfds=1024
minprocs=200
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock
[program:proxyPool]
command=python /path/to/ProxyPool/run_proxypool.py
redirect_stderr=true
stdout_logfile=NONE
[program:proxyValidator]
command=python /path/to/ProxyPool/run_proxyvalidator.py
redirect_stderr=true
stdout_logfile=NONE
[program:proxyServer]
command=python /path/to/ProxyPool/run_proxyserver.py
autostart=false
redirect_stderr=true
stdout_logfile=NONE
因为项目自身已经配置了日志,所以这里就不需要再用supervisord捕获stdout和stderr了。通过supervisord -c supervisord.conf启动supervisord,proxyPool和proxyServer则会随之自动启动,proxyServer需要手动启动,访问http://127.0.0.1:9001即可通过网页来管理这3个进程了:
supervisod的官方文档说目前(版本3.3.1)不支持python3,但是我在使用过程中没有发现什么问题,可能也是由于我并没有使用supervisord的复杂功能,只是把它当作了一个简单的进程状态监控和启停工具了。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持脚本之家!
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。