爬虫与反爬


爬虫与反爬


(1) 基本的概念


爬虫:

自动获取网站数据的程序

  关键是 定时,定量的,批量的获取

反爬虫:

使用技术手段 防止爬虫程序的方法

存在误伤,即 反爬技术 将普通用户识别为爬虫

如果误伤高 --- 效果再好也不能使用

例子:

	比如 限制 ip  === 用户的ip 一般都是 局域网内动态分配的,
    一个爬虫的ip 可能分配给 另一个 非爬虫的用户

    有效的方法: 可以在一段时间内 限制 ip,过一段时间 再把 ip释放

反爬的成本:

成功率越高成本越大,拦截率越高,误伤率越高

反爬虫的目的:

初级爬虫:简单粗暴,不管服务器压力,容易使服务器奔溃

数据保护

失控的爬虫  某种情况下,无法或者没有关闭的爬虫

商业对手

爬虫 vs 反爬虫的对抗

网站反爬的的策略 vs 爬虫策略

   1.监控 某个时间 访问突然增加,ip相同,user-agent不是浏览器;限制 ip访问 (注意:不能封ip)

     (1)user-agent模拟,ip代理(ip代理池)

   2 发现 ip 变化, 要求登录访问

      (2) 注册 账号, 每次请求 带 cookie

   3  开发健全的账号体系,每个账号 权限不同

       (3) 多个账号 联合爬虫(维护账号池)

   4 访问频繁 ,限制 ip频率

       (4) 模仿人的请求速度

   5  弹出 验证码

        (5) 识别验证码

    6  增加网页 内容的动态填充 ajax向后台请求

     	(6) selenium + phantomJs 完全模拟浏览器

爬虫技术使得网站反爬的成本会越来越高


(2)在scrapy中用到的反爬策略


1 随机切换 UA

1 可以在 settings 中维护一个 ua_list

 然后每个 Request 里的headers 参数 随机获取;

 缺点: 代码冗长,每个spider都要写

2 使用 downloadermiddleware 全局middleware,写一个 UA middleware

注意:scrapy有一个的默认UserAgentMiddlewarescrapy.downloadermiddlewares.
useragent.UserAgentMiddleware  会从setings 配置文件中获取

3 使用第三方的fake-useragent 包 FakeUseragent().ie / .random

实际是维护了一个 ua的网页 :https://fake-
useragent.herokuapp.com/browsers/0.1.5

4 配置 USER_AGENT_TYPE = ‘random’ + 利用 fake-useragent 来实现 随机ua

5 这样 在 scheduler 把 request 通过 engine 发给 下载器的时候 就会加上 RandomUA

代码实现如下:

settings: 

	USER_AGENT_TYPE = 'random'
	
	DOWNLOADER_MIDDLEWARES = {
		'项目.middlewares.RandomUserAgentMiddleware': 543,
	}


middlewares.py:

	class RandomUserAgentMiddleware(object):
	    def __init__(self,crawler):
	        super(RandomUserAgentMiddleware,self).__init__()
	        self.ua = UserAgent()
	        self.ua_type = crawler.settings.get('USER_AGENT_TYPE','random')
	
	    @classmethod
	    def from_crawler(cls,crawler):
	        return cls(crawler)
	
	    def process_request(self,request,spider):
	        def get_ua():
	            return getattr(self.ua,self.ua_type)
	        print(get_ua())
	        request.headers.setdefault('USER_AGENT',get_ua())


2 设置ip 代理

(1)ip 不是固定不变的,存在一定的误伤:

云服务器:

    阿里云不会变化
    亚马逊服务器 重启之后ip会变动

小区的ip 也是动态分配

注意: 本机ip 爬取速度最好,最稳定,要尽量避免 本机ip被封掉

(2)ip 代理的 原理:

本地 向 代理服务器 发起请求,代理服务器 与 要请求的服务器进行 交互 

经过代理之后的速度: 经过一次 中间服务器 速度会慢很多

(3)如何设置ip 代理

request.meta['proxy'] = 'http://ip + port'

(4)如何设置 ip代理池:

自己写一个 处理 ip的 脚本,随机获取

