你好,欢迎光临我的博客!

我是一名使用Python的程序员,这是我的github。平时喜欢玩各种电子游戏,这是我的psn:CenterRight这个博客用来分享我的生活、心得以及有关计算机的技术内容。除特殊说明外均采用CC0创作协议(任何人可以自由地复制,修改,分发和演出——甚至用于商业目的,而不必署名)但是转载建议保留原文链接以让读者看到最新的版本。本站所有内容仅代表本人观点,与我的雇主无关,并且会永远保持独立性不受任何组织和公司的影响。如果我们有共同兴趣,或者你遇到了麻烦,可以通过右边的电子邮箱联系我。也欢迎留言和订阅!

HTTP连接池(基于Python的requests和urllib3)

HTTP是建立在TCP上面的,一次HTTP请求要经历TCP三次握手阶段,然后发送请求,得到相应,最后TCP断开连接。如果我们要发出多个HTTP请求,每次都这么搞,那每次要握手、请求、断开,就太浪费了,如果是HTTPS请求,就更加浪费了,每次HTTPS请求之前的连接多好几个包(不包括ACK的话会多4个)。所以如果我们在TCP或HTTP连接建立之后,可以传输、传输、传输,就能省很多资源。于是就有了“HTTP(S)连接池”的概念。和线程池非常像是不是。本文介绍连接池,连接池管理器,主要基于Python和 requests, urllib3 两个库。主要讲HTTP连接池,HTTPS连接池原理一样,只不过不光缓存TCP连接,还有发起请求之前对证书认证等过程。

HTTP连接池 urllib3.HTTPConnectionPool

首先需要明确的是,HTTP连接池缓存的是TCP连接,这个链接是相对于客户端和服务器的,说简单点,就是针对一个url(ip)目标的,所以连接池建立的时候要指定对哪一个主机缓存连接。比如发送给domain.com/a的请求和发送给domain.com/b的请求是可以使用一个TCP连接的,但是发送给domain.com/a的请求和domain.com/b的请求就不可能用一个连接完成的。

尝试使用一下:

这里我们用一个连接池发送了5次请求,运行结果如下:

同时,用Wireshark抓包,用 ip.src==47.95.47.253 or ip.dst==47.95.47.253 and (tcp.flags==0x12)过滤出来TCP握手的包,可以看到只抓到1个。证明我们5次请求只建立了一个TCP连接。

有个需要注意的参数是maxsize,这个参数指定了缓存连接的数量,默认是1.如果在多线程的情况下,可能两个线程用到了同一个pool,只有一个连接被缓存的话,另一个线程就需要新开一个连接。这时候会有两种情况:

  1. 如果block参数是True,那么第二个线程被阻塞,直到这唯一一个可用的连接被释放。
  2. 如果blcok参数是False(默认),那么第二个线程会新建一个连接,但是使用完成之后连接被销毁。连接池只会保存一个连接。

测试一下第一种情况,线程1和2同时发送请求,结束之后新的两个线程又发送请求。通过输出结果和Wireshark抓包发现自始至终只有1个TCP连接,没有新的建立。

输出结果,连接数始终是1:

Wireshark抓包,只有1次连接:

再试一下第二种情况,下面的代码和上面的唯一的区别是block参数是False

输出结果:

Wireshark抓包,前两个线程会创建两个连接,一个连接使用之后被缓存,另一个使用之后就断开。在后面线程3和4的时候,一个线程会使用缓存的连接,另一个又会新开一个连接。所以一共有三次握手的包。

综上,在多线程的环境中,多缓存一些连接可能带来性能上的提升,一般连接数等于线程数,这样保证所有的线程都有缓存的连接可用。当然,也要结合实际的情况考虑timeout 和 block等参数。

连接池管理器 urllib3.PoolManager

上面介绍的连接池是面向对方主机管理的,如果我要向不同的域名发请求,希望缓存多个域名的连接,就要有多个连接池。好在urllib3将这一层也抽象了。

PoolManager做的事情并不多,基本上就是一个MRU原则(Least Recently Used )维护自己的Pool。比如初始化的大小设置为10,那么需要建立第11个连接池的时候,最最旧的一个连接池就被销毁。

它的函数原型是class urllib3.poolmanager.PoolManager(num_pools=10, **connection_pool_kw),只有一个参数num_pools表示池的数量,其余参数将会传给Pool初始化。

