爬虫与反爬
(1) 基本的概念
爬虫:
自动获取网站数据的程序
关键是 定时,定量的,批量的获取
反爬虫:
使用技术手段 防止爬虫程序的方法
存在误伤,即 反爬技术 将普通用户识别为爬虫
如果误伤高 --- 效果再好也不能使用
例子:
比如 限制 ip === 用户的ip 一般都是 局域网内动态分配的,
一个爬虫的ip 可能分配给 另一个 非爬虫的用户
有效的方法: 可以在一段时间内 限制 ip,过一段时间 再把 ip释放
反爬的成本:
成功率越高成本越大,拦截率越高,误伤率越高
反爬虫的目的:
初级爬虫:简单粗暴,不管服务器压力,容易使服务器奔溃
数据保护
失控的爬虫 某种情况下,无法或者没有关闭的爬虫
商业对手
爬虫 vs 反爬虫的对抗
网站反爬的的策略 vs 爬虫策略
1.监控 某个时间 访问突然增加,ip相同,user-agent不是浏览器;限制 ip访问 (注意:不能封ip)
(1)user-agent模拟,ip代理(ip代理池)
2 发现 ip 变化, 要求登录访问
(2) 注册 账号, 每次请求 带 cookie
3 开发健全的账号体系,每个账号 权限不同
(3) 多个账号 联合爬虫(维护账号池)
4 访问频繁 ,限制 ip频率
(4) 模仿人的请求速度
5 弹出 验证码
(5) 识别验证码
6 增加网页 内容的动态填充 ajax向后台请求
(6) selenium + phantomJs 完全模拟浏览器
爬虫技术使得网站反爬的成本会越来越高
(2)在scrapy中用到的反爬策略
1 随机切换 UA
1 可以在 settings 中维护一个 ua_list
然后每个 Request 里的headers 参数 随机获取;
缺点: 代码冗长,每个spider都要写
2 使用 downloadermiddleware 全局middleware,写一个 UA middleware
注意:scrapy有一个的默认UserAgentMiddlewarescrapy.downloadermiddlewares.
useragent.UserAgentMiddleware 会从setings 配置文件中获取
3 使用第三方的fake-useragent 包 FakeUseragent().ie / .random
实际是维护了一个 ua的网页 :https://fake-
useragent.herokuapp.com/browsers/0.1.5
4 配置 USER_AGENT_TYPE = ‘random’ + 利用 fake-useragent 来实现 随机ua
5 这样 在 scheduler 把 request 通过 engine 发给 下载器的时候 就会加上 RandomUA
代码实现如下:
settings:
USER_AGENT_TYPE = 'random'
DOWNLOADER_MIDDLEWARES = {
'项目.middlewares.RandomUserAgentMiddleware': 543,
}
middlewares.py:
class RandomUserAgentMiddleware(object):
def __init__(self,crawler):
super(RandomUserAgentMiddleware,self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get('USER_AGENT_TYPE','random')
@classmethod
def from_crawler(cls,crawler):
return cls(crawler)
def process_request(self,request,spider):
def get_ua():
return getattr(self.ua,self.ua_type)
print(get_ua())
request.headers.setdefault('USER_AGENT',get_ua())
2 设置ip 代理
(1)ip 不是固定不变的,存在一定的误伤:
云服务器:
阿里云不会变化
亚马逊服务器 重启之后ip会变动
小区的ip 也是动态分配
注意: 本机ip 爬取速度最好,最稳定,要尽量避免 本机ip被封掉
(2)ip 代理的 原理:
本地 向 代理服务器 发起请求,代理服务器 与 要请求的服务器进行 交互
经过代理之后的速度: 经过一次 中间服务器 速度会慢很多
(3)如何设置ip 代理
request.meta['proxy'] = 'http://ip + port'
(4)如何设置 ip代理池:
自己写一个 处理 ip的 脚本,随机获取
思路:
爬虫爬取 免费 ip;
放到数据库;
再 作测试,删除不能用的ip;
获取ip;
select ip,port from ip_proxy order by rand() limit 1;
随机返回 ip
代码实现: 1 爬取快代理的ip,并存入数据库:
import requests
from scrapy.selector import Selector
import MySQLdb
from concurrent.futures import ThreadPoolExecutor
# pool = ThreadPoolExecutor(max_workers=6)
# proxy = {
# 'https':'https://113.139.180.244:808' }
headers = {
'Host':'Host:www.kuaidaili.com',
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
}
conn = MySQLdb.connect(db='scrapytest', host='localhost', port=3306, user='root', passwd='', charset='utf8')
cursor = conn.cursor()
response = requests.get('https://www.kuaidaili.com/free/inha/1/',headers=headers)
selector = Selector(text=response.text)
# 获取 全部页码
pages = int(selector.xpath("//div[@id='listnav']/ul/li")[-2].xpath('a/text()').extract_first())
# 获取所有 url
page_list = ['https://www.kuaidaili.com/free/inha/'+ str(num) for num in range(1,pages+1)]
def get_text(page):
response = requests.get(page,headers=headers)
text = response.text
return text
def get_ips(text):
selector = Selector(text=text)
trs = selector.xpath("//tbody/tr")
for tr in trs:
ip = tr.xpath("td[1]/text()").extract_first()
port = tr.xpath("td[2]/text()").extract_first()
addr = tr.xpath("td[5]/text()").extract_first()
speed = tr.xpath("td[6]/text()").extract_first().split('秒')[0]
param = (ip,port,addr,speed)
filter_set.add(param)
return filter_set
def insert_mysql(param):
sql = '''
INSERT INTO ip_proxy(ip,port,addr,speed) VALUES(%s,%s,%s,%s)
'''
cursor.execute(sql,param)
conn.commit()
class GetRandomIP(object):
def judge_ip(self,ip,port):
url = 'http://www.baidu.com'
proxy_url = 'http://{0}:{1}'.format(ip,port)
try:
proxy_dict = {
'http':proxy_url
}
response = requests.get(url,proxies=proxy_dict)
except Exception as e:
self.delete_ip(ip)
return False
else:
code = response.status_code
if code >=200 and code <300:
return True
else:
self.delete_ip(ip)
return False
def delete_ip(self,ip):
sql = '''
DELETE FROM ip_proxy WHERE ip=%s
'''
cursor.execute(sql,(ip,))
conn.commit()
def get_random_ip(self):
sql = '''
SELECT ip,port FROM ip_proxy ORDER BY RAND() LIMIT 1
'''
cursor.execute(sql)
ip,port = cursor.fetchone()
if self.judge_ip(ip,port):
return '{0}:{1}'.format(ip,port)
else:
return self.get_random_ip()
# if __name__ == '__main__':
# filter_set= set()
# for page in page_list:
# text = get_text(page)
# get_ips(text)
# for param in filter_set:
# insert_mysql(param)
代码实现: 2 设置ip代理的中间件
settings
DOWNLOADER_MIDDLEWARES = {
'LG.middlewares.RandomIPMiddleware': 300,}
middlewraes
class RandomIPMiddleware(object):
def process_request(self,request,spider):
ip_port = GetRandomIP().get_random_ip()
request.meta['proxy'] = 'http://{0}'.format(ip_port)
3 模拟登陆 cookie与session
注意浏览器是 无状态请求
cookie(浏览器本地存储机制) -- 有状态的请求(本地存储,本地文件中)
cookie原理
用户 ---->>> 服务器
浏览器
<<----- 分配 id
----->> 再次请求(附加身份信息)
不能存储 客户的敏感信息 --- 本地文件容易丢失,不安全
session(基于cookie实现)
返给 客户端 一个session_id
服务器端生成(具有过期时间)
客户端请求的时候,带着session_id 在 服务器中获取 个人信息
模拟登陆的思路:
请求方式 :post
请求url:
请求参数:_xsrf
username
pwd
获取 session
代码实现:1 requests + cookie
import requests
try:
import cookielib # py2
except:
import http.cookiejar as cookielib # py3
# py2 和 py3 的 兼容代码
import re
def get_xsrf():
response = requests.get('https://www.zhihu.com',headers=headers)
text = response.text
xsrf = re.search(r';xsrf":"(.*?)"',text)
if xsrf:
return xsrf.group(1)
else:
return ''
headers = {
'HOST':'www.zhihu.com',
'Referer':'https://www.zhihu.com',
'User-Agent':'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36',}
cookies_ = {
"_xsrf":get_xsrf(),
"z_c0":".........."}
response = requests.get(url='https://www.zhihu.com/inbox',
headers=headers,
cookies = cookies_,
allow_redirects = False)
print(response.status_code)
当然,也可以通过session来实现:
session = requests.session()
def zhihu_login(account,pwd):
# 知乎登录
post_url = 'https://www.zhihu.com/api/v3/oauth/sign_in'
post_data = {
'_xsrf':get_xsrf(),
'username':account,
'password':pwd
}
response = session.post(post_url,data=post_data,headers=headers)
session.cookies.save()
zhihu_login('xxx','xxxx')
def get_logined_index():
response = session.get('https://www.zhihu.com',headers=headers)
with open('index.html','wb') as f:
f.write(response.text.encode('utf-8'))
get_logined_index()
代码实现:2 scrapy
爬取知乎的问答
import scrapy
import re,datetime
import json
from urllib.parse import urljoin
# 拼接域名 url
from scrapy.loader import ItemLoader
from zhihuspider.items import ZhihuQuestionItem,ZhihuAnswerItem
class ZhihuSpider(scrapy.Spider):
name = 'zhihu'
allowed_domains = ['zhihu.com']
start_urls = ['http://zhihu.com/']
headers = {
'HOST': 'www.zhihu.com',
'Referer': 'https://www.zhihu.com',
'User-Agent': 'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36',
}
cookies_ = {
"_xsrf": '',
"z_c0": "............",
}
# api 接口
start_answer_url = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}'
# 知乎 question 的 answer 的起始 url
def start_requests(self):
return [scrapy.Request('https://www.zhihu.com',callback=self.get_xsrf,headers=self.headers)] # 获取xsrf
def get_xsrf(self,response):
response_text = response.text
xsrf = re.search(r';xsrf":"(.*?)"', response_text,re.DOTALL)
if xsrf:
xsrf= xsrf.group(1)
else:
xsrf = ''
self.cookies_['_xsrf'] = xsrf
for url in self.start_urls:
yield scrapy.Request(url, headers=self.headers,cookies=self.cookies_, dont_filter=True,callback=self.parse)
def parse(self,response):
'''
提取所有 url === 深度优先的策略
其中 question/数字 的url
:param response:
:return:
'''
all_urls = response.css("a::attr(href)").extract()
all_urls = [urljoin(response.url,url) for url in all_urls]
all_urls = list(filter(lambda url:True if url.startswith('https') else False,all_urls ))
for url in all_urls:
match_obj = re.match('(.*zhihu.com/question/(\d+))(/.*|$)',url)
if match_obj:
request_url = match_obj.group(1)
question_id = match_obj.group(2)
yield scrapy.Request(request_url,meta={'question_id':question_id},headers=self.headers,callback=self.parse_question)
else:
yield scrapy.Request(url,headers=self.headers,callback=self.parse)
def parse_question(self,response):
"""
获取 详细的 item
:param response:
:return:
"""
question_id = response.meta.get('question_id')
item_loader = ItemLoader(item=ZhihuQuestionItem(),response=response)
item_loader.add_css('title','.QuestionHeader-title::text')
item_loader.add_css('content','.QuestionHeader-detail')
item_loader.add_value('url',response.url)
item_loader.add_value('question_id',question_id)
item_loader.add_css('answer_nums','.List-headerText span::text')
item_loader.add_css('comment_nums','.QuestionHeader-Comment button::text')
item_loader.add_css('watch_user_nums','.NumberBoard-itemValue::text')
item_loader.add_css('topics','.QuestionHeader-topics .Popover div::text')
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_url.format(question_id,20,0),headers=self.headers,callback=self.parse_answer)
yield question_item
def parse_answer(self,response):
answer_json = json.loads(response.text)
is_end = answer_json['paging']['is_end']
totals = answer_json['paging']['totals']
# 提取 answer 的数据 item
for answer in answer_json['data']:
answer_item = ZhihuAnswerItem()
answer_item['zhihu_id'] = answer['id']
answer_item['url'] = answer['url']
answer_item['question_id'] = answer['question']['id']
answer_item['author_id'] = answer['author']['id'] if 'id' in answer['author'] else None
answer_item['content'] = answer['content'] if 'content' in answer else None
answer_item['create_time'] = answer['created_time']
answer_item['update_time'] = answer['updated_time']
answer_item['crawl_time'] = datetime.datetime.now()
yield answer_item
if not is_end:
# 判断是否还有下一页
next_url = answer_json['paging']['next']
yield scrapy.Request(next_url,headers=self.headers,callback=self.parse_answer)
当然可以维护多个用户,实现一随机的cookie池
4 Selenium 解决动态加载的html问题
Selenium:网站 开发测试框架
可以使 我们访问 的html 与 浏览器(f12) 得到的 html 一样(动态html)
1 selenium 需要 对应的 浏览器 driver
2 注意 下载 好对应的 driver
直接可以 放进 python 的scripts里面;
不用再去配置 executable_path=.../..exe参数
3 注意版本对应要求
如何把selenium集成scrapy中间件中
1 使用 中间件,需要 使用 HtmlResponse 直接把 response 返回给spider
from scrapy.http import HtmlResponse
class SeleniumMiddleware(object):
"""
通过 selenium 浏览器 驱动 直接获取网页 内容
不用再 经过 downloader 下载
所以 需要直接返回
"""
def process_request(self,request,spider):
if spider == 'lagou':
browser = webdriver.Chrome()
browser.get(request.url)
import time
time.sleep(1)
return HtmlResponse(url=browser.current_url,body=browser.page_source,request=request,encoding='utf-8')
2 保证 每个 爬虫 spider 只用 一个chrome
# 在开始的时候
class MySpider(scrapy.Spider):
...
...
def __init__(self):
super(LagouSpider,self).__init__()
self.browser = webdriver.Chrome()
3 保证 爬虫 spider 结束的时候 chrome 关闭 ==== 信号量
from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
class MySpider(scrapy.Spider):
...
...
def __init__(self):
super(LagouSpider,self).__init__()
self.browser = webdriver.Chrome()
dispatcher.connect(self.spider_closed,signals.spider_closed)
def spider_closed(self):
self.browser.quit()
print('browser关闭')
注意: 用 chrome 浏览器 去获取 url网页内容,返回 response的 过程是一个 同步的过程,会降低的运行速度
5 验证码
识别验证码的方法:
1 编码实现 (tesseract-ocr) === google--开源的识别软件
基于大量的人工训练数据,识别率,效率低
2 在线打码 (常用) 打码平台会提供 api 接口
3 人工打码 成本高
在线打码平台的 api
附: http 请求状态
200 -- 请求被成功处理 (一般去爬虫)
301/302 -- 重定向/ 永久性,临时性
403 -- 没有权限访问 (_xsrf,防止csrf攻击)
404 -- 访问资源有误
500 -- 服务器有误
503 -- 服务器维护,停机