这里是小焦,最近我一直在学习爬虫,所以作为爬虫反爬的利器之一,代理肯定是少不了的。我们都知道一个ip地址频繁的爬取一个网站,稍微有点反爬手段的网站都会检测到我们的IP访问频率从而限制爬虫或者直接封掉IP地址。
要想避免这种情况,我们就不得不借助代理了。虽然不能百分比防止反爬,但至少还是能拿下大多数的普通站点。好了废话不多说,开始今天的正题吧。通过我这段时间的学习和查阅资料,一个完整的代理池基本上由四个模块组成:
- 获取模块:顾名思义就是我们要获取代理,这个自己在网上找。可以分为免费代理和付费代理,当然我们肯定选择免费的呗,资金充裕的可以选择付费的,免费的基本上很多都不能用,付费的相对稳定速度快一点。
- 存储模块:我们从各大途径搞到的代理IP肯定是要存储到我们的数据库中的,这里我们选择Redis数据库,不仅调用起来速度快,而且利用它的有序集合可以给代理打分。
- 检测模块:我们知道免费的肯定有很多IP是不能用的,一百个IP可能也就那么几个稳定正常的,所以我们要对搞过来的代理逐个测试它的稳定性。利用Redis数据库的有序集合给每个代理给定一个分数,稳定的给100分放到前面,不稳定甚至链接不上的,分数给低一点当多次链接不上则移除。
- 接口模块:刚开始我一直觉得搞这接口干啥直接调用数据库不就得了,最后看了众多代理池还是非常有必要搞。第一我们如果直接调用数据库,就需要输入用户名和密码非常不安全,而且redis部署在远程服务器的话,而且redis只允许本地调用的话,我们就不能调用。其次如果爬虫是用别的语言写的,调用又得该代码,等等,总之写一个接口非常的有必要,这里我们还是和前辈们一样使用flask这个web框架,简单看了下比django简单多了。
上面我们说明了本次代理池的整体结构,下面来整理一下代码的流程。首先我们是要获取代理,我们找了几个免费的代理网站,已经写到代码里面了,这里不多说。因为我们获取到代码后要写入到数据库中的,所以我们在写获取代理文件之前先把数据库的操作一个个封装成函数,这样方便我们后期的调用。
整体流程:获取代理→存储代理→测试代理→建立接口
先来看看数据库的操作吧,这里我们命名为db.py 所有的代码我都做了备注,这里就不啰嗦了
# 此文件主要是将redis我们需要的操作封装成函数,方便调用
# date:2020/10/27
# 此文件全部是利用redis有序集合的特性,利用每个成员分数来配合调用
MAX_SCORE = 100 # 最大分数100
MIN_SCORE = 0 # 最小分数0
INITIAL_SCORE = 10 # 初始分为10
REDIS_HOST = 'localhost' # 链接地址
REDIS_PORT = 6379 # 端口号,同意都是6379
REDIS_PASSWORD = None # 没有密码
REDIS_KEY = 'proxies' # 有序集合的键名
import redis
from random import choice
# 创建redis操作类
class RedisClient(object):
# 初始化类
def __init__(self,host=REDIS_HOST,port=REDIS_PORT,password=REDIS_PASSWORD):
# 由于要传参,所以上面提前吧参数传进去,下面直接调用。由于redis输出的是字节,这块我们用decode设置为字符串
self.db = redis.StrictRedis(host=host,port=port,password=password,decode_responses=True)
# 定义添加函数,如果返回的代理没有分数,则给此代理设置初始分数10
def add(self,proxy,score=INITIAL_SCORE):
if not self.db.zscore(REDIS_KEY,proxy): # 判断有序集合中指定成员的分数,如果没有就给添加一个初始分数
# 这块有个注意的,py3.0后zadd的方法传参方式改变了,第二个参数为字典
proxy_score = {
proxy: score,
}
return self.db.zadd(REDIS_KEY,proxy_score)
# 定义随机获取代理,先获取分数高的代理,随机获取,没有的话就按排名获取,否则抛出异常
def random(self):
result = self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE)
if len(result):
return choice(result) # 如果有值就随机选取一个返回
else:
result = self.db.zrevrange(REDIS_KEY,0,100) # 通过索引获取0-100区间的成员,默认由大到小排列,随机返回一个
if len(result):
return choice(result)
else:
raise PoolEmptyError # 抛出异常
# 代理值减一分,分数小于最小值的话直接删除代理
def decrease(self,proxy):
score = self.db.zscore(REDIS_KEY,proxy)
if score and score > MIN_SCORE:
print('代理',proxy,'当前分数',score,'减1分')
# py3.0后zincrby的参数顺序改变了,分数在前面,值在后面
return self.db.zincrby(REDIS_KEY,-1,proxy,) # 给代理减去一分
else:
print('代理',proxy,'当前分数',score,'移除')
return self.db.zrem(REDIS_KEY,proxy) # 移除命令
# 判断代理是否存在
def exists(self,proxy):
return not self.db.zscore(REDIS_KEY,proxy) == None #利用查看分数确实是否存在,如存在返回True,不存在返回Flase
# 将代理设置为最大分数
def max(self,proxy):
print('代理',proxy,'可用,分数设置为:',MAX_SCORE)
return self.db.zadd(REDIS_KEY,MAX_SCORE,proxy)
# 获取代理的数量
def count(self):
return self.db.zcard(REDIS_KEY)
# 获取全部的代理,通过分数最低到最高
def all(self):
return self.db.zrangebyscore(REDIS_KEY,MIN_SCORE,MAX_SCORE)
获取代理,这里我简单写了两个网站的代理,更多的大伙自己添加。这里是运用了元类的功能,获取方法并调用。我对元类的概念还是有点似懂非懂,不过却挺好用。我们命名为crawler.py 因为这块要进行数据库的操作,所以我们提前把数据库的操作分装成函数非常的方便。
# 此文件主要用来获取代理
# 代码参照老崔的python网络爬虫实战,由于老崔的代理有好多用不了,这块自己添加了几个
# 2020.10.25
# 我的个人博客:jiaokangyang.com
from pyquery import PyQuery as pq
import requests
import time
import random
from db import RedisClient
# 这块将获取网页html的代码直接写成函数方便后面调用
def get_page(url):
# 将很多的头信息放到一块,随机选用,提高爬虫反爬
user_agent = ['Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3776.400 QQBrowser',
'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36']
headers = {
'User-Agent':random.choice(user_agent), # 随机选择一个头,让爬虫更顺利一点
}
r = requests.get(url,headers=headers)
html = r.text
return html
# 这里创建一个元类,说实话我对元类也不是很懂,照猫画虎,感觉和函数的装饰器有点像,主要功能是给下面的类添加几个属性,不在改变元代码的基础上操作。
class ProxyMetaclass(type):
def __new__(cls,name,bases,attrs):
count = 0
# 添加一个属性为list,将我们后面带crawl标识的函数加到里面,后面自有它的用处
attrs['__CrawlFunc__'] = []
# attrs包含了类的属性,我们遍历所有的属性,筛选出带关键字的
for k,v in attrs.items():
# print(k,v)
if 'crawl_' in k:
# 将带关键字的函数名全部加到该属性中,K是筛选出来的函数名
attrs['__CrawlFunc__'].append(k)
count += 1 # 统计数量
attrs['__CrawlFunCount__'] = count
return type.__new__(cls,name,bases,attrs)
# 下面的类就是获取代理的具体内容了,
class Crawler(object,metaclass=ProxyMetaclass):
# 该函数获取对应函数返回的代理
def get_proxies(self,callback):
# 创建一个空列表,用来存放我们的代理
proxies = []
# callback代表的是该类中我们筛选的函数名,这里,我们直接用eval函数直接执行,返回结果
for proxy in eval("self.{}()".format(callback)):
print('成功获取到代理',proxy)
proxies.append(proxy) #将获取到的代理加入到list中
return proxies
# 66 网站的代理,这里我们获取前4页的,网站为www.66ip.cn 函数统一前缀为crawl
def crawl_daili66(self,page_count=10):
# 起始网页
start_url = 'http://www.66ip.cn/{}.html'
# 将前4页的网页链接写到list中
urls = [start_url.format(page) for page in range(1,page_count + 1)]
for url in urls:
print('地址:',url)
time.sleep(1) #为保证正常访问,必须降低抓取频率
html = get_page(url)
if html:
a = pq(html)
# 由于第一个是标题,我们用gt伪类从0以后开始选取匹配
trs = a('.containerbox table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text()
port = tr.find('td:nth-child(2)').text()
yield ':'.join([ip,port])
# 下面为kuaidaili的代理获取,方法同上
def crawl_kuaidaili(self,page_count=10):
start_url = 'https://www.kuaidaili.com/free/inha/{}/'
urls = [start_url.format(page) for page in range(1,page_count+1)]
for url in urls:
print('爬取页面',url)
time.sleep(1) # 由于该网站有反爬,必须将抓取频率降低
html = get_page(url)
if html:
a = pq(html)
trs = a('.table tbody tr').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text()
port = tr.find('td:nth-child(2)').text()
yield ':'.join([ip,port])
# 后期添加代理直接添加
# http://www.ip3366.net/
#
# 下面的类用来执行上面代理获取模块,获取代理并储存到redis中
POOL_UPPER_THRESHOLD = 1000 #设置代理池上线为500
class Getter():
def __init__(self):
self.redis = RedisClient()
self.crawler = Crawler()
# 设置一个函数判断代理池是否达到上限
def is_over_threshold(self):
if self.redis.count() >= POOL_UPPER_THRESHOLD: # 如果超过或等于上限
return True
else:
return False
# 执行函数
def run(self):
print('执行获取代理开始')
if not self.is_over_threshold():
# 通过上个类里面统计到的函数数量,循环挨个执行函数获取代理
for callback_lable in range(self.crawler.__CrawlFunCount__):
callback = self.crawler.__CrawlFunc__[callback_lable] # callback为Crawler类中的代理函数名
# 用到Crawler类中的get_proxies函数,将抓取到的函数保存到一个列表中,并且返回
proxies = self.crawler.get_proxies(callback)
# 遍历返回的列表,将抓到的数据保存到redis中
for proxy in proxies:
self.redis.add(proxy) # 将代理加入到有序集合proxies中,并设置初始分
检测代理的有效性是非常的有必要,如果大量的代理超时链接不上,非常的浪费时间。而且我们要优先调用稳定速度快的代理,这里利用redis的有序集合给每个成员一个分数,优先调用分数高的,这就是为什么不使用mangodb的原因了。
# 此文件为代理的测试模块,主要是检验代理的可用性,及给每个代理在有序集合中打分
# 2020.11.2
# 我的个人博客:jiaokangyang.com
import aiohttp
from aiohttp import ClientError,ClientConnectorError
from db import RedisClient
import asyncio
import time
# 设置状态码,如需多个,可在列表中多添加价格
VALID_STATUS_CODES = [200]
# 设置测试网站,这里我们选择百度
TEST_URL = 'http://www.baidu.com/'
# 设置批量测试的最大值
BATCH_TEST_SIZE = 100
# 创建测试类
class Tester(object):
# 初始化类
def __init__(self):
# 实例化redis类给redis对象
self.redis = RedisClient()
# 声明函数是异步的,且定义测试函数,测试单个代理
async def test_single_proxy(self,proxy):
# 限制异步的输数量,且不验证是否为https,这块也不是很懂,照着写
conn = aiohttp.TCPConnector(verify_ssl=False)
# 用aiohttp创建实例,命名为session
async with aiohttp.ClientSession(connector=conn) as session:
# 由于可能会发生报错等等,这里用try except
try:
if isinstance(proxy,bytes): # isinstance方法为判断数据类型,如果为bytes则执行下面语句
proxy = proxy.decode('utf-8') # 如果是字节则解码为utf-8
# 给代理前面加http
real_proxy = 'http://' + proxy
print('正在测试代理:',proxy)
# 下面方法与requests类似,加入代理后测试我们预设的网站
async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
# 如果状态码在上面定义的列表中,则代理可用,并设置分数为100,上面我们只添加了状态码200
if response.status in VALID_STATUS_CODES:
self.redis,max(proxy)
print('代理可用',proxy)
else:
self.redis.decrease(proxy)
print('请求响应码不合法',proxy) # 如果响应码不为200,则减分
# 如果中途报错则抛出异常,说明代理不成功,减分
# except (ClientError,ClientConnectorError,TimeoutError,AttributeError):
except:
self.redis.decrease(proxy)
print('代理请求失败',proxy)
# 上面我们定义了单个代理的测试,现在运行测试函数
def run(self):
print('测试器开始运行')
try:
# 将所有的代理获取出来
proxies = self.redis.all()
# 创建一个协程
loop = asyncio.get_event_loop()
# 开始批量测试,每次调用100个,这快运用了列表的切片功能
for i in range(0,len(proxies),BATCH_TEST_SIZE):
# 第一个循环取0-99 第二次取100-199,以此类推,这快看了好久才反应过来,基础知识很重要
test_proxies = proxies[i:i+BATCH_TEST_SIZE]
# 将每个测试代理的对象加到列表中。异步执行此批量方法。异步这块由于边看边学,有的地方理解不透
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(5)
except Exception as e:
print('测试器发生错误',e.args)
if __name__ == '__main__':
a = Tester()
a.run()
经过上面三步,我们已经搞到了很多的代理,并对代理一一检测打分,说实话有用的也就那么几个,大家有能力的话或者商业大规模爬取的话还是使用付费的代理吧。下面我们来进行接口模块,flask的路由比django简单的多,做接口刚好合适。后期我们启动后,可以直接部署到服务器上面,爬虫只需调用它即可。
# 此文件为我们的代理调用接口,利用flask的web服务来调用,
# 除了方便之外,也是保护我们的数据库敏感信息
# date:2020.11.2
# My blog:jiaokangyang.com
from flask import Flask,g
from db import RedisClient
# __all__定义了该代码中那些方法可被调用
__all__ = ['app']
# 创建的应用名为app
app = Flask(__name__)
# 链接数据库
def get_conn():
# 使用g,来定义一个redis变量,实例化redis类
if not hasattr(g,'redis'):
g.redis = RedisClient()
return g.redis
# route是flask框架的路由
@app.route('/')
def index():
return '<h2>欢迎使用焦康阳的代理池接口</h2>' # 这里首页为一个欢迎语
# 获取一个随机代理,
@app.route('/random')
def get_proxy():
conn = get_conn()
return conn.random()
# 获取代理池数量
@app.route('/count')
def get_counts():
conn = get_conn()
return str(conn.count())
if __name__ == '__main__':
app.run()
上面我们已经完整的搭建好了一个代理池。我们只需按顺序运行,最后为了更加方便,继续再做一个调度模块,非常简单,就是利用多进程的形式,让所有的任务一起运行。一步到位。
# 此文件为我们本次代理模块的最终调度模块,主要是为了方便控制所有的文件开关
# date:2020.11.2
# My blog:jiaokangyang.com
# 使用多进程模块运行,process是多进程函数
from multiprocessing import Process
from api import app
from crawler import Getter
from tester import Tester
import time
# 上面两个变量是循环时间,下面三个则为对于模块的开关,我们设置为True
TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = False
GETTER_ENABLED = False
API_ENABLED = True
API_HOST = '0.0.0.0'
API_PORT = 5000
class Scheduler():
# 测试模块
def schedule_tester(self,cycle=TESTER_CYCLE):
# 定时测试代理
tester = Tester()
while True:
print('测试模块开始运行')
tester.run()
time.sleep(cycle)
# 获取模块执行
def schedule_getter(self,cycle=GETTER_CYCLE):
# 定时获取代理
getter = Getter()
while True:
print('开始抓取代理')
getter.run()
time.sleep(cycle)
# api 模块
def schedule_api(self):
# 开启api
app.run(API_HOST,API_PORT)
def run(self):
print('代理池开始运行')
if TESTER_ENABLED:
# 创建进程
tester_process = Process(target=self.schedule_tester)
# 启动进程
tester_process.start()
if GETTER_ENABLED:
getter_process = Process(target=self.schedule_getter)
getter_process.start()
if API_ENABLED:
api_process = Process(target=self.schedule_api)
api_process.start()
if __name__ == '__main__':
a = Scheduler()
a.run()
通过上述的几步我们已经完整的做好了整个代理池。调用非常简单,我们只需访问flask服务的地址,用requests访问对应的路由获取返回的文本就行。今天的内容就到这,有问题随时咨询我,有爬虫需求也可联系。
评论1