requests中的接口

HTTP请求相当dirty,好在优秀的库requests帮我们搞定了各种复杂的情况。建议涉及HTTP操作的都是用requests这个封装好的库。

requests中有Adapter的概念,事实上,所有的请求都是通过默认的一个HTTPAdapter发出去的。如果我们想给一个域名加代理,都可以amount一个自定义的Adapter。

参数很明确,pool_connections会传到HTTPConnectionPool控制缓存连接的数量,pool_maxsize会传到PoolManager控制Pool的数量。

关于“连接池”和“连接池管理器”我有一个很困惑的地方, 为什么要分开这两个概念呢?这样的话要控制连接池连接的数量和连接池的数量,就要权衡我的应用是都连接向同一个网站的,还是连接向不同的网站的。然后根据线程权衡设置这两个数量。如果只有一个概念,连接池里面可以有各种域名的连接的缓存,我就可以直接考虑线程的数量来设置缓存连接的数量了。反正同一连接池的两个连接是两个连接,两个连接池的连接也是两个连接。如果去掉连接池管理器,直接将概念压扁成一层,那么对连接数量的管理就更方便了不是吗?可能urllib这么做也有它的原因吧,如果读者知道其中的原因或者我的想法的漏洞可以指点一下。

 

TCP可以使用两次握手建立连接吗?

人们经常说TCP建立连接需要三次握手,断开连接需要四次挥手。有次我抓包发现,断开连接只抓到了三个包。

上图中可以看到,20号是一方发送的TCP包,FIN 表示“我这边没有数据要传输了”,然后经过20、21、22三个包,双方断开连接。回顾一下教科书上的断开连接过程:

于是可以看到,上图中抓的包实际上是将ACK和FIN组合成一个包发送了,所以三个包就可以断开连接。维基百科的资料表示三个包断开连接也是可以的。

也可以通过测三次握手关闭连接。主机A发出FIN,主机B回复FIN & ACK,然后主机A回复ACK。

TCP要求,每一个发出去的包必须收到确认(ACK)。但实际上,并不会对每一个包发回一个单独的ACK,因为ACK和ACK标志位和数据段在不同的位置,所以数据和ACK是可以一同发的。Stack Overflow有一个很好看的ASCII图,我也贴一下。

TCP要求包收到ACK确认:

但实际上:

但实际ACK的情况还要复杂……我就不跑题了。

于是我就有了一个疑问,就是能否将第三次ACK的传递省略,等下一次有数据传送的时候再带上ACK传过去,不可以吗?

首先回顾一下TCP握手的原理,建立连接的目的是:1.表达一方企图建立连接的意图(SYN)2.表达知道对方的意图(ACK)。其实就是A发送SYN表示自己想要建立连接,B发送ACK表示知道了。然后B发送SYN表示想建立连接,A发送ACK表示自己知道了。向我们上面断开连接的三个包一样,B可以同时发送SYN+ACK,这样就是我们现在看到的三个包。

那么如果不发送第三个包,等下一次传输数据的时候带上ACK呢?

网上的解释基本都是这样的:

“已失效的连接请求报文段”的产生在这样一种情况下:client发出的第一个连接请求报文段并没有丢失,而是在某个网络结点长时间的滞留了,以致延误到连接释放以后的某个时间才到达server。本来这是一个早已失效的报文段。但server收到此失效的连接请求报文段后,就误认为是client再次发出的一个新的连接请求。于是就向client发出确认报文段,同意建立连接。假设不采用“三次握手”,那么只要server发出确认,新的连接就建立了。由于现在client并没有发出建立连接的请求,因此不会理睬server的确认,也不会向server发送数据。但server却以为新的运输连接已经建立,并一直等待client发来数据。这样,server的很多资源就白白浪费掉了。采用“三次握手”的办法可以防止上述现象发生。例如刚才那种情况,client不会向server的确认发出确认。server由于收不到确认,就知道client并没有要求建立连接。”

但并没有彻底解决我的疑问。比如A和B想建立连接,上述的解释只是意图保护B的资源,如果我是A,那么我这边是的实现是否可以做成不发送ACK而是带数据一同发送过去?

这里我们可以考虑一下如果有第三次包,但是第三次包ACK丢失的情况:

A发完ACK,单方面认为TCP为 Established状态,而B显然认为TCP为Active状态:

  1. 假定此时双方都没有数据发送,B会周期性超时重传,直到收到A的确认,收到之后B的TCP 连接也为 Established状态,双向可以发包。
  2. 假定此时A有数据发送,B收到A的 Data + ACK,自然会切换为established 状态,并接受A的 Data。
  3. 假定B有数据发送,数据发送不了,会一直周期性超时重传SYN + ACK,直到收到A的确认才可以发送数据。

(摘自知乎

所以从这里看来,如果发起连接放有意不发送ACK,而是等下一次带上数据发送,也是能够成功建立连接的。我自己认为这里面没有逻辑问题,在网上搜索了一些资料,想得到确认。

果然,有人在wireshark论坛贴了一个问题,说自己抓到的包在TCP三次握手中,第三次ACK带着数据。有人回复说:如果在收到对方的第二次包SYN+ACK之后很快要发送数据,那么第三次包ACK可以带着数据一起发回去。这在Windows和Linux中都是比较流行的一种实现。但是数据的大小在不同实现中有区别。

而在TCP fast open中,是明确说最后的ACK是可以带有数据的:

更让我惊讶的是,在原先的RFC 793描述的TCP连接建立过程中,甚至第一个包SYN也是可以带有数据的!这篇博客中有描述。但如果这样做的话,直到连接建立完成,第一次包带的数据都不能释放。所以我觉得现在的实现都没有这样做的原因是,包是节省了一些,但维护连接建立的成本更大了,更容易被SYN洪水攻击。

所以综上,TCP能不能通过两个包建立连接?不能。但是第三次ACK可以带有数据。但是ACK是必须发的,必须让对方知道自己的连接建立意图。如果收到对方第二个包SYN+ACK马上有数据要发送,那么就可以发送第三次ACK+数据;如果没有数据要发送,那么要给对方回复一个ACK,完成连接的建立。

 

有关爬虫框架的设计的一些备忘

我们公司一直在使用自己设计的爬虫框架,这么做的初衷是让框架保持简单,新手可以很快写一个爬虫工作。但是也遇到过许许多多的问题。这里记一下,如果要重新写一个爬虫框架的话,这些问题必须好好处理,可以少走很多弯路。

1.任务队列

爬虫肯定是要用到任务队列的。将每个页面的抓取分成任务,可以让任务之间互不影响,方便监控,也方便调度。任务队列是个很大的话题,我们用的是beanstalkd,它轻量,速度快,但是也有一些缺点。我对任务队列没有什么研究,这里就不比较各个方案了。但是爬虫需要的任务队列要求如下:

  1. 必须能够快速切换。一个任务就是一个网络请求+解析结果。这样划分比较合理,可以看到对某个页面的抓取状态。这就要求任务队列的吞吐量很高,要频繁的放、取任务。我们每天的抓取任务在千万级别。
  2. 支持分组(tube)。这样就可以对不同的爬虫任务循环执行。单队列可能造成对抓去网站DoS攻击,很不友好。
  3. 支持优先级。
  4. 支持重试,延迟重试。但不是必须,这个可以在爬虫队列实现。

2.数据的去重

如果没有考虑到数据去重,那么每一次执行爬虫任务都会带来新数据,这肯定是不合理的,所以爬虫每次运行都要考虑到用某种方式判定是否是一条已经存在的数据。

比较通用的方法是通过hash来判定重复。我们就是用的这种方案:对每一条数据挑一些字段(全文hash很危险,经常造成数据重复。比如有一个字段变化了,但其实应该作为一条数据处理)来计算hash值,存入数据库。在数据库中插入的时候根据hash值是否存在。

这种方案存在几个问题:

  1. hash碰撞。如果手动实现处理hash碰撞的情况,又是一个复杂的过程。目前我们是没有考虑这个的,因为我们的每一个目标网站数据量不是很多,对每一个目标网站使用一个collection存放,就大胆地假设了数据不会存在hash碰撞的情况。但是这可能是一个隐患。可以看一下这个项目,很有意思,可以对两个完全不同的pdf修改让它们的sha1结果一模一样但是不改变显示内容。
  2. 对数据库的压力。爬虫集群如果规模很大,那么意味着每一次爬虫任务都会去查询数据库,我们的Mongo集群经历过多次压力过大的问题。

其实我觉得,比较合理的方式是建立一种“爬虫任务–页面–数据”之间的关系。根据这种关系来去重,比如如果在抓取页面的时候就能判断过这个页面已经被处理过。那么就可以认为页面上的数据已经存在,可以节省很多操作。

这就需要下面下载与处理的分离。

3.下载与处理分离

这里想说的是,将下载和处理页面解耦。这样做的好处是你可以部署一个网络比较好或者有特殊要求的(比如国外ip,国内ip)集群来专门下载网页,然后用一些配置高的机器用来处理网页,这可能会节省一些vps租金。因为众所周知,爬虫比较耗费的资源是网络请求和ip。这样做的缺点是会增加框架的复杂程度。

此外,还有一个好处是上面提到的,下载好的页面可以做简单的判断,如果是已经处理过的页面并且没有发生更新,那么就直接结束这个任务。如果是个新的页面,就要交给后面的某个处理程序继续处理,像一个管道一样。

而且,还可以对下载步骤(或处理步骤)做一些Hook,如果我想保存下载所有的下载过的页面,就可以在下载完成的时候,将页面同时交给一个线程来上传到S3 bucket,同时交给一个线程做抽取任务。

这种解耦对监控来说也应该方便很多。

4.监控

需要监控的地方:

  1. 任务队列中的状态
  2. 每个任务的执行情况,比如异常等
  3. 数据的增长
  4. 追踪每个任务执行带来的数据

最好监控到每一次任务执行的参数,比如从任务队列中获得的任务信息,执行时间等。以及任务失败时候的异常,这样发现爬虫挂掉就很方便。另外需要特别注意的是,任务没有异常抛出,但实际上一次任务没有带来数据。就好比程序实际是错的,但是实际跑一次并不会挂掉。这种情况很难避免,即使你处理异常的时候非常小心,也有可能捕获了不想捕获的异常,唯有从监控上下功夫。最好是将任务执行和数据存储关联起来。在存数据的时候,记录这条数据是哪一个任务带来的。如果一次任务没有带来数据,也应该有警告。当然如果目标就是抓取一个列表页面,提取url,没有数据是正常的,这种情况应该可以被过滤掉。总之重点是,一个任务应该和进来的数据有关联,可以查看一个任务带来多少数据,亦可以查看某条数据是由哪个任务带来的。

第1条和第3条可以使用Grafana等组合Metric来实现。

目前来看,ElasticSearch用来做日志分析挺不错的。其他没有使用过。

暂时就想到这个,最近经常会有一个动手写个框架的冲动。以上只是个人的经验和想法,并不一定是合适的方案,欢迎讨论。

 

Vim:快速复制和替换

用Vim写代码经常遇到的一个操作是:复制一个单词(或其他text object),然后到下一个(多个)地方用复制的单词替换某个单词。我以前在这个简单的操作上浪费了很多时间……因为yyank之后,用d+p来粘贴,会粘贴出来d删除的内容……好吧,今天学到两种更快速的方法,记一下,以及原理。

第1种:选中文本并替换

操作步骤如下:

  1. 使用yiwyank(复制的意思)要复制的文本
  2. 移动到要替换的文本
  3. viwp使用yank的文本替换选中的文本
  4. 移动到下一个需要替换的文本
  5. viw"0p使用步骤1yank的文本再次替换

首先要了解的是Vim中的寄存器概念。Vim有很多寄存器,顾名思义,其实和CPU的寄存器差不多意思,你可以将内容临时存放在寄存器里面。在命令行模式下输入:reg可以看到寄存器的名字以及目前保存的内容。例如:使用"ayy将会把当前行yank(复制)到a寄存器。"ap可以粘贴出a寄存器的内容。

"寄存器是默认使用的寄存器。所以说,无论是用y命令复制,还是用d删除,还是我们使用viw”0p命令覆盖选中内容,内容都会保存到”寄存器。p粘贴默认也是粘贴的”寄存器的内容。1-9寄存器保存了删除的行,使用dd删除行就会保存到1寄存器,然后再删除一行,1寄存器就会被更新,之前1寄存器的内容就会移动到2,以此类推。0表示最近yank或删除的,也就是说,无论是y命令还是d命令都会将“最近”的文本保存到0寄存器。这就是第5步要使用"0p的原因,因为"寄存器的内容在第3部已经被覆盖了,所以我们要使用“最近yank的内容”就要指定0寄存器。关于所有寄存器的种类和解释,可以参考这篇文章,解释的很好。也可以看Vim的文档: :help reg

另外一个细节是使用了yiw来yank一个单词。这是用了Vim的文本对象,这样光标无论在单词首,还是在单词里面,都可以快速复制整个单词,而不用选中再复制。文本对象是Vim很强大的一个地方,这篇博客对文本对象介绍的很好

第2种:使用.重复操作

步骤如下:

  1. 使用yiwyank要复制的单词
  2. 移动到要替换的位置
  3. ciw CTRL-R 0 ESC 进入编辑模式,使用CTRL-R快捷键在插入模式中粘贴出寄存器0的内容并退回到Normal模式
  4. 移动到下一个要替换的位置
  5. 使用.重复步骤3的操作

这个用到了CTRL-R,这个组合键在Normal模式下是Redo的意思(重做,和Undo是对立的,说白点就是和CTRL-Z对立),但是在插入模式(其实在命令模式同样有效)下就变成了强大的“粘贴寄存器内容到当前光标处”的意思。这里有个不错的、简短的解释

另外使用了.进行重复操作。这个点表示“重复上次在Normal模式下的操作”,也就是步骤3的全部。

最后,如果对整行进行替换的话,可以对方法1进行改良,使用V来选中整行。

替换整行的步骤如下:

  1. yyyank要复制的一行
  2. 移动到要替换的行
  3. 使用Vp选中当前行并进行替换
  4. 移动到下一个要替换的行
  5. 使用V"0p替换一行。
 

pytest插件开发笔记

上一篇博客解决了写爬虫测试时候的一个痛点:复制粘贴太多的重复代码。可还有一个比较烦人的地方,即使test的代码可以自动生成,但还是需要人工去找到底什么地方出现了外部请求,如果 检查外部请求->下载对应的资源并生成mock的代码这部分能够自动完成就好了。今天下手搞了下。

我们的CI是通过pytest的conftest强制必须mock掉HTTP请求的。所以我想这个也可以通过pytest的插件机制来解决。于是就写了一个pytest插件来搞。

Github地址:https://github.com/laixintao/pytest-mock-helper

Pypi: https://pypi.org/project/pytest-mock-helper/

写这个插件花了大约两个小时,原理非常简单。在pytest启动的时候将requests库的requests.adapters.Adapter.send方法替换掉,加入我们要做的逻辑。这么做的灵感来自于另外一个pytest插件:pytest-blockage。这个插件的作用是让pytest强制block掉HTTP请求。和我们在conftest中做的事情一样,但是插件更合理一些,因为我们在不同的项目中就要复制conftest中的代码。不过要注意的是这个插件很久不更新了,不支持Python3。所以如果有类似的需求(在CI中强制Mock HTTP请求)建议使用我的插件。

能如此快速写出一个插件要得益于pytest超赞的文档啊。writing_plugins这一页写的很详细。有一些需要注意的地方,在这篇博客中我记一下:

插件载入的顺序:

  1. 内置的_pytest文件夹中的插件
  2. 外部安装的插件。通过搜索setuptools提供的entry_points查找pytest11,这个很有意思的,不知道为什么会是pytest11,看起来像是开发者随手写的。如果我们自己写插件,就要在setup.py中提供一个pytest11的entry_points
  3. 载入conftest.py

快速参考一些插件:

  1. 文档提供了一个非常简单的Yaml文件测试的插件代码,源代码一看就能懂
  2. 默认插件的源代码
  3. 这里有一个搭建在Heroku上面的pytest兼容性列表,列出了第三方写的插件。这个我猜是监控的pypi,因为我自己写的插件昨天上传到了pypi,今天就在这个列表看见了。不过它判断是否兼容应该是判断的setup.py中的classifiers。我啥也没写就被无情地判断成了不兼容Pyhton27也不兼容Python3。
  4. 一个工具cookiecutter-pytest-plugin,用来生成一个插件的模板。(我不是很推荐这种东西,刚开始用个最小的能跑起来的东西就可以了)

另外发现往pypi.python.org上传东西已经过时了,Api会返回Upload failed (410) Gone。现在正处于迁移的过程,新的包应该向pypi.org提交。而且应该用twine上传。

新的pypi酷炫多了,Python社区在迭代方面真是勇敢啊。都是好事儿。