思路:

    爬虫爬取 免费 ip;

    放到数据库;

    再 作测试,删除不能用的ip;

    获取ip;
		select ip,port from ip_proxy order by rand() limit 1;
	    随机返回 ip


代码实现: 1 爬取快代理的ip,并存入数据库:

	
import requests
from scrapy.selector import Selector
import MySQLdb
from concurrent.futures import ThreadPoolExecutor

# pool = ThreadPoolExecutor(max_workers=6)

# proxy = {
#     'https':'https://113.139.180.244:808' }

headers = {
    'Host':'Host:www.kuaidaili.com',
    'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
    }
    
conn = MySQLdb.connect(db='scrapytest', host='localhost', port=3306, user='root', passwd='', charset='utf8')
cursor = conn.cursor()

response = requests.get('https://www.kuaidaili.com/free/inha/1/',headers=headers)
selector = Selector(text=response.text)
# 获取 全部页码
pages = int(selector.xpath("//div[@id='listnav']/ul/li")[-2].xpath('a/text()').extract_first())
# 获取所有 url
page_list = ['https://www.kuaidaili.com/free/inha/'+ str(num) for num in range(1,pages+1)]

def get_text(page):
    response = requests.get(page,headers=headers)
    text = response.text
    return text

def get_ips(text):
    selector = Selector(text=text)
    trs = selector.xpath("//tbody/tr")
    for tr in trs:
        ip = tr.xpath("td[1]/text()").extract_first()
        port = tr.xpath("td[2]/text()").extract_first()
        addr = tr.xpath("td[5]/text()").extract_first()
        speed = tr.xpath("td[6]/text()").extract_first().split('秒')[0]
        param = (ip,port,addr,speed)
        filter_set.add(param)
        return filter_set

def insert_mysql(param):
        sql = '''
        INSERT INTO ip_proxy(ip,port,addr,speed) VALUES(%s,%s,%s,%s)
        '''
        cursor.execute(sql,param)
        conn.commit()

class GetRandomIP(object):

    def judge_ip(self,ip,port):
        url = 'http://www.baidu.com'
        proxy_url = 'http://{0}:{1}'.format(ip,port)
        try:
            proxy_dict = {
                'http':proxy_url
            }
            response = requests.get(url,proxies=proxy_dict)
        except Exception as e:
            self.delete_ip(ip)
            return False

        else:
            code = response.status_code
            if code >=200 and code <300:
                return True
            else:
                self.delete_ip(ip)
                return False

    def delete_ip(self,ip):
        sql = '''
        DELETE FROM ip_proxy WHERE ip=%s
        '''
        cursor.execute(sql,(ip,))
        conn.commit()

    def get_random_ip(self):
        sql = '''
        SELECT ip,port FROM ip_proxy ORDER BY RAND() LIMIT 1
        '''

        cursor.execute(sql)
        ip,port = cursor.fetchone()
        if self.judge_ip(ip,port):
            return '{0}:{1}'.format(ip,port)
        else:
            return self.get_random_ip()

# if __name__ == '__main__':
#     filter_set= set()
#     for page in page_list:
#         text = get_text(page)
#         get_ips(text)
#         for param in filter_set:
#             insert_mysql(param)


代码实现: 2 设置ip代理的中间件

settings
	
	DOWNLOADER_MIDDLEWARES = {
	'LG.middlewares.RandomIPMiddleware': 300,}
	
middlewraes

	class RandomIPMiddleware(object):
	    def process_request(self,request,spider):
	        ip_port = GetRandomIP().get_random_ip()
	        request.meta['proxy'] = 'http://{0}'.format(ip_port)


3 模拟登陆 cookie与session

注意浏览器是 无状态请求

cookie(浏览器本地存储机制) -- 有状态的请求(本地存储,本地文件中)

cookie原理

用户 ---->>>  服务器
浏览器
    <<-----  分配 id

    ----->> 再次请求(附加身份信息)

不能存储 客户的敏感信息 --- 本地文件容易丢失,不安全

session(基于cookie实现)

返给 客户端 一个session_id
服务器端生成(具有过期时间)
客户端请求的时候,带着session_id 在 服务器中获取 个人信息

模拟登陆的思路:

请求方式 :post
请求url:
请求参数:_xsrf
        username
        pwd

获取 session


代码实现:1 requests + cookie

	
import requests
try:
    import cookielib   # py2
except:
    import http.cookiejar as cookielib # py3

# py2 和  py3 的 兼容代码

import re

def get_xsrf():
    response = requests.get('https://www.zhihu.com',headers=headers)
    text = response.text
    xsrf = re.search(r';xsrf&quot;:&quot;(.*?)&quot',text)
    if xsrf:
        return xsrf.group(1)
    else:
        return ''

headers = {
    'HOST':'www.zhihu.com',
    'Referer':'https://www.zhihu.com',
    'User-Agent':'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36',}

cookies_ = {	        
            "_xsrf":get_xsrf(),
            "z_c0":".........."}

response = requests.get(url='https://www.zhihu.com/inbox',
                        headers=headers,
                        cookies = cookies_,
                        allow_redirects = False)

print(response.status_code)


当然也可以通过session来实现
	
	session = requests.session()	
	
	def zhihu_login(account,pwd):
	    # 知乎登录
	    post_url = 'https://www.zhihu.com/api/v3/oauth/sign_in'
	    post_data = {
	        '_xsrf':get_xsrf(),
	        'username':account,
	        'password':pwd
	    }
	    response = session.post(post_url,data=post_data,headers=headers)
	    session.cookies.save()
	
	zhihu_login('xxx','xxxx')
	
	
	def get_logined_index():
	    response = session.get('https://www.zhihu.com',headers=headers)
	    with open('index.html','wb') as f:
	        f.write(response.text.encode('utf-8'))
	
	get_logined_index()


代码实现:2 scrapy

爬取知乎的问答

import scrapy
import re,datetime
import json
from urllib.parse import urljoin
#  拼接域名 url

from scrapy.loader import ItemLoader
from zhihuspider.items import ZhihuQuestionItem,ZhihuAnswerItem
	
class ZhihuSpider(scrapy.Spider):
    name = 'zhihu'
    allowed_domains = ['zhihu.com']
    start_urls = ['http://zhihu.com/']

    headers = {
        'HOST': 'www.zhihu.com',
        'Referer': 'https://www.zhihu.com',
        'User-Agent': 'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36',
    }
    cookies_ = {
        "_xsrf": '',
        "z_c0": "............",
    }
    # api 接口
    start_answer_url = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}'
    # 知乎 question 的 answer 的起始 url


    def start_requests(self):
        return [scrapy.Request('https://www.zhihu.com',callback=self.get_xsrf,headers=self.headers)]  # 获取xsrf

    def get_xsrf(self,response):
        response_text = response.text
        xsrf = re.search(r';xsrf&quot;:&quot;(.*?)&quot', response_text,re.DOTALL)
        if xsrf:
            xsrf= xsrf.group(1)
        else:
            xsrf = ''
        self.cookies_['_xsrf'] = xsrf
        for url in self.start_urls:
            yield scrapy.Request(url, headers=self.headers,cookies=self.cookies_, dont_filter=True,callback=self.parse)


    def parse(self,response):
        '''
        提取所有  url  === 深度优先的策略
        其中  question/数字 的url
        :param response:
        :return:
        '''
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [urljoin(response.url,url) for url in all_urls]
        all_urls = list(filter(lambda url:True if url.startswith('https') else False,all_urls ))

        for url in all_urls:
            match_obj = re.match('(.*zhihu.com/question/(\d+))(/.*|$)',url)
            if match_obj:
                request_url = match_obj.group(1)
                question_id = match_obj.group(2)
                yield scrapy.Request(request_url,meta={'question_id':question_id},headers=self.headers,callback=self.parse_question)
            else:
                yield scrapy.Request(url,headers=self.headers,callback=self.parse)

    def parse_question(self,response):
        """
        获取 详细的 item
        :param response:
        :return:
        """
        question_id = response.meta.get('question_id')
        item_loader = ItemLoader(item=ZhihuQuestionItem(),response=response)
        item_loader.add_css('title','.QuestionHeader-title::text')
        item_loader.add_css('content','.QuestionHeader-detail')
        item_loader.add_value('url',response.url)
        item_loader.add_value('question_id',question_id)
        item_loader.add_css('answer_nums','.List-headerText span::text')
        item_loader.add_css('comment_nums','.QuestionHeader-Comment button::text')
        item_loader.add_css('watch_user_nums','.NumberBoard-itemValue::text')
        item_loader.add_css('topics','.QuestionHeader-topics .Popover div::text')

        question_item = item_loader.load_item()
        yield scrapy.Request(self.start_answer_url.format(question_id,20,0),headers=self.headers,callback=self.parse_answer)

        yield question_item


    def parse_answer(self,response):
        answer_json = json.loads(response.text)
        is_end = answer_json['paging']['is_end']
        totals = answer_json['paging']['totals']	
        # 提取 answer 的数据 item
        for answer in answer_json['data']:
            answer_item = ZhihuAnswerItem()
            answer_item['zhihu_id'] = answer['id']
            answer_item['url'] = answer['url']
            answer_item['question_id'] = answer['question']['id']
            answer_item['author_id'] = answer['author']['id'] if 'id' in answer['author'] else None
            answer_item['content'] = answer['content'] if 'content' in answer else None
            answer_item['create_time'] = answer['created_time']
            answer_item['update_time'] = answer['updated_time']
            answer_item['crawl_time'] = datetime.datetime.now()
            yield answer_item

        if not is_end:  
			# 判断是否还有下一页
            next_url = answer_json['paging']['next']
            yield scrapy.Request(next_url,headers=self.headers,callback=self.parse_answer)

当然可以维护多个用户,实现一随机的cookie池


4 Selenium 解决动态加载的html问题


Selenium:网站 开发测试框架

可以使 我们访问 的html 与 浏览器(f12) 得到的 html 一样(动态html)

1  selenium 需要 对应的 浏览器 driver

2   注意 下载 好对应的 driver
	直接可以 放进 python 的scripts里面;
	不用再去配置 executable_path=.../..exe参数

3  注意版本对应要求


如何把selenium集成scrapy中间件中

1 使用 中间件,需要 使用 HtmlResponse 直接把 response 返回给spider

from scrapy.http import HtmlResponse
class SeleniumMiddleware(object):
    """
    通过 selenium 浏览器 驱动 直接获取网页 内容
    不用再 经过 downloader 下载
    所以 需要直接返回
    """
    def process_request(self,request,spider):
        if spider == 'lagou':
            browser = webdriver.Chrome()
            browser.get(request.url)
            import time
            time.sleep(1)

            return HtmlResponse(url=browser.current_url,body=browser.page_source,request=request,encoding='utf-8')

2 保证 每个 爬虫 spider 只用 一个chrome

# 在开始的时候
class MySpider(scrapy.Spider):
     ...
     ...
     def __init__(self):
            super(LagouSpider,self).__init__()
            self.browser = webdriver.Chrome()

3 保证 爬虫 spider 结束的时候 chrome 关闭 ==== 信号量

from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals

class MySpider(scrapy.Spider):
 ...
 ...

    def __init__(self):
        super(LagouSpider,self).__init__()
        self.browser = webdriver.Chrome()

        dispatcher.connect(self.spider_closed,signals.spider_closed)

    def spider_closed(self):
        self.browser.quit()
        print('browser关闭')

注意: 用 chrome 浏览器 去获取 url网页内容,返回 response的 过程是一个 同步的过程,会降低的运行速度


5 验证码

识别验证码的方法:

1 编码实现 (tesseract-ocr) === google--开源的识别软件
基于大量的人工训练数据,识别率,效率低

2 在线打码 (常用) 打码平台会提供 api 接口

3 人工打码 成本高

在线打码平台的 api

自己改写了一小部分,云打码api

附: http 请求状态

200 -- 请求被成功处理    (一般去爬虫)
301/302  -- 重定向/ 永久性,临时性
403  -- 没有权限访问  (_xsrf,防止csrf攻击)
404  -- 访问资源有误
500  --  服务器有误
503  -- 服务器维护,停机
Buy me a 肥仔水!